Fabric REST APIs: Programmatic Management, Automating Workspace Setup, Triggering Pipelines, and Building Admin Scripts in Python

Fabric REST APIs: Programmatic Management, Automating Workspace Setup, Triggering Pipelines, and Building Admin Scripts in Python

You have been managing Fabric through the portal — clicking buttons to create workspaces, running pipelines manually, checking monitoring hub for failures. That works for 5 items. It does not work for 50.

What if you need to create 10 workspaces for 10 project teams? What if you need to trigger a pipeline from an external system (Jenkins, GitHub Actions, a custom app)? What if you need to export a list of all items across all workspaces for an audit? What if you need to programmatically assign workspace roles when a new team member joins?

That is where the Fabric REST APIs come in — programmatic control over everything you can do in the portal, and some things you cannot.

Think of the Fabric portal as driving a car manually — steering, braking, accelerating one action at a time. The REST API is autopilot — you program the route (write a script), press start, and the car drives itself. For repetitive tasks (create 10 workspaces, trigger 20 pipelines, audit 100 items), autopilot is not just faster — it is the only practical option.

Table of Contents

  • What Are Fabric REST APIs?
  • Authentication: Getting an Access Token
  • Using Azure AD App Registration
  • Using Azure CLI
  • Using mssparkutils (from Notebooks)
  • Core API Endpoints
  • Workspace Management
  • Item Management (Lakehouse, Notebook, Pipeline)
  • Pipeline Execution
  • Lakehouse Table Management
  • Capacity Management
  • Real-World Script 1: Create Workspace and Items
  • Real-World Script 2: Trigger Pipeline from External System
  • Real-World Script 3: Audit All Items Across Workspaces
  • Real-World Script 4: Bulk Assign Workspace Roles
  • Real-World Script 5: Monitor Pipeline Runs
  • Real-World Script 6: Export Lakehouse Table List
  • Using APIs from Fabric Notebooks
  • Integrating with CI/CD (GitHub Actions / Azure DevOps)
  • Triggering Deployment Pipelines via API
  • Rate Limits and Best Practices
  • The Fabric Python SDK (microsoft-fabric)
  • Common Mistakes
  • Interview Questions
  • Wrapping Up

What Are Fabric REST APIs?

Fabric REST APIs let you manage Fabric resources programmatically using HTTP requests. Every action in the portal — creating a workspace, running a pipeline, listing items — has a corresponding API endpoint.

Portal (manual):
  Click "New workspace" → type name → click Create

API (programmatic):
  POST https://api.fabric.microsoft.com/v1/workspaces
  Body: {"displayName": "DataEng_Prod", "capacityId": "guid"}

  Same result. But scriptable, repeatable, automatable.

Base URL: https://api.fabric.microsoft.com/v1/

Authentication: Getting an Access Token

Every API call requires a Bearer token in the Authorization header.

Using Azure AD App Registration (Production)

# Step 1: Register an app in Azure AD (Entra ID)
# Step 2: Grant it Fabric API permissions
# Step 3: Get a token programmatically

from azure.identity import ClientSecretCredential

tenant_id = "your-tenant-id"
client_id = "your-app-client-id"
client_secret = "your-app-client-secret"

credential = ClientSecretCredential(tenant_id, client_id, client_secret)
token = credential.get_token("https://api.fabric.microsoft.com/.default")

headers = {
    "Authorization": f"Bearer {token.token}",
    "Content-Type": "application/json"
}

Using Azure CLI (Quick Testing)

# Login
az login

# Get token
TOKEN=$(az account get-access-token --resource https://api.fabric.microsoft.com --query accessToken -o tsv)

# Use in curl
curl -H "Authorization: Bearer $TOKEN"      https://api.fabric.microsoft.com/v1/workspaces

Using mssparkutils (from Notebooks)

# Inside a Fabric notebook — easiest method
token = mssparkutils.credentials.getToken("https://api.fabric.microsoft.com")
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}

Core API Endpoints

Workspace Management

import requests

BASE_URL = "https://api.fabric.microsoft.com/v1"

# List all workspaces
response = requests.get(f"{BASE_URL}/workspaces", headers=headers)
workspaces = response.json()["value"]
for ws in workspaces:
    print(f"  {ws['displayName']} (ID: {ws['id']})")

# Create a workspace
payload = {
    "displayName": "DataEng_NewProject",
    "capacityId": "your-capacity-guid",
    "description": "Workspace for new data engineering project"
}
response = requests.post(f"{BASE_URL}/workspaces", headers=headers, json=payload)
new_ws = response.json()
print(f"Created: {new_ws['displayName']} (ID: {new_ws['id']})")

# Delete a workspace
requests.delete(f"{BASE_URL}/workspaces/{workspace_id}", headers=headers)

