Audit Logging in Data Pipelines: Why It Matters and How to Implement It

You built a pipeline that copies 50 tables every night. It runs, it finishes. But then someone asks: “How many rows were copied? Did Customer actually run? Why is Address missing data?”

Without audit logging, you’re digging through the Monitor tab. With it, you query a table and get answers in seconds.

Why You Need Audit Logging

Scenario 1: Data Completeness

SELECT table_name, rows_copied, load_date
FROM audit WHERE table_name = 'Customer'
ORDER BY load_date DESC;

Instantly see yesterday’s run copied 0 rows — source was locked during maintenance.

Scenario 2: Performance Trends

SELECT table_name, AVG(copy_duration) as avg_seconds, MAX(rows_copied) as max_rows
FROM audit WHERE load_date >= DATEADD(month, -1, GETDATE())
GROUP BY table_name ORDER BY avg_seconds DESC;

Spot tables that are slowing down, or that suddenly copy far fewer rows than usual.

Scenario 3: Failure Investigation

SELECT table_name, error_message, load_date
FROM audit WHERE error_message != 'NA'
ORDER BY load_date DESC;

Scenario 4: SLA Compliance

SELECT CAST(load_date AS DATE) as run_date,
       MAX(load_date) as last_completed,
       CASE WHEN MAX(load_date) <= DATEADD(HOUR, 6, CAST(CAST(load_date AS DATE) AS DATETIME))
            THEN 'MET' ELSE 'MISSED' END as sla_status
FROM audit WHERE error_message = 'NA'
GROUP BY CAST(load_date AS DATE) ORDER BY run_date DESC;

Flags every day where the last successful load finished after 6:00 AM. (Note the double CAST: DATEADD with HOUR doesn't accept a plain DATE in T-SQL, so convert back to DATETIME first.)

Designing the Audit Table

CREATE TABLE audit (
    Id              INT IDENTITY(1, 1) PRIMARY KEY,
    rows_read       INT,
    rows_copied     INT,
    copy_duration   INT,
    table_name      VARCHAR(100),
    schema_name     VARCHAR(100),
    error_message   VARCHAR(500),
    load_date       DATETIME
);

Why rows_read and rows_copied are separate: They should match. If they don’t, rows failed during transfer. rows_read=847, rows_copied=800 means 47 rows had issues.
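That check is a single query against the audit table:

```sql
-- Loads where rows were read from the source but not all were written
SELECT table_name, load_date, rows_read, rows_copied,
       rows_read - rows_copied AS rows_lost
FROM audit
WHERE rows_read <> rows_copied
ORDER BY load_date DESC;
```

Run it after every nightly load, or wire it into an alert, and partial transfers stop hiding inside "succeeded" pipeline runs.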

The Stored Procedure

CREATE PROCEDURE sp_insert_audit_log
    @rows_read INT, @rows_copied INT, @copy_duration INT,
    @table_name VARCHAR(100), @schema_name VARCHAR(100),
    @error_message VARCHAR(500)
AS
BEGIN
    INSERT INTO audit (rows_read, rows_copied, copy_duration,
                       table_name, schema_name, error_message, load_date)
    VALUES (@rows_read, @rows_copied, @copy_duration,
            @table_name, @schema_name, @error_message, GETDATE());
END;

Why a stored procedure? Encapsulation (change once, applies everywhere), security (EXECUTE vs INSERT permission), and reusability (multiple pipelines, same procedure).
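The security point made concrete (the login name below is a placeholder for whatever account your linked service connects as):

```sql
-- The pipeline's login can write audit rows only through the procedure.
GRANT EXECUTE ON sp_insert_audit_log TO adf_pipeline_login;
DENY  INSERT  ON audit TO adf_pipeline_login;
```

Assuming both objects share an owner (the default dbo case), ownership chaining lets the procedure insert even though the login itself cannot touch the table directly.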

Implementing in ADF/Synapse

Inside ForEach, add two Stored Procedure activities after Copy:

Copy_TableData
    |-- (Success green arrow) --> Log_Success
    |-- (Failure red arrow)   --> Log_Failure

Log_Success Parameters

Parameter       Type    Value
rows_read       Int32   @activity('Copy_TableData').output.rowsRead
rows_copied     Int32   @activity('Copy_TableData').output.rowsCopied
copy_duration   Int32   @activity('Copy_TableData').output.copyDuration
table_name      String  @item().TableName
schema_name     String  @item().SchemaName
error_message   String  NA

Log_Failure Parameters

Parameter       Type    Value
rows_read       Int32   0
rows_copied     Int32   0
copy_duration   Int32   0
table_name      String  @item().TableName
schema_name     String  @item().SchemaName
error_message   String  @concat('Failed: ', item().SchemaName, '.', item().TableName)

Note: Inside @concat(), don’t use @ on nested functions. It’s item().SchemaName, not @item().SchemaName.
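For orientation, the Log_Success activity configured above corresponds roughly to this fragment of the pipeline JSON. The linked-service name is a placeholder, and only three parameters are shown; the rest follow the same pattern:

```json
{
  "name": "Log_Success",
  "type": "SqlServerStoredProcedure",
  "dependsOn": [
    { "activity": "Copy_TableData", "dependencyConditions": [ "Succeeded" ] }
  ],
  "linkedServiceName": { "referenceName": "AuditSqlDatabase", "type": "LinkedServiceReference" },
  "typeProperties": {
    "storedProcedureName": "[dbo].[sp_insert_audit_log]",
    "storedProcedureParameters": {
      "rows_read": {
        "value": { "value": "@activity('Copy_TableData').output.rowsRead", "type": "Expression" },
        "type": "Int32"
      },
      "table_name": {
        "value": { "value": "@item().TableName", "type": "Expression" },
        "type": "String"
      },
      "error_message": { "value": "NA", "type": "String" }
    }
  }
}
```

You rarely write this by hand — the authoring UI generates it — but it's what ends up in source control, so it's worth recognizing in a pull request.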

Dependency Conditions

Condition   Arrow   When Child Runs
Succeeded   Green   Only on success
Failed      Red     Only on failure
Completed   Blue    Always
Skipped     Gray    Only when skipped

The @activity() Output Properties

@activity('Copy').output.rowsRead
@activity('Copy').output.rowsCopied
@activity('Copy').output.copyDuration
@activity('Copy').output.throughput
@activity('Copy').output.dataRead
@activity('Copy').output.dataWritten
@activity('Copy').output.errors[0].Message
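These combine naturally on the failure path: instead of the static string used in Log_Failure earlier, one option (remembering the nested-@ rule) is to pass the actual copy error into error_message:

```
@concat(item().SchemaName, '.', item().TableName, ': ',
        activity('Copy_TableData').output.errors[0].Message)
```

If messages can exceed 500 characters, truncate with substring() before inserting, or widen the error_message column.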

Adding Pipeline Run ID

ALTER TABLE audit ADD pipeline_run_id VARCHAR(100);

Pass from pipeline: @pipeline().RunId

Links every audit row to a specific pipeline run.
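The stored procedure needs a matching parameter. Making it optional keeps existing callers working — a sketch, assuming the table and procedure defined above:

```sql
ALTER PROCEDURE sp_insert_audit_log
    @rows_read INT, @rows_copied INT, @copy_duration INT,
    @table_name VARCHAR(100), @schema_name VARCHAR(100),
    @error_message VARCHAR(500),
    @pipeline_run_id VARCHAR(100) = NULL   -- optional: old callers keep working
AS
BEGIN
    INSERT INTO audit (rows_read, rows_copied, copy_duration,
                       table_name, schema_name, error_message,
                       load_date, pipeline_run_id)
    VALUES (@rows_read, @rows_copied, @copy_duration,
            @table_name, @schema_name, @error_message,
            GETDATE(), @pipeline_run_id);
END;
```

Every query in this post can then be filtered to a single execution by adding WHERE pipeline_run_id = '<run id>'.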

Common Mistakes

  1. Completed instead of Succeeded/Failed — both handlers run every time
  2. Missing failure handler — Copy failure stops entire ForEach
  3. Nested @ in expressions: @concat(@item()...) is wrong
  4. Wrong parameter types — rowsRead needs Int32, not String
  5. Not testing failure path — insert a fake table to verify error logging

Interview Questions

Q: How do you implement audit logging in ADF? A: Stored Procedure activities after Copy — green arrow for success (logs metrics), red arrow for failure (logs errors). Both call the same stored procedure with different parameters.

Q: What’s the difference between green, red, and blue arrows? A: Green = Succeeded only. Red = Failed only. Blue = Completed always. Use green for success logging, red for failure logging.

Q: Why stored procedure instead of direct SQL? A: Encapsulation, security (EXECUTE permission), reusability, and simpler configuration.

Wrapping Up

Audit logging transforms your pipeline from a black box into a transparent system. Start simple — table name, rows copied, duration, errors. Grow from there.

Related posts:
  - Synapse Pipeline with Audit Logging (full implementation)
  - Metadata-Driven Pipeline in ADF
  - Common ADF/Synapse Errors


Naveen Vuppula is a Senior Data Engineering Consultant and app developer based in Ontario, Canada. He writes about Python, SQL, AWS, Azure, and everything data engineering at DriveDataScience.com.
