Audit Logging in Data Pipelines: Why It Matters and How to Implement It
You built a pipeline that copies 50 tables every night. It runs, it finishes. But then someone asks: “How many rows were copied? Did Customer actually run? Why is Address missing data?”
Without audit logging, you are digging through the ADF Monitor tab: clicking into each pipeline run, expanding each ForEach iteration, and hoping the run history has not expired (ADF retains pipeline run data for only 45 days). With audit logging, you query a SQL table and get answers in seconds, for any date, any table, any pipeline run.
Think of audit logging like the black box in an airplane. The plane (pipeline) can fly perfectly fine without it. But when something goes wrong at 3 AM and the business asks “what happened to yesterday’s data,” the black box has every detail: what was copied, how long it took, what failed, and exactly when. Without it, you are guessing.
Table of Contents
- Why You Need Audit Logging
- Designing the Audit Table
- The Stored Procedure
- Implementing in ADF/Synapse
- Dependency Conditions
- The @activity() Output Properties
- Adding Pipeline Run ID
- Building an Audit Dashboard
- Audit Logging in Databricks and Fabric
- Common Mistakes
- Interview Questions
- Wrapping Up
Why You Need Audit Logging
Scenario 1: Data Completeness
SELECT table_name, rows_copied, load_date
FROM audit WHERE table_name = 'Customer'
ORDER BY load_date DESC;
For example, you can see at a glance that yesterday's run copied 0 rows because the source was locked during maintenance.
Scenario 2: Performance Trending
SELECT table_name, AVG(copy_duration) as avg_seconds, MAX(rows_copied) as max_rows
FROM audit WHERE load_date >= DATEADD(month, -1, GETDATE())
GROUP BY table_name ORDER BY avg_seconds DESC;
Scenario 3: Failure Investigation
SELECT table_name, error_message, load_date
FROM audit WHERE error_message != 'NA'
ORDER BY load_date DESC;
Scenario 4: SLA Compliance
SELECT CAST(load_date AS DATE) as run_date,
MAX(load_date) as last_completed,
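-- SLA deadline: 06:00 on the run date (DATEADD with HOUR requires DATETIME, not DATE)
CASE WHEN MAX(load_date) <= DATEADD(HOUR, 6, CAST(CAST(load_date AS DATE) AS DATETIME))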
THEN 'MET' ELSE 'MISSED' END as sla_status
FROM audit WHERE error_message = 'NA'
GROUP BY CAST(load_date AS DATE) ORDER BY run_date DESC;
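The SLA query boils down to: group successful loads by calendar day, take the latest completion time, and compare it with a deadline on that same day. A minimal Python sketch of that logic, assuming (as the query does) a 06:00 deadline and the basic table's convention that error_message = 'NA' marks a success; sla_status and the sample records are illustrative only:

```python
from datetime import datetime, time

def sla_status(records, deadline=time(6, 0)):
    """records: (load_date, error_message) pairs; 'NA' marks a successful load."""
    last_completed = {}
    for load_date, error_message in records:
        if error_message != "NA":
            continue  # failed loads do not count toward the SLA
        day = load_date.date()
        last_completed[day] = max(last_completed.get(day, load_date), load_date)
    # MET if the last successful load finished by the deadline on the same day.
    return {
        day: "MET" if finished <= datetime.combine(day, deadline) else "MISSED"
        for day, finished in last_completed.items()
    }

records = [
    (datetime(2024, 5, 1, 2, 15), "NA"),
    (datetime(2024, 5, 1, 5, 40), "NA"),
    (datetime(2024, 5, 2, 3, 0), "NA"),
    (datetime(2024, 5, 2, 7, 5), "NA"),  # straggler finished after 06:00
    (datetime(2024, 5, 2, 8, 0), "Failed: dbo.Address"),
]
print(sla_status(records))
```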
Designing the Audit Table
CREATE TABLE audit (
Id INT IDENTITY(1, 1) PRIMARY KEY,
rows_read INT,
rows_copied INT,
copy_duration INT,
table_name VARCHAR(100),
schema_name VARCHAR(100),
error_message VARCHAR(500),
load_date DATETIME
);
Why rows_read and rows_copied are separate: They should match. If they don’t, rows failed during transfer. rows_read=847, rows_copied=800 means 47 rows had issues.
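That comparison is easy to automate. A small sketch, assuming audit rows arrive as plain dicts keyed by the basic table's column names; find_row_discrepancies is a hypothetical helper, not part of ADF:

```python
def find_row_discrepancies(audit_rows):
    """Return (table_name, rows_lost) for every row where rows_read != rows_copied."""
    issues = []
    for row in audit_rows:
        lost = row["rows_read"] - row["rows_copied"]
        if lost != 0:
            issues.append((row["table_name"], lost))
    return issues

rows = [
    {"table_name": "Customer", "rows_read": 84752, "rows_copied": 84752},
    {"table_name": "Address", "rows_read": 847, "rows_copied": 800},
]
print(find_row_discrepancies(rows))  # [('Address', 47)]
```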
Enhanced audit table for production:
CREATE TABLE audit (
Id INT IDENTITY(1, 1) PRIMARY KEY,
pipeline_name VARCHAR(200), -- which pipeline ran
pipeline_run_id VARCHAR(100), -- unique run identifier
table_name VARCHAR(100), -- which table was copied
schema_name VARCHAR(100), -- source schema
rows_read INT, -- rows read from source
rows_copied INT, -- rows written to sink
rows_skipped INT DEFAULT 0, -- rows that failed validation
copy_duration INT, -- seconds taken
data_read_mb DECIMAL(10,2), -- data volume read (MB)
data_written_mb DECIMAL(10,2), -- data volume written (MB)
status VARCHAR(20), -- 'Success' or 'Failed'
error_message VARCHAR(MAX), -- error details (NULL if success)
source_type VARCHAR(50), -- 'AzureSQL', 'Oracle', 'API'
sink_type VARCHAR(50), -- 'ADLS_Parquet', 'Delta', 'SQL'
load_type VARCHAR(20), -- 'Full' or 'Incremental'
watermark_value VARCHAR(100), -- last watermark for incremental
load_date DATETIME DEFAULT GETDATE(),
INDEX IX_audit_table_date NONCLUSTERED (table_name, load_date DESC),
INDEX IX_audit_status NONCLUSTERED (status, load_date DESC),
INDEX IX_audit_run_id NONCLUSTERED (pipeline_run_id)
);
Why the extra columns matter:
| Column | Why You Need It | Example Query |
|---|---|---|
| pipeline_run_id | Links every audit row to a specific ADF run — cross-reference with Monitor tab | WHERE pipeline_run_id = 'abc-123-def' |
| data_read_mb / data_written_mb | Track data volume trends — catch sudden spikes or drops | WHERE data_read_mb > 1000 (flag large loads) |
| load_type | Distinguish full vs incremental — different expectations for row counts | WHERE load_type = 'Full' AND rows_copied = 0 |
| watermark_value | Debug incremental loads — “what was the watermark for this run?” | WHERE table_name = 'Orders' ORDER BY load_date DESC |
| status | Simple filter instead of checking error_message for NULL or ‘NA’ | WHERE status = 'Failed' AND load_date >= DATEADD(day, -7, GETDATE()) |
Start simple, grow later. The basic 8-column table from above is perfectly fine for your first pipeline. Add columns as your needs grow. The enhanced table is what production systems evolve into over time.
The Stored Procedure
CREATE PROCEDURE sp_insert_audit_log
@rows_read INT, @rows_copied INT, @copy_duration INT,
@table_name VARCHAR(100), @schema_name VARCHAR(100),
@error_message VARCHAR(500)
AS
BEGIN
INSERT INTO audit (rows_read, rows_copied, copy_duration,
table_name, schema_name, error_message, load_date)
VALUES (@rows_read, @rows_copied, @copy_duration,
@table_name, @schema_name, @error_message, GETDATE());
END;
Why a stored procedure? Encapsulation (change once, applies everywhere), security (EXECUTE vs INSERT permission), and reusability (multiple pipelines, same procedure).
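To see the encapsulation idea outside SQL Server, here is a sketch that uses Python's built-in sqlite3 module as a stand-in: one function owns the INSERT statement, and every caller passes only parameters, much as the pipeline does with sp_insert_audit_log. SQLite has no stored procedures or GETDATE(), so insert_audit_log and the timestamp handling are assumptions for illustration:

```python
import sqlite3
from datetime import datetime, timezone

def create_audit_table(conn):
    # Mirrors the basic 8-column audit table (SQLite types instead of T-SQL).
    conn.execute("""
        CREATE TABLE IF NOT EXISTS audit (
            Id INTEGER PRIMARY KEY AUTOINCREMENT,
            rows_read INTEGER, rows_copied INTEGER, copy_duration INTEGER,
            table_name TEXT, schema_name TEXT, error_message TEXT,
            load_date TEXT
        )""")

def insert_audit_log(conn, rows_read, rows_copied, copy_duration,
                     table_name, schema_name, error_message):
    # The only place that knows the INSERT statement, like sp_insert_audit_log.
    conn.execute(
        "INSERT INTO audit (rows_read, rows_copied, copy_duration, table_name,"
        " schema_name, error_message, load_date) VALUES (?, ?, ?, ?, ?, ?, ?)",
        (rows_read, rows_copied, copy_duration, table_name, schema_name,
         error_message, datetime.now(timezone.utc).isoformat()),
    )
    conn.commit()

conn = sqlite3.connect(":memory:")
create_audit_table(conn)
insert_audit_log(conn, 847, 800, 12, "Address", "dbo", "NA")
insert_audit_log(conn, 0, 0, 0, "Customer", "dbo", "Failed: dbo.Customer")
failures = conn.execute(
    "SELECT table_name, error_message FROM audit WHERE error_message != 'NA'"
).fetchall()
print(failures)  # [('Customer', 'Failed: dbo.Customer')]
```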
Implementing in ADF/Synapse
Inside ForEach, add two Stored Procedure activities after Copy:
Copy_TableData
|-- (Success green arrow) --> Log_Success
|-- (Failure red arrow) --> Log_Failure
Log_Success Parameters
| Parameter | Type | Value |
|---|---|---|
| rows_read | Int32 | @activity('Copy_TableData').output.rowsRead |
| rows_copied | Int32 | @activity('Copy_TableData').output.rowsCopied |
| copy_duration | Int32 | @activity('Copy_TableData').output.copyDuration |
| table_name | String | @item().TableName |
| schema_name | String | @item().SchemaName |
| error_message | String | NA |
Log_Failure Parameters
| Parameter | Type | Value |
|---|---|---|
| rows_read | Int32 | 0 |
| rows_copied | Int32 | 0 |
| copy_duration | Int32 | 0 |
| table_name | String | @item().TableName |
| schema_name | String | @item().SchemaName |
| error_message | String | @concat('Failed: ', item().SchemaName, '.', item().TableName) |
Note: Inside @concat(), don’t use @ on nested functions. It’s item().SchemaName, not @item().SchemaName.
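The two handlers differ only in where each value comes from. A sketch that mimics that mapping in Python, assuming the Copy activity output is available as a dict with the rowsRead, rowsCopied and copyDuration keys listed below, and the ForEach item as a dict with SchemaName and TableName; build_audit_params is hypothetical:

```python
def build_audit_params(item, copy_output=None):
    """Parameters for Log_Success (copy_output given) or Log_Failure (copy_output None)."""
    if copy_output is not None:
        # Success path: real metrics from the Copy activity output.
        return {
            "rows_read": int(copy_output["rowsRead"]),
            "rows_copied": int(copy_output["rowsCopied"]),
            "copy_duration": int(copy_output["copyDuration"]),
            "table_name": item["TableName"],
            "schema_name": item["SchemaName"],
            "error_message": "NA",
        }
    # Failure path: zeros plus a message built like the @concat() expression.
    return {
        "rows_read": 0, "rows_copied": 0, "copy_duration": 0,
        "table_name": item["TableName"],
        "schema_name": item["SchemaName"],
        "error_message": "Failed: " + item["SchemaName"] + "." + item["TableName"],
    }

item = {"SchemaName": "dbo", "TableName": "Customer"}
success = build_audit_params(item, {"rowsRead": 84752, "rowsCopied": 84752, "copyDuration": 47})
failure = build_audit_params(item)
```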
Dependency Conditions
| Condition | Arrow | When Child Runs |
|---|---|---|
| Succeeded | Green | Only on success |
| Failed | Red | Only on failure |
| Completed | Blue | Always (whether the upstream activity succeeded or failed) |
| Skipped | Gray | Only when the upstream activity was skipped |
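A toy Python model makes the table concrete and shows why wiring both handlers with Completed is a mistake: both fire on every run. It is a simplified model of the four conditions above, and RUNS_ON and handlers_that_run are illustrative names, not ADF APIs:

```python
# Simplified model: which upstream outcomes trigger a downstream activity
# for each dependency condition.
RUNS_ON = {
    "Succeeded": {"Succeeded"},
    "Failed": {"Failed"},
    "Completed": {"Succeeded", "Failed"},
    "Skipped": {"Skipped"},
}

def handlers_that_run(upstream_status, handlers):
    """handlers maps activity name -> dependency condition on the upstream activity."""
    return sorted(name for name, condition in handlers.items()
                  if upstream_status in RUNS_ON[condition])

correct = {"Log_Success": "Succeeded", "Log_Failure": "Failed"}
wrong = {"Log_Success": "Completed", "Log_Failure": "Completed"}

print(handlers_that_run("Succeeded", correct))  # ['Log_Success']
print(handlers_that_run("Failed", correct))     # ['Log_Failure']
print(handlers_that_run("Succeeded", wrong))    # ['Log_Failure', 'Log_Success']
```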
The @activity() Output Properties
@activity('Copy').output.rowsRead
@activity('Copy').output.rowsCopied
@activity('Copy').output.copyDuration
@activity('Copy').output.throughput
@activity('Copy').output.dataRead
@activity('Copy').output.dataWritten
@activity('Copy').output.errors[0].Message
What each property tells you:
| Property | Type | What It Means | Example Value |
|---|---|---|---|
| rowsRead | Int | Total rows read from source | 84,752 |
| rowsCopied | Int | Total rows written to sink | 84,752 |
| copyDuration | Int | Duration in seconds | 47 |
| throughput | Float | Speed in KBps | 2,048.5 |
| dataRead | Long | Bytes read from source | 12,582,912 |
| dataWritten | Long | Bytes written to sink (after compression) | 3,145,728 |
| errors[0].Message | String | First error message (if any) | “Invalid column name ‘Addr’” |
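Data volume trick: Convert bytes to MB in the stored procedure parameter expressions so audit records are readable. Use a decimal divisor, because div() with two integers returns an integer and drops the fractional megabytes:
data_read_mb: @div(activity('Copy_TableData').output.dataRead, 1048576.0)
data_written_mb: @div(activity('Copy_TableData').output.dataWritten, 1048576.0)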
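A Python sketch of the bytes-to-MB conversion; it also shows the pitfall of integer division, which turns any load under 1 MB into 0 MB. bytes_to_mb is an illustrative helper, and 1,048,576 is 1024 × 1024 bytes per MB:

```python
BYTES_PER_MB = 1024 * 1024  # 1,048,576

def bytes_to_mb(num_bytes):
    # Float division keeps the fractional part; round to fit DECIMAL(10,2).
    return round(num_bytes / BYTES_PER_MB, 2)

print(12_582_912 // BYTES_PER_MB)  # 12   (integer division)
print(bytes_to_mb(12_582_912))     # 12.0
print(500_000 // BYTES_PER_MB)     # 0    (small loads vanish)
print(bytes_to_mb(500_000))        # 0.48
```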
Useful pipeline-level expressions for audit:
@pipeline().RunId ← unique ID for this pipeline run
@pipeline().Pipeline ← pipeline name (e.g., "PL_Copy_SqlToADLS")
@pipeline().TriggerName ← which trigger started this run
@pipeline().TriggerTime ← when the trigger fired
@utcnow() ← current UTC timestamp
@pipeline().parameters.ParamName ← any pipeline parameter value
Adding Pipeline Run ID
ALTER TABLE audit ADD pipeline_run_id VARCHAR(100);
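Add a matching @pipeline_run_id VARCHAR(100) parameter to sp_insert_audit_log (and to its INSERT column list), then pass @pipeline().RunId from the pipeline.
This links every audit row to a specific pipeline run.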
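Once pipeline_run_id is populated, telling a scheduled run from a same-day manual rerun is a simple grouping. A sketch over plain tuples, assuming each audit row carries (pipeline_run_id, table_name, status); the run IDs below are made up:

```python
from collections import defaultdict

def summarize_runs(audit_rows):
    """Group audit rows by pipeline_run_id -> table count and failed tables."""
    runs = defaultdict(lambda: {"tables": 0, "failed": []})
    for run_id, table_name, status in audit_rows:
        runs[run_id]["tables"] += 1
        if status == "Failed":
            runs[run_id]["failed"].append(table_name)
    return dict(runs)

rows = [
    ("run-0200", "Customer", "Success"),
    ("run-0200", "Address", "Failed"),
    ("run-0915", "Address", "Success"),  # manual rerun later the same day
]
print(summarize_runs(rows))
```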
Building an Audit Dashboard
The audit table becomes powerful when you build queries and dashboards on top of it. Here are production-ready queries:
Daily Load Summary
-- How did last night's load go?
SELECT
CAST(load_date AS DATE) AS run_date,
COUNT(*) AS total_tables,
SUM(CASE WHEN status = 'Success' THEN 1 ELSE 0 END) AS succeeded,
SUM(CASE WHEN status = 'Failed' THEN 1 ELSE 0 END) AS failed,
SUM(rows_copied) AS total_rows,
MAX(load_date) AS last_completed,
SUM(copy_duration) AS total_seconds
FROM audit
WHERE CAST(load_date AS DATE) = CAST(GETDATE() AS DATE)
GROUP BY CAST(load_date AS DATE);
Missing Tables Detection
-- Which tables did NOT run today? (compares against metadata)
SELECT m.TableName
FROM dbo.metadata m
LEFT JOIN audit a
ON m.TableName = a.table_name
AND CAST(a.load_date AS DATE) = CAST(GETDATE() AS DATE)
WHERE a.table_name IS NULL;
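Conceptually, the LEFT JOIN ... IS NULL pattern is a set difference: tables expected by the metadata minus tables that logged a row today. A Python sketch of the same check, with hypothetical table lists:

```python
def missing_tables(expected_tables, audited_today):
    # Tables in the metadata list with no audit row today (LEFT JOIN ... IS NULL).
    return sorted(set(expected_tables) - set(audited_today))

expected = ["Customer", "Address", "Orders", "Product"]
audited = ["Customer", "Orders", "Customer"]  # Customer logged twice (rerun)
print(missing_tables(expected, audited))  # ['Address', 'Product']
```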
Row Count Anomaly Detection
-- Flag tables where today's row count is <50% or >200% of the 7-day average
WITH avg_rows AS (
SELECT table_name, AVG(CAST(rows_copied AS FLOAT)) AS avg_7day
FROM audit
WHERE status = 'Success'
AND load_date >= DATEADD(day, -7, CAST(GETDATE() AS DATE))
AND load_date < CAST(GETDATE() AS DATE) -- baseline: the 7 days before today
GROUP BY table_name
),
today AS (
SELECT table_name, rows_copied
FROM audit
WHERE status = 'Success'
AND CAST(load_date AS DATE) = CAST(GETDATE() AS DATE)
)
SELECT t.table_name, t.rows_copied AS today_rows,
CAST(a.avg_7day AS INT) AS avg_7day_rows,
CASE
WHEN t.rows_copied < a.avg_7day * 0.5 THEN 'ANOMALY: Too few rows'
WHEN t.rows_copied > a.avg_7day * 2.0 THEN 'ANOMALY: Too many rows'
ELSE 'Normal'
END AS alert
FROM today t
JOIN avg_rows a ON t.table_name = a.table_name
WHERE t.rows_copied < a.avg_7day * 0.5
OR t.rows_copied > a.avg_7day * 2.0;
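The same thresholds in plain Python, as a sketch for readers who want to run the check outside SQL (for example, in a notebook). row_count_alert is a hypothetical helper, and the history values are invented:

```python
def row_count_alert(today_rows, history, low=0.5, high=2.0):
    """Compare today's rows_copied with the average of prior successful loads."""
    if not history:
        return "No baseline"
    avg = sum(history) / len(history)
    if today_rows < avg * low:
        return "ANOMALY: Too few rows"
    if today_rows > avg * high:
        return "ANOMALY: Too many rows"
    return "Normal"

last_7_days = [10_000, 10_200, 9_900, 10_100, 9_800, 10_000, 10_000]  # avg 10,000
print(row_count_alert(4_000, last_7_days))   # ANOMALY: Too few rows
print(row_count_alert(10_050, last_7_days))  # Normal
print(row_count_alert(25_000, last_7_days))  # ANOMALY: Too many rows
```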
Performance Degradation Alert
-- Tables that took 3x longer than their average
SELECT table_name, copy_duration AS today_seconds,
(SELECT AVG(copy_duration) FROM audit a2
WHERE a2.table_name = a.table_name
AND a2.load_date >= DATEADD(day, -30, GETDATE())
AND a2.status = 'Success') AS avg_30day_seconds
FROM audit a
WHERE CAST(load_date AS DATE) = CAST(GETDATE() AS DATE)
AND status = 'Success'
AND copy_duration > 3 * (
SELECT AVG(copy_duration) FROM audit a2
WHERE a2.table_name = a.table_name
AND a2.load_date >= DATEADD(day, -30, GETDATE())
AND a2.status = 'Success'
);
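The 3x rule is a one-line comparison once the history is in hand. A sketch, assuming copy durations in seconds from the last 30 days of successful loads; is_slow is an illustrative name:

```python
from statistics import mean

def is_slow(today_seconds, last_30_days_seconds, factor=3):
    # Flag a table whose copy took more than `factor` times its 30-day average.
    return bool(last_30_days_seconds) and today_seconds > factor * mean(last_30_days_seconds)

history = [40, 45, 50, 47, 43]  # average 45 seconds, so the threshold is 135
print(is_slow(47, history))   # False
print(is_slow(200, history))  # True
```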
Dashboard options: Connect Power BI directly to the audit table for a visual monitoring dashboard. Or schedule these queries as SQL Agent jobs that send email alerts when anomalies are detected.
Audit Logging in Databricks and Fabric
Audit logging is not just an ADF concept — every platform needs it. Here is how the pattern translates:
Databricks (Delta Table Audit)
import datetime
import time

def log_audit(table_name, rows_read, rows_written, status, duration_seconds, error_msg=None):
    audit_record = [{
        "pipeline_name": "bronze_to_silver",
        "table_name": table_name,
        "rows_read": rows_read,
        "rows_written": rows_written,
        "status": status,
        "duration_seconds": duration_seconds,
        "error_message": error_msg or "NA",
        "load_date": datetime.datetime.now(datetime.timezone.utc).isoformat()
    }]
    audit_df = spark.createDataFrame(audit_record)
    audit_df.write.mode("append").format("delta").saveAsTable("audit.pipeline_log")

# Usage in a notebook
start = time.time()
try:
    df = spark.read.parquet("abfss://raw@lake.dfs.core.windows.net/customers/")
    cleaned = df.filter(df.CustomerID.isNotNull()).dropDuplicates()
    cleaned.write.mode("overwrite").format("delta").saveAsTable("silver.customers")
    log_audit("customers", df.count(), cleaned.count(), "Success",
              int(time.time() - start))
except Exception as e:
    log_audit("customers", 0, 0, "Failed", int(time.time() - start), str(e))
    raise  # re-raise so the job fails visibly
Microsoft Fabric (Notebook Audit)
# Same pattern: Fabric notebooks use the same PySpark API
# Write audit to a Lakehouse Delta table instead of SQL
# (reuses `datetime` and `cleaned` from the Databricks example above)
audit_df = spark.createDataFrame([{
    "table_name": "customers",
    "rows_written": cleaned.count(),
    "status": "Success",
    "load_date": datetime.datetime.now(datetime.timezone.utc).isoformat()
}])
audit_df.write.mode("append").format("delta").saveAsTable("audit_log")
# Query audit from SQL analytics endpoint:
# SELECT * FROM audit_log WHERE status = 'Failed' ORDER BY load_date DESC
The pattern is identical across all platforms: wrap your transformation in try/except (or Success/Failure dependency), capture row counts and timing, write to a persistent audit store. The only difference is the destination — SQL table (ADF/Synapse), Delta table (Databricks/Fabric).
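Because the pattern is the same everywhere, it can be packaged once and reused. A stdlib-only sketch, assuming the audit store is a plain Python list standing in for the Delta or SQL table; audited is a hypothetical helper, not a Databricks or Fabric API:

```python
import time
from contextlib import contextmanager
from datetime import datetime, timezone

@contextmanager
def audited(audit_log, table_name):
    """Append one audit record per block: counts, duration, status, error."""
    record = {"table_name": table_name, "rows_read": 0, "rows_written": 0}
    start = time.monotonic()
    try:
        yield record  # the body fills in rows_read / rows_written
        record["status"], record["error_message"] = "Success", "NA"
    except Exception as exc:
        record["status"], record["error_message"] = "Failed", str(exc)
        raise  # re-raise so the job still fails visibly
    finally:
        record["duration_seconds"] = round(time.monotonic() - start, 3)
        record["load_date"] = datetime.now(timezone.utc).isoformat()
        audit_log.append(record)

log = []
with audited(log, "customers") as rec:
    source = [1, 2, None, 2]
    cleaned = {x for x in source if x is not None}  # drop nulls and duplicates
    rec["rows_read"], rec["rows_written"] = len(source), len(cleaned)

try:
    with audited(log, "orders"):
        raise ValueError("source locked")
except ValueError:
    pass

print([(r["table_name"], r["status"], r["rows_written"]) for r in log])
# [('customers', 'Success', 2), ('orders', 'Failed', 0)]
```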
Common Mistakes
- Completed instead of Succeeded/Failed: both handlers run every time
- Missing failure handler: a Copy failure stops the entire ForEach
- Nested @ in expressions: @concat(@item()...) is wrong
- Wrong parameter types: rowsRead needs Int32, not String
- Not testing the failure path: insert a fake table to verify error logging
- Not indexing the audit table: the table grows with every run (50 tables × 365 days = 18K+ rows per year for a single nightly pipeline, and multiples of that with more pipelines or more frequent schedules), and queries without indexes slow down. Add indexes on (table_name, load_date) and (status, load_date) at minimum.
- Logging only failures: if you only log errors, you cannot answer “how many rows were copied yesterday” or “what was the average duration last month.” Always log both success AND failure for complete visibility.
- Not including pipeline_run_id: without it, you cannot distinguish between a scheduled run and a manual rerun on the same day. Always include @pipeline().RunId to tie audit records to specific ADF runs.
Interview Questions
Q: How do you implement audit logging in ADF? A: Stored Procedure activities after Copy — green arrow for success (logs metrics), red arrow for failure (logs errors). Both call the same stored procedure with different parameters.
Q: What’s the difference between green, red, and blue arrows? A: Green = Succeeded only. Red = Failed only. Blue = Completed always. Use green for success logging, red for failure logging.
Q: Why stored procedure instead of direct SQL? A: Encapsulation, security (EXECUTE permission), reusability, and simpler configuration.
Q: How do you detect data anomalies using audit logs? A: Compare today’s row count against the 7-day or 30-day average for each table. If today’s count is less than 50% or greater than 200% of the average, flag it as an anomaly. Similarly, compare copy duration to detect performance degradation. These queries can run as scheduled alerts.
Q: How do you handle audit logging in Databricks? A: Wrap transformations in try/except. On success, write a record to a Delta audit table with row counts, duration, and status. On failure, write the error message. Query the audit table using SQL or Delta time travel. Same pattern as ADF stored procedures, but using PySpark and Delta Lake instead of SQL Server.
Q: Why log both rows_read and rows_copied separately? A: They should match for most loads. If rows_read is 10,000 but rows_copied is 9,500, it means 500 rows failed during transfer — possibly due to data type mismatches, null constraint violations, or sink schema issues. The discrepancy itself is a data quality signal.
Q: How do you prevent the audit table from growing too large? A: Implement a retention policy. Keep detailed records for 90 days, aggregate older data into a summary table (daily totals by table), and delete or archive detail rows older than 90 days. Use a SQL Agent job or ADF pipeline to automate this cleanup weekly.
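A sketch of that retention job, using sqlite3 as a stand-in for SQL Server; the audit_daily_summary table name and the 90-day cutoff are assumptions taken from the answer above, not a fixed standard:

```python
import sqlite3
from datetime import date, timedelta

def apply_retention(conn, today, keep_days=90):
    """Roll detail rows older than keep_days into daily totals, then delete them."""
    cutoff = (today - timedelta(days=keep_days)).isoformat()
    with conn:  # one transaction: summarize and delete together
        conn.execute("""
            CREATE TABLE IF NOT EXISTS audit_daily_summary (
                run_date TEXT, table_name TEXT, loads INTEGER, total_rows INTEGER)""")
        conn.execute("""
            INSERT INTO audit_daily_summary
            SELECT date(load_date), table_name, COUNT(*), SUM(rows_copied)
            FROM audit WHERE date(load_date) < ?
            GROUP BY date(load_date), table_name""", (cutoff,))
        conn.execute("DELETE FROM audit WHERE date(load_date) < ?", (cutoff,))

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE audit (table_name TEXT, rows_copied INTEGER, load_date TEXT)")
conn.executemany("INSERT INTO audit VALUES (?, ?, ?)", [
    ("Customer", 100, "2024-01-10 02:00:00"),
    ("Customer", 150, "2024-01-10 09:00:00"),  # same-day rerun
    ("Customer", 120, "2024-06-29 02:00:00"),  # recent: kept as detail
])
apply_retention(conn, date(2024, 6, 30))
print(conn.execute("SELECT * FROM audit_daily_summary").fetchall())
# [('2024-01-10', 'Customer', 2, 250)]
print(conn.execute("SELECT COUNT(*) FROM audit").fetchone()[0])  # 1
```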
Wrapping Up
Audit logging transforms your pipeline from a black box into a transparent system. Start simple — table name, rows copied, duration, errors. Grow from there.
Related posts:
- Synapse Pipeline with Audit Logging (full implementation)
- Metadata-Driven Pipeline in ADF
- Common ADF/Synapse Errors
Naveen Vuppula is a Senior Data Engineering Consultant and app developer based in Ontario, Canada. He writes about Python, SQL, AWS, Azure, and everything data engineering at DriveDataScience.com.