# Add a user to workspace
payload = {
    "identifier": "engineer@company.com",
    "groupUserAccessRight": "Member"   # Admin, Member, Contributor, Viewer
}
requests.post(f"{BASE_URL}/workspaces/{workspace_id}/roleAssignments",
              headers=headers, json=payload)

Item Management (Lakehouse, Notebook, Pipeline)

# List all items in a workspace
response = requests.get(f"{BASE_URL}/workspaces/{workspace_id}/items", headers=headers)
items = response.json()["value"]
for item in items:
    print(f"  {item['type']:20s} | {item['displayName']}")

# Output:
#   Lakehouse            | bronze_lakehouse
#   Lakehouse            | silver_lakehouse
#   Warehouse            | gold_warehouse
#   Notebook             | NB_Clean_Customers
#   DataPipeline         | PL_Daily_ETL
#   SemanticModel        | Sales Analytics

# Create a Lakehouse
payload = {"displayName": "test_lakehouse", "type": "Lakehouse"}
response = requests.post(f"{BASE_URL}/workspaces/{workspace_id}/items",
                         headers=headers, json=payload)

# Delete an item
requests.delete(f"{BASE_URL}/workspaces/{workspace_id}/items/{item_id}", headers=headers)

Pipeline Execution

# Trigger a pipeline run
pipeline_id = "your-pipeline-guid"
response = requests.post(
    f"{BASE_URL}/workspaces/{workspace_id}/items/{pipeline_id}/jobs/instances?jobType=Pipeline",
    headers=headers
)
run_id = response.headers.get("Location", "").split("/")[-1]
print(f"Pipeline started. Run ID: {run_id}")

# Check pipeline run status
response = requests.get(
    f"{BASE_URL}/workspaces/{workspace_id}/items/{pipeline_id}/jobs/instances/{run_id}",
    headers=headers
)
status = response.json()
print(f"Status: {status['status']}")  # InProgress, Completed, Failed

Lakehouse Table Management

# List tables in a Lakehouse
response = requests.get(
    f"{BASE_URL}/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/tables",
    headers=headers
)
tables = response.json()["data"]
for table in tables:
    print(f"  {table['name']:30s} | Format: {table['format']} | Location: {table['location']}")

# Load a file into a Lakehouse table
payload = {
    "relativePath": "Files/raw_csv/customers.csv",
    "pathType": "File",
    "mode": "Overwrite",
    "recursive": False,
    "formatOptions": {
        "format": "Csv",
        "header": True,
        "delimiter": ","
    }
}
requests.post(
    f"{BASE_URL}/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/tables/customers/load",
    headers=headers, json=payload
)

Capacity Management

# List capacities
response = requests.get(f"{BASE_URL}/capacities", headers=headers)
for cap in response.json()["value"]:
    print(f"  {cap['displayName']} | SKU: {cap['sku']} | State: {cap['state']}")

Real-World Script 1: Create Workspace and Items

import requests
import time

def create_project_workspace(project_name, capacity_id, team_members, headers):
    '''Create a complete project workspace with standard items.'''

    BASE_URL = "https://api.fabric.microsoft.com/v1"

    # Step 1: Create workspace
    ws_payload = {
        "displayName": f"DE_{project_name}",
        "capacityId": capacity_id,
        "description": f"Data engineering workspace for {project_name}"
    }
    ws_response = requests.post(f"{BASE_URL}/workspaces", headers=headers, json=ws_payload)
    workspace_id = ws_response.json()["id"]
    print(f"Created workspace: DE_{project_name} ({workspace_id})")

    # Step 2: Create standard Lakehouses
    for lh_name in ["bronze_lakehouse", "silver_lakehouse"]:
        requests.post(f"{BASE_URL}/workspaces/{workspace_id}/items",
                      headers=headers, json={"displayName": lh_name, "type": "Lakehouse"})
        print(f"  Created: {lh_name}")

    # Step 3: Create Warehouse
    requests.post(f"{BASE_URL}/workspaces/{workspace_id}/items",
                  headers=headers, json={"displayName": "gold_warehouse", "type": "Warehouse"})
    print(f"  Created: gold_warehouse")

    # Step 4: Add team members
    for email, role in team_members.items():
        payload = {"identifier": email, "groupUserAccessRight": role}
        requests.post(f"{BASE_URL}/workspaces/{workspace_id}/roleAssignments",
                      headers=headers, json=payload)
        print(f"  Added: {email} as {role}")

    return workspace_id

# Usage
team = {
    "naveen@company.com": "Admin",
    "shrey@company.com": "Member",
    "vrushab@company.com": "Member",
    "analyst@company.com": "Viewer"
}
ws_id = create_project_workspace("SalesAnalytics", "capacity-guid", team, headers)

