Fabric REST APIs: Programmatic Management, Automating Workspace Setup, Triggering Pipelines, and Building Admin Scripts in Python
You have been managing Fabric through the portal: clicking buttons to create workspaces, running pipelines manually, checking the monitoring hub for failures. That works for 5 items. It does not work for 50.
What if you need to create 10 workspaces for 10 project teams? What if you need to trigger a pipeline from an external system (Jenkins, GitHub Actions, a custom app)? What if you need to export a list of all items across all workspaces for an audit? What if you need to programmatically assign workspace roles when a new team member joins?
That is where the Fabric REST APIs come in — programmatic control over everything you can do in the portal, and some things you cannot.
Think of the Fabric portal as driving a car manually — steering, braking, accelerating one action at a time. The REST API is autopilot — you program the route (write a script), press start, and the car drives itself. For repetitive tasks (create 10 workspaces, trigger 20 pipelines, audit 100 items), autopilot is not just faster — it is the only practical option.
Table of Contents
- What Are Fabric REST APIs?
- Authentication: Getting an Access Token
  - Using Azure AD App Registration
  - Using Azure CLI
  - Using mssparkutils (from Notebooks)
- Core API Endpoints
  - Workspace Management
  - Item Management (Lakehouse, Notebook, Pipeline)
  - Pipeline Execution
  - Lakehouse Table Management
  - Capacity Management
- Real-World Script 1: Create Workspace and Items
- Real-World Script 2: Trigger Pipeline from External System
- Real-World Script 3: Audit All Items Across Workspaces
- Real-World Script 4: Bulk Assign Workspace Roles
- Real-World Script 5: Monitor Pipeline Runs
- Real-World Script 6: Export Lakehouse Table List
- Using APIs from Fabric Notebooks
- Integrating with CI/CD (GitHub Actions / Azure DevOps)
- Triggering Deployment Pipelines via API
- Rate Limits and Best Practices
- The Fabric Python SDK (microsoft-fabric)
- Common Mistakes
- Interview Questions
- Wrapping Up
What Are Fabric REST APIs?
Fabric REST APIs let you manage Fabric resources programmatically using HTTP requests. Most actions in the portal (creating a workspace, running a pipeline, listing items) have a corresponding API endpoint.
Portal (manual):
  Click "New workspace" → type name → click Create
API (programmatic):
  POST https://api.fabric.microsoft.com/v1/workspaces
  Body: {"displayName": "DataEng_Prod", "capacityId": "guid"}
Same result. But scriptable, repeatable, automatable.
Base URL: https://api.fabric.microsoft.com/v1/
Authentication: Getting an Access Token
Every API call requires a Bearer token in the Authorization header.
Using Azure AD App Registration (Production)
# Step 1: Register an app in Azure AD (Entra ID)
# Step 2: Grant it Fabric API permissions
# Step 3: Get a token programmatically
from azure.identity import ClientSecretCredential
tenant_id = "your-tenant-id"
client_id = "your-app-client-id"
client_secret = "your-app-client-secret"
credential = ClientSecretCredential(tenant_id, client_id, client_secret)
token = credential.get_token("https://api.fabric.microsoft.com/.default")
headers = {
    "Authorization": f"Bearer {token.token}",
    "Content-Type": "application/json"
}
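Because tokens are short-lived, a long-running script needs to refresh them without requesting a new one on every call. The following is a minimal sketch of that idea, not part of any Fabric or Azure library: `CachedToken` and its `fetch` callback are hypothetical names, and `fetch` is assumed to return a `(token, expiry_timestamp)` pair.

```python
import time
from typing import Callable, Optional, Tuple


class CachedToken:
    """Reuse an access token until shortly before it expires (illustrative helper)."""

    def __init__(self, fetch: Callable[[], Tuple[str, float]], skew_seconds: float = 300):
        # fetch() must return (token_string, expiry_as_unix_timestamp)
        self._fetch = fetch
        self._skew = skew_seconds
        self._token: Optional[str] = None
        self._expires_on = 0.0

    def get(self, now: Optional[float] = None) -> str:
        """Return the cached token, refreshing it when it is within the skew window."""
        now = time.time() if now is None else now
        if self._token is None or now >= self._expires_on - self._skew:
            self._token, self._expires_on = self._fetch()
        return self._token

    def headers(self) -> dict:
        """Build request headers with a token that is still valid."""
        return {
            "Authorization": f"Bearer {self.get()}",
            "Content-Type": "application/json",
        }
```

With the `credential` object above, the callback could be a small function that calls `credential.get_token("https://api.fabric.microsoft.com/.default")` and returns its `token` and `expires_on` fields. Calling `cache.headers()` before each request then keeps the header fresh. Recent versions of azure-identity may already cache tokens internally, so this mostly helps when the headers dict would otherwise be built once and reused for hours.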
Using Azure CLI (Quick Testing)
# Login
az login
# Get token
TOKEN=$(az account get-access-token --resource https://api.fabric.microsoft.com --query accessToken -o tsv)
# Use in curl
curl -H "Authorization: Bearer $TOKEN" https://api.fabric.microsoft.com/v1/workspaces
Using mssparkutils (from Notebooks)
# Inside a Fabric notebook — easiest method
token = mssparkutils.credentials.getToken("https://api.fabric.microsoft.com")
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
Core API Endpoints
Workspace Management
import requests
BASE_URL = "https://api.fabric.microsoft.com/v1"
# List all workspaces
response = requests.get(f"{BASE_URL}/workspaces", headers=headers)
workspaces = response.json()["value"]
for ws in workspaces:
    print(f" {ws['displayName']} (ID: {ws['id']})")
# Create a workspace
payload = {
    "displayName": "DataEng_NewProject",
    "capacityId": "your-capacity-guid",
    "description": "Workspace for new data engineering project"
}
response = requests.post(f"{BASE_URL}/workspaces", headers=headers, json=payload)
new_ws = response.json()
print(f"Created: {new_ws['displayName']} (ID: {new_ws['id']})")
# Delete a workspace (workspace_id is the GUID of the workspace to remove)
requests.delete(f"{BASE_URL}/workspaces/{workspace_id}", headers=headers)
# Add a user to a workspace
# The Fabric endpoint identifies the user by Entra object ID, not by email address
payload = {
    "principal": {"id": "user-object-id-guid", "type": "User"},
    "role": "Member"  # Admin, Member, Contributor, Viewer
}
requests.post(f"{BASE_URL}/workspaces/{workspace_id}/roleAssignments",
              headers=headers, json=payload)
Item Management (Lakehouse, Notebook, Pipeline)
# List all items in a workspace
response = requests.get(f"{BASE_URL}/workspaces/{workspace_id}/items", headers=headers)
items = response.json()["value"]
for item in items:
    print(f" {item['type']:20s} | {item['displayName']}")
# Output:
#  Lakehouse            | bronze_lakehouse
#  Lakehouse            | silver_lakehouse
#  Warehouse            | gold_warehouse
#  Notebook             | NB_Clean_Customers
#  DataPipeline         | PL_Daily_ETL
#  SemanticModel        | Sales Analytics
# Create a Lakehouse
payload = {"displayName": "test_lakehouse", "type": "Lakehouse"}
response = requests.post(f"{BASE_URL}/workspaces/{workspace_id}/items",
                         headers=headers, json=payload)
# Delete an item
requests.delete(f"{BASE_URL}/workspaces/{workspace_id}/items/{item_id}", headers=headers)
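Item IDs differ between environments, so scripts usually look them up by display name from the list response instead of hardcoding GUIDs. Here is a small sketch of that lookup. `find_item_id` is a hypothetical helper, and the sample data only imitates the shape of the `value` array returned by the list-items call.

```python
from typing import Iterable, Optional


def find_item_id(items: Iterable[dict], display_name: str,
                 item_type: Optional[str] = None) -> Optional[str]:
    """Return the ID of the first item matching the name (and type, if given)."""
    for item in items:
        if item.get("displayName") != display_name:
            continue
        if item_type is not None and item.get("type") != item_type:
            continue
        return item.get("id")
    return None  # no match: let the caller decide whether that is an error


# Sample data shaped like the "value" array of the list-items response
sample_items = [
    {"id": "1111", "type": "Lakehouse", "displayName": "bronze_lakehouse"},
    {"id": "2222", "type": "SemanticModel", "displayName": "bronze_lakehouse"},
    {"id": "3333", "type": "DataPipeline", "displayName": "PL_Daily_ETL"},
]
print(find_item_id(sample_items, "PL_Daily_ETL", "DataPipeline"))
```

Passing the type matters because different item types can share a display name, as the two `bronze_lakehouse` entries in the sample show.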
Pipeline Execution
# Trigger a pipeline run
pipeline_id = "your-pipeline-guid"
response = requests.post(
    f"{BASE_URL}/workspaces/{workspace_id}/items/{pipeline_id}/jobs/instances?jobType=Pipeline",
    headers=headers
)
# The response carries the run URL in its Location header; the last path segment is the run ID
run_id = response.headers.get("Location", "").split("/")[-1]
print(f"Pipeline started. Run ID: {run_id}")
# Check pipeline run status
response = requests.get(
    f"{BASE_URL}/workspaces/{workspace_id}/items/{pipeline_id}/jobs/instances/{run_id}",
    headers=headers
)
status = response.json()
print(f"Status: {status['status']}")  # NotStarted, InProgress, Completed, Failed, Cancelled, Deduped
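When polling a run, it helps to separate "still running" states from terminal ones so a loop does not wait forever on a cancelled job. The sketch below shows one way to do that. The helper names are hypothetical, and the status set reflects the values the job API is understood to return (`NotStarted`, `InProgress`, `Completed`, `Failed`, `Cancelled`, `Deduped`); adjust it if your tenant reports others.

```python
# Statuses after which the run will not change again (assumed set, see lead-in)
TERMINAL_STATUSES = {"Completed", "Failed", "Cancelled", "Deduped"}


def run_id_from_location(location: str) -> str:
    """Extract the run ID from the Location header of the trigger response."""
    return location.rstrip("/").rsplit("/", 1)[-1]


def is_terminal(status: str) -> bool:
    """True when polling can stop."""
    return status in TERMINAL_STATUSES


def succeeded(status: str) -> bool:
    """Only 'Completed' counts as success; every other terminal state is a failure."""
    return status == "Completed"


# Example with a made-up Location value
example = "https://api.fabric.microsoft.com/v1/workspaces/ws/items/pl/jobs/instances/run-123"
print(run_id_from_location(example))
```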
Lakehouse Table Management
# List tables in a Lakehouse
response = requests.get(
    f"{BASE_URL}/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/tables",
    headers=headers
)
tables = response.json()["data"]
for table in tables:
    print(f" {table['name']:30s} | Format: {table['format']} | Location: {table['location']}")
# Load a file into a Lakehouse table
payload = {
    "relativePath": "Files/raw_csv/customers.csv",
    "pathType": "File",
    "mode": "Overwrite",
    "recursive": False,
    "formatOptions": {
        "format": "Csv",
        "header": True,
        "delimiter": ","
    }
}
requests.post(
    f"{BASE_URL}/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/tables/customers/load",
    headers=headers, json=payload
)
Capacity Management
# List capacities
response = requests.get(f"{BASE_URL}/capacities", headers=headers)
for cap in response.json()["value"]:
    print(f" {cap['displayName']} | SKU: {cap['sku']} | State: {cap['state']}")
Real-World Script 1: Create Workspace and Items
import requests
def create_project_workspace(project_name, capacity_id, team_members, headers):
    '''Create a complete project workspace with standard items.'''
    BASE_URL = "https://api.fabric.microsoft.com/v1"
    # Step 1: Create workspace
    ws_payload = {
        "displayName": f"DE_{project_name}",
        "capacityId": capacity_id,
        "description": f"Data engineering workspace for {project_name}"
    }
    ws_response = requests.post(f"{BASE_URL}/workspaces", headers=headers, json=ws_payload)
    ws_response.raise_for_status()  # stop early if the workspace was not created
    workspace_id = ws_response.json()["id"]
    print(f"Created workspace: DE_{project_name} ({workspace_id})")
    # Step 2: Create standard Lakehouses
    for lh_name in ["bronze_lakehouse", "silver_lakehouse"]:
        requests.post(f"{BASE_URL}/workspaces/{workspace_id}/items",
                      headers=headers, json={"displayName": lh_name, "type": "Lakehouse"})
        print(f" Created: {lh_name}")
    # Step 3: Create Warehouse
    requests.post(f"{BASE_URL}/workspaces/{workspace_id}/items",
                  headers=headers, json={"displayName": "gold_warehouse", "type": "Warehouse"})
    print(f" Created: gold_warehouse")
    # Step 4: Add team members (keys are Entra object IDs, not email addresses)
    for principal_id, role in team_members.items():
        payload = {"principal": {"id": principal_id, "type": "User"}, "role": role}
        requests.post(f"{BASE_URL}/workspaces/{workspace_id}/roleAssignments",
                      headers=headers, json=payload)
        print(f" Added: {principal_id} as {role}")
    return workspace_id
# Usage (replace the placeholder keys with each user's object ID)
team = {
    "naveen-object-id": "Admin",
    "shrey-object-id": "Member",
    "vrushab-object-id": "Member",
    "analyst-object-id": "Viewer"
}
ws_id = create_project_workspace("SalesAnalytics", "capacity-guid", team, headers)
Real-World Script 2: Trigger Pipeline from External System
import requests
import time
def trigger_and_monitor_pipeline(workspace_id, pipeline_id, headers, timeout_minutes=60):
    '''Trigger a Fabric pipeline and wait for completion.'''
    BASE_URL = "https://api.fabric.microsoft.com/v1"
    # Trigger
    response = requests.post(
        f"{BASE_URL}/workspaces/{workspace_id}/items/{pipeline_id}/jobs/instances?jobType=Pipeline",
        headers=headers
    )
    if response.status_code not in [200, 202]:
        raise Exception(f"Failed to trigger: {response.status_code} {response.text}")
    location = response.headers.get("Location")
    if not location:
        raise Exception("No Location header returned; cannot monitor the run")
    print(f"Pipeline triggered. Monitoring...")
    # Poll for completion
    start_time = time.time()
    while True:
        status_response = requests.get(location, headers=headers)
        status = status_response.json().get("status", "Unknown")
        if status == "Completed":
            print(f"Pipeline completed successfully!")
            return True
        elif status in ("Failed", "Cancelled", "Deduped"):
            # Any other terminal state counts as a failure for the caller
            print(f"Pipeline ended with status: {status}")
            return False
        elif time.time() - start_time > timeout_minutes * 60:
            print(f"Pipeline timed out after {timeout_minutes} minutes")
            return False
        print(f" Status: {status}... waiting 30 seconds")
        time.sleep(30)
# Call from Jenkins, GitHub Actions, or any external system
success = trigger_and_monitor_pipeline(workspace_id, pipeline_id, headers)
Real-World Script 3: Audit All Items Across Workspaces
import csv
from collections import Counter
import requests
def audit_all_fabric_items(headers, output_file="fabric_audit.csv"):
    '''Export a complete inventory of all Fabric items across all workspaces.'''
    BASE_URL = "https://api.fabric.microsoft.com/v1"
    # Get all workspaces
    ws_response = requests.get(f"{BASE_URL}/workspaces", headers=headers)
    ws_response.raise_for_status()
    workspaces = ws_response.json()["value"]
    all_items = []
    for ws in workspaces:
        items_response = requests.get(
            f"{BASE_URL}/workspaces/{ws['id']}/items", headers=headers
        )
        # Workspaces that cannot be read are skipped rather than aborting the audit
        if items_response.status_code == 200:
            for item in items_response.json().get("value", []):
                all_items.append({
                    "workspace": ws["displayName"],
                    "workspace_id": ws["id"],
                    "item_name": item["displayName"],
                    "item_type": item["type"],
                    "item_id": item["id"],
                    "description": item.get("description", "")
                })
    # Write to CSV (fixed column list, so an empty inventory still gets a header row)
    fieldnames = ["workspace", "workspace_id", "item_name", "item_type", "item_id", "description"]
    with open(output_file, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(all_items)
    print(f"Audit complete: {len(all_items)} items across {len(workspaces)} workspaces")
    print(f"Exported to: {output_file}")
    # Summary
    type_counts = Counter(item["item_type"] for item in all_items)
    print(f"\nItem counts by type:")
    for item_type, count in type_counts.most_common():
        print(f" {item_type:25s}: {count}")
audit_all_fabric_items(headers)
Real-World Script 4: Bulk Assign Workspace Roles
import requests
def bulk_assign_roles(workspace_id, assignments, headers):
    '''Assign roles to multiple users at once.'''
    BASE_URL = "https://api.fabric.microsoft.com/v1"
    # Keys of assignments are Entra object IDs, not email addresses
    for principal_id, role in assignments.items():
        payload = {"principal": {"id": principal_id, "type": "User"}, "role": role}
        response = requests.post(
            f"{BASE_URL}/workspaces/{workspace_id}/roleAssignments",
            headers=headers, json=payload
        )
        status = "✅" if response.status_code in [200, 201] else "❌"
        print(f" {status} {principal_id}: {role}")
# Onboard a new team (replace the placeholder keys with each user's object ID)
new_team = {
    "alice-object-id": "Member",
    "bob-object-id": "Member",
    "carol-object-id": "Contributor",
    "david-object-id": "Viewer",
    "eve-object-id": "Viewer",
}
bulk_assign_roles("workspace-guid", new_team, headers)
Real-World Script 5: Monitor Pipeline Runs
from datetime import datetime, timedelta, timezone
import requests
def get_recent_pipeline_failures(workspace_id, headers, hours=24):
    '''Find failed pipeline runs in the last N hours.'''
    BASE_URL = "https://api.fabric.microsoft.com/v1"
    cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
    # List all pipelines
    items = requests.get(f"{BASE_URL}/workspaces/{workspace_id}/items",
                         headers=headers).json()["value"]
    pipelines = [i for i in items if i["type"] == "DataPipeline"]
    print(f"Checking {len(pipelines)} pipelines for failures in last {hours} hours...")
    for pipeline in pipelines:
        runs = requests.get(
            f"{BASE_URL}/workspaces/{workspace_id}/items/{pipeline['id']}/jobs/instances",
            headers=headers
        )
        if runs.status_code == 200:
            for run in runs.json().get("value", []):
                if run.get("status") != "Failed":
                    continue
                # Skip runs older than the cutoff (assumes an ISO 8601 UTC timestamp)
                start = run.get("startTimeUtc")
                if start and datetime.fromisoformat(start[:19]).replace(tzinfo=timezone.utc) < cutoff:
                    continue
                print(f" ❌ {pipeline['displayName']}")
                print(f"    Run ID: {run['id']}")
                print(f"    Start: {start or 'N/A'}")
                print(f"    Error: {run.get('failureReason', 'N/A')}")
get_recent_pipeline_failures("workspace-guid", headers)
Using APIs from Fabric Notebooks
# Inside a Fabric notebook — simplest authentication
import requests
import json
token = mssparkutils.credentials.getToken("https://api.fabric.microsoft.com")
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
BASE_URL = "https://api.fabric.microsoft.com/v1"
# Get current workspace info
# Helper names can vary by runtime version; run mssparkutils.env.help() if a call below is missing
workspace_id = mssparkutils.env.getWorkspaceId()
# List all tables in the current lakehouse
lakehouse_id = mssparkutils.env.getLakehouseId()
response = requests.get(
    f"{BASE_URL}/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/tables",
    headers=headers
)
tables = response.json()["data"]
print(f"Tables in current lakehouse:")
for t in tables:
    print(f" {t['name']}")
Integrating with CI/CD (GitHub Actions)
# .github/workflows/deploy-fabric.yml
name: Deploy to Fabric
on:
  push:
    branches: [main]
jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Azure Login
        uses: azure/login@v1
        with:
          creds: ${{ secrets.AZURE_CREDENTIALS }}
      - name: Get Fabric Token
        run: |
          TOKEN=$(az account get-access-token --resource https://api.fabric.microsoft.com --query accessToken -o tsv)
          # Keep the token out of the job logs
          echo "::add-mask::$TOKEN"
          echo "FABRIC_TOKEN=$TOKEN" >> "$GITHUB_ENV"
      - name: Trigger Deployment Pipeline
        # SOURCE_STAGE_ID and TARGET_STAGE_ID are assumed repository variables holding
        # the stage GUIDs; the Fabric deploy endpoint identifies stages by ID, not order
        run: |
          curl --fail -X POST "https://api.fabric.microsoft.com/v1/deploymentPipelines/${{ vars.PIPELINE_ID }}/deploy" \
            -H "Authorization: Bearer $FABRIC_TOKEN" \
            -H "Content-Type: application/json" \
            -d '{"sourceStageId": "${{ vars.SOURCE_STAGE_ID }}", "targetStageId": "${{ vars.TARGET_STAGE_ID }}"}'
Rate Limits and Best Practices
| Practice | Why |
|---|---|
| Add delays between calls | time.sleep(1) between API calls avoids rate limiting |
| Handle 429 responses | Retry after the Retry-After header value |
| Use pagination | Large result sets return paginated — follow continuationUri |
| Cache tokens | Tokens last ~60 minutes — do not request a new one per call |
| Use service principals | For production automation, not personal accounts |
| Log all API calls | For audit trail and debugging |
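The "Handle 429 responses" row can be sketched as a small retry wrapper. This is an illustration rather than an official client: `call_with_retry` is a hypothetical helper, `send` is any zero-argument function returning a response object with `status_code` and `headers`, and `Retry-After` is assumed to be given in seconds.

```python
import time
from typing import Callable


def call_with_retry(send: Callable[[], object], max_attempts: int = 5,
                    base_delay: float = 1.0,
                    sleep: Callable[[float], None] = time.sleep):
    """Call send() and retry on HTTP 429, honoring Retry-After when present."""
    response = None
    for attempt in range(max_attempts):
        response = send()
        if response.status_code != 429:
            return response
        if attempt == max_attempts - 1:
            break  # out of attempts: hand the last 429 back to the caller
        retry_after = response.headers.get("Retry-After")
        try:
            delay = float(retry_after)  # server-specified wait, read as seconds
        except (TypeError, ValueError):
            delay = base_delay * (2 ** attempt)  # exponential backoff fallback
        sleep(delay)
    return response
```

A typical call would be `call_with_retry(lambda: requests.get(url, headers=headers))`. The `sleep` parameter exists only so the wait can be replaced in tests.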
Common Mistakes
- Using personal tokens in production: personal tokens expire when the person’s session ends. Use Azure AD app registrations (service principals) for automated scripts.
- Not handling pagination: listing items in large workspaces returns paginated results. Always check for continuationUri in the response and follow it.
- Ignoring rate limits: rapid-fire API calls get 429 (Too Many Requests). Add time.sleep(1) between calls and implement retry logic with exponential backoff.
- Hardcoding IDs: workspace IDs and item IDs change across environments. Use the List endpoints to discover IDs by name, then use the IDs in subsequent calls.
- Not checking response status codes: always verify response.status_code before using response.json(). A 403 means permission denied, 404 means item not found.
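The pagination mistake is easiest to avoid with a generator that keeps following `continuationUri` until it is absent. The sketch below assumes each page is a dict with the records under `value` (or `data` for Lakehouse tables) and an optional `continuationUri`. `iter_all` and `fetch_page` are hypothetical names.

```python
from typing import Callable, Iterator, Optional


def iter_all(fetch_page: Callable[[str], dict], first_url: str,
             key: str = "value") -> Iterator[dict]:
    """Yield every record across pages by following continuationUri."""
    url: Optional[str] = first_url
    while url:
        page = fetch_page(url)
        yield from page.get(key, [])
        url = page.get("continuationUri")  # missing or None ends the loop


# Fake two-page result set standing in for real HTTP responses
fake_pages = {
    "page1": {"value": [{"id": 1}, {"id": 2}], "continuationUri": "page2"},
    "page2": {"value": [{"id": 3}]},
}
print([item["id"] for item in iter_all(fake_pages.__getitem__, "page1")])
```

Against the real API, `fetch_page` could be `lambda url: requests.get(url, headers=headers).json()`, with `key="data"` when listing Lakehouse tables.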
Interview Questions
Q: What can you do with Fabric REST APIs?
A: Programmatically manage all Fabric resources: create/delete workspaces, list/create items (lakehouses, notebooks, pipelines), trigger pipeline runs and monitor their status, assign workspace roles, manage capacity, and trigger deployment pipelines. APIs enable automation that is not practical through the portal: bulk operations, CI/CD integration, external system triggers, and audit scripts.
Q: How do you authenticate with the Fabric REST API?
A: Three methods: Azure AD app registration (service principal) for production automation, Azure CLI for quick testing, or mssparkutils.credentials.getToken() from within Fabric notebooks. All methods produce a Bearer token passed in the Authorization header. Service principals are recommended for production.
Q: How do you trigger a Fabric pipeline from an external system?
A: POST to the pipeline jobs endpoint with a Bearer token. The API returns a Location header with the run URL. Poll that URL to monitor status (InProgress, Completed, Failed). Integrate with GitHub Actions, Jenkins, or Azure DevOps by getting a token via Azure CLI and calling the endpoint in a CI/CD step.
Wrapping Up
The Fabric REST APIs transform Fabric from a portal-driven service into a programmable platform. Most of what you click in the portal can also be scripted through an API. For teams managing dozens of workspaces, hundreds of items, and complex CI/CD workflows, APIs are not optional; they are essential.
Start with the notebook approach (mssparkutils token — zero setup), graduate to service principals for production automation, and integrate with your CI/CD pipeline for fully automated deployments.
Naveen Vuppula is a Senior Data Engineering Consultant and app developer based in Ontario, Canada. He writes about Python, SQL, AWS, Azure, and everything data engineering at DriveDataScience.com.