Audit Logging in Data Pipelines: Why It Matters and How to Implement It
You built a pipeline that copies 50 tables every night. It runs, it finishes. But then someone asks: “How many rows were copied? Did Customer actually run? Why is Address missing data?”
Without audit logging, you’re digging through the Monitor tab. With it, you query a table and get answers in seconds.
Why You Need Audit Logging
Scenario 1: Data Completeness
```sql
SELECT table_name, rows_copied, load_date
FROM audit
WHERE table_name = 'Customer'
ORDER BY load_date DESC;
```
You instantly see that yesterday's run copied 0 rows: the source was locked during maintenance.
Scenario 2: Performance Trending
```sql
SELECT table_name,
       AVG(copy_duration) AS avg_seconds,
       MAX(rows_copied)   AS max_rows
FROM audit
WHERE load_date >= DATEADD(MONTH, -1, GETDATE())
GROUP BY table_name
ORDER BY avg_seconds DESC;
```
Scenario 3: Failure Investigation
```sql
SELECT table_name, error_message, load_date
FROM audit
WHERE error_message != 'NA'
ORDER BY load_date DESC;
```
Scenario 4: SLA Compliance
```sql
SELECT CAST(load_date AS DATE) AS run_date,
       MAX(load_date) AS last_completed,
       CASE WHEN MAX(load_date) <= DATEADD(HOUR, 6, CAST(load_date AS DATE))
            THEN 'MET' ELSE 'MISSED' END AS sla_status
FROM audit
WHERE error_message = 'NA'
GROUP BY CAST(load_date AS DATE)
ORDER BY run_date DESC;
```
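Queries like these are easy to smoke-test locally before pointing them at production. A minimal sketch using Python's sqlite3 module (the sample rows and dates are made up; DATEADD/GETDATE are SQL Server functions, so the date-based SLA query is omitted here):

```python
import sqlite3

# In-memory stand-in for the audit table (schema mirrors the design below;
# SQLite has no DATETIME type, so load_date is stored as an ISO-8601 string).
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE audit (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        rows_read INT, rows_copied INT, copy_duration INT,
        table_name TEXT, schema_name TEXT,
        error_message TEXT, load_date TEXT
    )
""")
rows = [
    (120, 120, 14, "Customer", "dbo", "NA", "2024-05-01 02:10:00"),
    (0,   0,   1,  "Customer", "dbo", "NA", "2024-05-02 02:10:00"),  # 0 rows: suspicious
    (300, 300, 22, "Address",  "dbo", "Failed: dbo.Address", "2024-05-02 02:11:00"),
]
conn.executemany(
    "INSERT INTO audit (rows_read, rows_copied, copy_duration, table_name, "
    "schema_name, error_message, load_date) VALUES (?, ?, ?, ?, ?, ?, ?)", rows)

# Scenario 1: data completeness for one table, newest run first
completeness = conn.execute("""
    SELECT table_name, rows_copied, load_date
    FROM audit WHERE table_name = 'Customer'
    ORDER BY load_date DESC
""").fetchall()
print(completeness[0])  # latest Customer run copied 0 rows

# Scenario 3: failure investigation
failures = conn.execute(
    "SELECT table_name, error_message FROM audit WHERE error_message != 'NA'"
).fetchall()
print(failures)
```

ISO-8601 timestamps sort correctly as strings, which is why `ORDER BY load_date DESC` still works without a real DATETIME type.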
Designing the Audit Table
```sql
CREATE TABLE audit (
    Id INT IDENTITY(1, 1) PRIMARY KEY,
    rows_read INT,
    rows_copied INT,
    copy_duration INT,
    table_name VARCHAR(100),
    schema_name VARCHAR(100),
    error_message VARCHAR(500),
    load_date DATETIME
);
```
Why rows_read and rows_copied are separate: They should match. If they don’t, rows failed during transfer. rows_read=847, rows_copied=800 means 47 rows had issues.
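A quick way to surface those transfer problems is a mismatch query over the audit table. A sketch against a local SQLite copy (the 847/800 figures are the illustrative ones from above):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE audit (
    table_name TEXT, rows_read INT, rows_copied INT, load_date TEXT)""")
conn.executemany(
    "INSERT INTO audit VALUES (?, ?, ?, ?)",
    [("Customer", 847, 800, "2024-05-02"),   # 47 rows lost in transfer
     ("Address",  300, 300, "2024-05-02")])  # clean copy

# Flag any run where read and copied counts diverge
mismatches = conn.execute("""
    SELECT table_name, rows_read - rows_copied AS rows_lost, load_date
    FROM audit WHERE rows_read != rows_copied
""").fetchall()
print(mismatches)  # [('Customer', 47, '2024-05-02')]
```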
The Stored Procedure
```sql
CREATE PROCEDURE sp_insert_audit_log
    @rows_read INT, @rows_copied INT, @copy_duration INT,
    @table_name VARCHAR(100), @schema_name VARCHAR(100),
    @error_message VARCHAR(500)
AS
BEGIN
    INSERT INTO audit (rows_read, rows_copied, copy_duration,
                       table_name, schema_name, error_message, load_date)
    VALUES (@rows_read, @rows_copied, @copy_duration,
            @table_name, @schema_name, @error_message, GETDATE());
END;
```
Why a stored procedure? Encapsulation (change once, applies everywhere), security (EXECUTE vs INSERT permission), and reusability (multiple pipelines, same procedure).
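The same encapsulation idea carries over to any client code that writes audit rows. A sketch of a thin Python wrapper (the function name and the sqlite3 stand-in are illustrative; against SQL Server you would call the stored procedure through a driver such as pyodbc instead):

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE audit (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    rows_read INT, rows_copied INT, copy_duration INT,
    table_name TEXT, schema_name TEXT, error_message TEXT, load_date TEXT)""")

def insert_audit_log(rows_read, rows_copied, copy_duration,
                     table_name, schema_name, error_message="NA"):
    """Single choke point for audit writes, like sp_insert_audit_log:
    callers never touch the table directly, so the schema can evolve in
    one place. load_date is stamped here, mirroring GETDATE() in the proc."""
    conn.execute(
        "INSERT INTO audit (rows_read, rows_copied, copy_duration, "
        "table_name, schema_name, error_message, load_date) "
        "VALUES (?, ?, ?, ?, ?, ?, ?)",
        (rows_read, rows_copied, copy_duration, table_name,
         schema_name, error_message,
         datetime.now(timezone.utc).isoformat()))

insert_audit_log(847, 800, 12, "Customer", "dbo")
logged = conn.execute(
    "SELECT table_name, rows_copied, error_message FROM audit").fetchone()
print(logged)  # ('Customer', 800, 'NA')
```

Defaulting `error_message` to 'NA' mirrors the success path in the pipeline: successful runs pass no error, failed runs pass a message.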
Implementing in ADF/Synapse
Inside ForEach, add two Stored Procedure activities after Copy:
```
Copy_TableData
|-- (Success green arrow) --> Log_Success
|-- (Failure red arrow)   --> Log_Failure
```
Log_Success Parameters
| Parameter | Type | Value |
|---|---|---|
| rows_read | Int32 | @activity('Copy_TableData').output.rowsRead |
| rows_copied | Int32 | @activity('Copy_TableData').output.rowsCopied |
| copy_duration | Int32 | @activity('Copy_TableData').output.copyDuration |
| table_name | String | @item().TableName |
| schema_name | String | @item().SchemaName |
| error_message | String | NA |
Log_Failure Parameters
| Parameter | Type | Value |
|---|---|---|
| rows_read | Int32 | 0 |
| rows_copied | Int32 | 0 |
| copy_duration | Int32 | 0 |
| table_name | String | @item().TableName |
| schema_name | String | @item().SchemaName |
| error_message | String | @concat('Failed: ', item().SchemaName, '.', item().TableName) |
Note: Inside @concat(), don’t use @ on nested functions. It’s item().SchemaName, not @item().SchemaName.
Dependency Conditions
| Condition | Arrow | When Child Runs |
|---|---|---|
| Succeeded | Green | Only on success |
| Failed | Red | Only on failure |
| Completed | Blue | Always |
| Skipped | Gray | Only when skipped |
The @activity() Output Properties
```
@activity('Copy').output.rowsRead
@activity('Copy').output.rowsCopied
@activity('Copy').output.copyDuration
@activity('Copy').output.throughput
@activity('Copy').output.dataRead
@activity('Copy').output.dataWritten
@activity('Copy').output.errors[0].Message
```
Adding Pipeline Run ID
```sql
ALTER TABLE audit ADD pipeline_run_id VARCHAR(100);
```
Pass it from the pipeline as @pipeline().RunId (the stored procedure needs a matching @pipeline_run_id parameter). This links every audit row to a specific pipeline run.
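With the run ID in place, one query reconstructs everything a single execution did. A SQLite sketch (the run IDs are made up; real values would be ADF's RunId GUIDs):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE audit (
    table_name TEXT, rows_copied INT, error_message TEXT,
    pipeline_run_id TEXT)""")
conn.executemany("INSERT INTO audit VALUES (?, ?, ?, ?)", [
    ("Customer", 847, "NA", "run-001"),
    ("Address",  0,   "Failed: dbo.Address", "run-001"),
    ("Customer", 850, "NA", "run-002"),  # a later, unrelated run
])

# Everything a single pipeline execution did, in one query
run = conn.execute("""
    SELECT table_name, rows_copied, error_message
    FROM audit WHERE pipeline_run_id = 'run-001'
    ORDER BY table_name
""").fetchall()
print(run)
```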
Common Mistakes
- Completed instead of Succeeded/Failed — both handlers run every time
- Missing failure handler — Copy failure stops entire ForEach
- Nested @ in expressions — @concat(@item()...) is wrong
- Wrong parameter types — rowsRead needs Int32, not String
- Not testing failure path — insert a fake table to verify error logging
Interview Questions
Q: How do you implement audit logging in ADF? A: Stored Procedure activities after Copy — green arrow for success (logs metrics), red arrow for failure (logs errors). Both call the same stored procedure with different parameters.
Q: What’s the difference between green, red, and blue arrows? A: Green = Succeeded only. Red = Failed only. Blue = Completed always. Use green for success logging, red for failure logging.
Q: Why stored procedure instead of direct SQL? A: Encapsulation, security (EXECUTE permission), reusability, and simpler configuration.
Wrapping Up
Audit logging transforms your pipeline from a black box into a transparent system. Start simple — table name, rows copied, duration, errors. Grow from there.
Related posts:
- Synapse Pipeline with Audit Logging (full implementation)
- Metadata-Driven Pipeline in ADF
- Common ADF/Synapse Errors
Naveen Vuppula is a Senior Data Engineering Consultant and app developer based in Ontario, Canada. He writes about Python, SQL, AWS, Azure, and everything data engineering at DriveDataScience.com.