Real-World Script 2: Trigger Pipeline from External System

import requests
import time

def trigger_and_monitor_pipeline(workspace_id, pipeline_id, headers, timeout_minutes=60):
    '''Trigger a Fabric pipeline and wait for completion.'''

    BASE_URL = "https://api.fabric.microsoft.com/v1"

    # Trigger
    response = requests.post(
        f"{BASE_URL}/workspaces/{workspace_id}/items/{pipeline_id}/jobs/instances?jobType=Pipeline",
        headers=headers
    )

    if response.status_code not in [200, 202]:
        raise Exception(f"Failed to trigger: {response.status_code} {response.text}")

    location = response.headers.get("Location", "")
    print(f"Pipeline triggered. Monitoring...")

    # Poll for completion
    start_time = time.time()
    while True:
        status_response = requests.get(location, headers=headers)
        status = status_response.json().get("status", "Unknown")

        if status == "Completed":
            print(f"Pipeline completed successfully!")
            return True
        elif status == "Failed":
            print(f"Pipeline FAILED!")
            return False
        elif time.time() - start_time > timeout_minutes * 60:
            print(f"Pipeline timed out after {timeout_minutes} minutes")
            return False

        print(f"  Status: {status}... waiting 30 seconds")
        time.sleep(30)

# Call from Jenkins, GitHub Actions, or any external system
success = trigger_and_monitor_pipeline(workspace_id, pipeline_id, headers)

Real-World Script 3: Audit All Items Across Workspaces

import requests
import csv

def audit_all_fabric_items(headers, output_file="fabric_audit.csv"):
    '''Export a complete inventory of all Fabric items across all workspaces.'''

    BASE_URL = "https://api.fabric.microsoft.com/v1"

    # Get all workspaces
    ws_response = requests.get(f"{BASE_URL}/workspaces", headers=headers)
    workspaces = ws_response.json()["value"]

    all_items = []
    for ws in workspaces:
        items_response = requests.get(
            f"{BASE_URL}/workspaces/{ws['id']}/items", headers=headers
        )
        if items_response.status_code == 200:
            for item in items_response.json().get("value", []):
                all_items.append({
                    "workspace": ws["displayName"],
                    "workspace_id": ws["id"],
                    "item_name": item["displayName"],
                    "item_type": item["type"],
                    "item_id": item["id"],
                    "description": item.get("description", "")
                })

    # Write to CSV
    with open(output_file, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=all_items[0].keys())
        writer.writeheader()
        writer.writerows(all_items)

    print(f"Audit complete: {len(all_items)} items across {len(workspaces)} workspaces")
    print(f"Exported to: {output_file}")

    # Summary
    from collections import Counter
    type_counts = Counter(item["item_type"] for item in all_items)
    print(f"
Item counts by type:")
    for item_type, count in type_counts.most_common():
        print(f"  {item_type:25s}: {count}")

audit_all_fabric_items(headers)

Real-World Script 4: Bulk Assign Workspace Roles

def bulk_assign_roles(workspace_id, assignments, headers):
    '''Assign roles to multiple users at once.'''

    BASE_URL = "https://api.fabric.microsoft.com/v1"

    for email, role in assignments.items():
        payload = {"identifier": email, "groupUserAccessRight": role}
        response = requests.post(
            f"{BASE_URL}/workspaces/{workspace_id}/roleAssignments",
            headers=headers, json=payload
        )
        status = "✅" if response.status_code in [200, 201] else "❌"
        print(f"  {status} {email}: {role}")

# Onboard a new team
new_team = {
    "alice@company.com": "Member",
    "bob@company.com": "Member",
    "carol@company.com": "Contributor",
    "david@company.com": "Viewer",
    "eve@company.com": "Viewer",
}
bulk_assign_roles("workspace-guid", new_team, headers)

Real-World Script 5: Monitor Pipeline Runs

def get_recent_pipeline_failures(workspace_id, headers, hours=24):
    '''Find failed pipeline runs in the last N hours.'''

    BASE_URL = "https://api.fabric.microsoft.com/v1"

    # List all pipelines
    items = requests.get(f"{BASE_URL}/workspaces/{workspace_id}/items",
                         headers=headers).json()["value"]
    pipelines = [i for i in items if i["type"] == "DataPipeline"]

    print(f"Checking {len(pipelines)} pipelines for failures in last {hours} hours...")

    for pipeline in pipelines:
        runs = requests.get(
            f"{BASE_URL}/workspaces/{workspace_id}/items/{pipeline['id']}/jobs/instances",
            headers=headers
        )
        if runs.status_code == 200:
            for run in runs.json().get("value", []):
                if run.get("status") == "Failed":
                    print(f"  ❌ {pipeline['displayName']}")
                    print(f"     Run ID: {run['id']}")
                    print(f"     Start: {run.get('startTimeUtc', 'N/A')}")
                    print(f"     Error: {run.get('failureReason', 'N/A')}")

get_recent_pipeline_failures("workspace-guid", headers)

Using APIs from Fabric Notebooks

# Inside a Fabric notebook — simplest authentication
import requests
import json

token = mssparkutils.credentials.getToken("https://api.fabric.microsoft.com")
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}

BASE_URL = "https://api.fabric.microsoft.com/v1"

# Get current workspace info
workspace_id = mssparkutils.env.getWorkspaceId()

# List all tables in the current lakehouse
lakehouse_id = mssparkutils.env.getLakehouseId()
response = requests.get(
    f"{BASE_URL}/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/tables",
    headers=headers
)
tables = response.json()["data"]
print(f"Tables in current lakehouse:")
for t in tables:
    print(f"  {t['name']}")

Integrating with CI/CD (GitHub Actions)

# .github/workflows/deploy-fabric.yml
name: Deploy to Fabric

on:
  push:
    branches: [main]

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Azure Login
        uses: azure/login@v1
        with:
          creds: ${{ secrets.AZURE_CREDENTIALS }}

      - name: Get Fabric Token
        run: |
          TOKEN=$(az account get-access-token --resource https://api.fabric.microsoft.com --query accessToken -o tsv)
          echo "FABRIC_TOKEN=$TOKEN" >> $GITHUB_ENV

      - name: Trigger Deployment Pipeline
        run: |
          curl -X POST             "https://api.fabric.microsoft.com/v1/deploymentPipelines/${{ vars.PIPELINE_ID }}/deploy"             -H "Authorization: Bearer $FABRIC_TOKEN"             -H "Content-Type: application/json"             -d '{"sourceStageOrder": 0, "targetStageOrder": 1}'

Rate Limits and Best Practices

Practice Why
Add delays between calls time.sleep(1) between API calls avoids rate limiting
Handle 429 responses Retry after the Retry-After header value
Use pagination Large result sets return paginated — follow continuationUri
Cache tokens Tokens last ~60 minutes — do not request a new one per call
Use service principals For production automation, not personal accounts
Log all API calls For audit trail and debugging

Common Mistakes

  1. Using personal tokens in production — personal tokens expire when the person’s session ends. Use Azure AD app registrations (service principals) for automated scripts.

  2. Not handling pagination — listing items in large workspaces returns paginated results. Always check for continuationUri in the response and follow it.

  3. Ignoring rate limits — rapid-fire API calls get 429 (Too Many Requests). Add time.sleep(1) between calls and implement retry logic with exponential backoff.

  4. Hardcoding IDs — workspace IDs and item IDs change across environments. Use the List endpoints to discover IDs by name, then use the IDs in subsequent calls.

  5. Not checking response status codes — always verify response.status_code before using response.json(). A 403 means permission denied, 404 means item not found.

Interview Questions

Q: What can you do with Fabric REST APIs? A: Programmatically manage all Fabric resources: create/delete workspaces, list/create items (lakehouses, notebooks, pipelines), trigger pipeline runs and monitor their status, assign workspace roles, manage capacity, and trigger deployment pipelines. APIs enable automation that is not practical through the portal — bulk operations, CI/CD integration, external system triggers, and audit scripts.

Q: How do you authenticate with the Fabric REST API? A: Three methods: Azure AD app registration (service principal) for production automation, Azure CLI for quick testing, or mssparkutils.credentials.getToken() from within Fabric notebooks. All methods produce a Bearer token passed in the Authorization header. Service principals are recommended for production.

Q: How do you trigger a Fabric pipeline from an external system? A: POST to the pipeline jobs endpoint with a Bearer token. The API returns a Location header with the run URL. Poll that URL to monitor status (InProgress, Completed, Failed). Integrate with GitHub Actions, Jenkins, or Azure DevOps by getting a token via Azure CLI and calling the endpoint in a CI/CD step.

Wrapping Up

The Fabric REST APIs transform Fabric from a portal-driven service into a programmable platform. Anything you click in the portal, you can script with an API. For teams managing dozens of workspaces, hundreds of items, and complex CI/CD workflows, APIs are not optional — they are essential.

Start with the notebook approach (mssparkutils token — zero setup), graduate to service principals for production automation, and integrate with your CI/CD pipeline for fully automated deployments.

Related posts:Git Integration & CI/CDFabric Administration & CostFabric NotebooksFabric Data Factory & Pipelines


Naveen Vuppula is a Senior Data Engineering Consultant and app developer based in Ontario, Canada. He writes about Python, SQL, AWS, Azure, and everything data engineering at DriveDataScience.com.

Leave a Comment

Your email address will not be published. Required fields are marked *

Scroll to Top
Share via
Copy link