Audit Logging in Data Pipelines: Why It Matters and How to Implement It

You built a pipeline that copies 50 tables every night. It runs, it finishes. But then someone asks: “How many rows were copied? Did Customer actually run? Why is Address missing data?”

Without audit logging, you are digging through the ADF Monitor tab — clicking into each pipeline run, expanding each ForEach iteration, and hoping the run history has not expired (ADF keeps only 45 days by default). With audit logging, you query a SQL table and get answers in seconds — for any date, any table, any pipeline run.

Think of audit logging like the black box in an airplane. The plane (pipeline) can fly perfectly fine without it. But when something goes wrong at 3 AM and the business asks “what happened to yesterday’s data,” the black box has every detail: what was copied, how long it took, what failed, and exactly when. Without it, you are guessing.

Table of Contents

  • Why You Need Audit Logging
  • Designing the Audit Table
  • The Stored Procedure
  • Implementing in ADF/Synapse
  • Dependency Conditions
  • The @activity() Output Properties
  • Adding Pipeline Run ID
  • Building an Audit Dashboard
  • Audit Logging in Databricks and Fabric
  • Common Mistakes
  • Interview Questions
  • Wrapping Up

Why You Need Audit Logging

Scenario 1: Data Completeness

SELECT table_name, rows_copied, load_date
FROM audit WHERE table_name = 'Customer'
ORDER BY load_date DESC;

Instantly see yesterday’s run copied 0 rows — source was locked during maintenance.

Scenario 2: Performance Trends

SELECT table_name, AVG(copy_duration) as avg_seconds, MAX(rows_copied) as max_rows
FROM audit WHERE load_date >= DATEADD(month, -1, GETDATE())
GROUP BY table_name ORDER BY avg_seconds DESC;

Scenario 3: Failure Investigation

SELECT table_name, error_message, load_date
FROM audit WHERE error_message != 'NA'
ORDER BY load_date DESC;

Scenario 4: SLA Compliance

SELECT CAST(load_date AS DATE) AS run_date,
       MAX(load_date) AS last_completed,
       -- DATEADD(HOUR, ...) is not supported on the DATE type, so cast back to DATETIME first
       CASE WHEN MAX(load_date) <= DATEADD(HOUR, 6, CAST(CAST(load_date AS DATE) AS DATETIME))
            THEN 'MET' ELSE 'MISSED' END AS sla_status
FROM audit WHERE error_message = 'NA'
GROUP BY CAST(load_date AS DATE) ORDER BY run_date DESC;
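
The same SLA rule can be sketched outside SQL. This is a minimal Python illustration, assuming a 06:00 deadline on the run date (the six-hour cutoff used in the query). The helper name `sla_status` and the sample timestamps are hypothetical.

```python
from datetime import datetime, time

def sla_status(load_times, deadline=time(6, 0)):
    """Return 'MET' if the latest successful load finished by the deadline.

    load_times: datetimes of successful loads that all fall on one run date.
    deadline:   assumed cutoff (06:00), mirroring DATEADD(HOUR, 6, ...) in the query.
    """
    last_completed = max(load_times)
    cutoff = datetime.combine(last_completed.date(), deadline)
    return "MET" if last_completed <= cutoff else "MISSED"

# Illustrative timestamps only
on_time = [datetime(2024, 1, 15, 2, 10), datetime(2024, 1, 15, 5, 45)]
late = [datetime(2024, 1, 15, 2, 10), datetime(2024, 1, 15, 6, 30)]
print(sla_status(on_time))  # MET
print(sla_status(late))     # MISSED
```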

Designing the Audit Table

CREATE TABLE audit (
    Id              INT IDENTITY(1, 1) PRIMARY KEY,
    rows_read       INT,
    rows_copied     INT,
    copy_duration   INT,
    table_name      VARCHAR(100),
    schema_name     VARCHAR(100),
    error_message   VARCHAR(500),
    load_date       DATETIME
);

Why rows_read and rows_copied are separate: They should match. If they don’t, rows failed during transfer. rows_read=847, rows_copied=800 means 47 rows had issues.
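
As a rough sketch of how that comparison might be automated, the Python snippet below scans audit records and reports any table where the two counts differ. The function name `find_mismatches` and the sample records are illustrative assumptions; only the 847/800 figures come from the example above.

```python
def find_mismatches(audit_rows):
    """Return (table_name, missing_rows) for every record where the counts differ."""
    return [
        (row["table_name"], row["rows_read"] - row["rows_copied"])
        for row in audit_rows
        if row["rows_read"] != row["rows_copied"]
    ]

# Sample records; the Address row is made up to show a clean copy
sample = [
    {"table_name": "Customer", "rows_read": 847, "rows_copied": 800},
    {"table_name": "Address", "rows_read": 120, "rows_copied": 120},
]
print(find_mismatches(sample))  # [('Customer', 47)]
```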

Enhanced audit table for production:

CREATE TABLE audit (
    Id                INT IDENTITY(1, 1) PRIMARY KEY,
    pipeline_name     VARCHAR(200),        -- which pipeline ran
    pipeline_run_id   VARCHAR(100),        -- unique run identifier
    table_name        VARCHAR(100),        -- which table was copied
    schema_name       VARCHAR(100),        -- source schema
    rows_read         INT,                 -- rows read from source
    rows_copied       INT,                 -- rows written to sink
    rows_skipped      INT DEFAULT 0,       -- rows that failed validation
    copy_duration     INT,                 -- seconds taken
    data_read_mb      DECIMAL(10,2),       -- data volume read (MB)
    data_written_mb   DECIMAL(10,2),       -- data volume written (MB)
    status            VARCHAR(20),         -- 'Success' or 'Failed'
    error_message     VARCHAR(MAX),        -- error details (NULL if success)
    source_type       VARCHAR(50),         -- 'AzureSQL', 'Oracle', 'API'
    sink_type         VARCHAR(50),         -- 'ADLS_Parquet', 'Delta', 'SQL'
    load_type         VARCHAR(20),         -- 'Full' or 'Incremental'
    watermark_value   VARCHAR(100),        -- last watermark for incremental
    load_date         DATETIME DEFAULT GETDATE(),
    
    INDEX IX_audit_table_date NONCLUSTERED (table_name, load_date DESC),
    INDEX IX_audit_status     NONCLUSTERED (status, load_date DESC),
    INDEX IX_audit_run_id     NONCLUSTERED (pipeline_run_id)
);

Why the extra columns matter:

| Column | Why You Need It | Example Query |
|---|---|---|
| pipeline_run_id | Links every audit row to a specific ADF run; cross-reference with the Monitor tab | WHERE pipeline_run_id = 'abc-123-def' |
| data_read_mb / data_written_mb | Track data volume trends; catch sudden spikes or drops | WHERE data_read_mb > 1000 (flag large loads) |
| load_type | Distinguish full vs incremental loads, which have different expectations for row counts | WHERE load_type = 'Full' AND rows_copied = 0 |
| watermark_value | Debug incremental loads: "what was the watermark for this run?" | WHERE table_name = 'Orders' ORDER BY load_date DESC |
| status | Simple filter instead of checking error_message for NULL or 'NA' | WHERE status = 'Failed' AND load_date >= DATEADD(day, -7, GETDATE()) |

Start simple, grow later. The basic 8-column table from above is perfectly fine for your first pipeline. Add columns as your needs grow. The enhanced table is what production systems evolve into over time.

The Stored Procedure

CREATE PROCEDURE sp_insert_audit_log
    @rows_read INT, @rows_copied INT, @copy_duration INT,
    @table_name VARCHAR(100), @schema_name VARCHAR(100),
    @error_message VARCHAR(500)
AS
BEGIN
    INSERT INTO audit (rows_read, rows_copied, copy_duration,
                       table_name, schema_name, error_message, load_date)
    VALUES (@rows_read, @rows_copied, @copy_duration,
            @table_name, @schema_name, @error_message, GETDATE());
END;

Why a stored procedure? Encapsulation (change once, applies everywhere), security (EXECUTE vs INSERT permission), and reusability (multiple pipelines, same procedure).

Implementing in ADF/Synapse

Inside ForEach, add two Stored Procedure activities after Copy:

Copy_TableData
    |-- (Success green arrow) --> Log_Success
    |-- (Failure red arrow)   --> Log_Failure

Log_Success Parameters

| Parameter | Type | Value |
|---|---|---|
| rows_read | Int32 | @activity('Copy_TableData').output.rowsRead |
| rows_copied | Int32 | @activity('Copy_TableData').output.rowsCopied |
| copy_duration | Int32 | @activity('Copy_TableData').output.copyDuration |
| table_name | String | @item().TableName |
| schema_name | String | @item().SchemaName |
| error_message | String | NA |

Log_Failure Parameters

| Parameter | Type | Value |
|---|---|---|
| rows_read | Int32 | 0 |
| rows_copied | Int32 | 0 |
| copy_duration | Int32 | 0 |
| table_name | String | @item().TableName |
| schema_name | String | @item().SchemaName |
| error_message | String | @concat('Failed: ', item().SchemaName, '.', item().TableName) |

Note: Inside @concat(), don’t use @ on nested functions. It’s item().SchemaName, not @item().SchemaName.
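
To make the two parameter sets easier to compare, here is a small Python sketch that builds the same values for the success and failure paths. It only models the mapping and is not ADF code. The function name `build_audit_params`, the dictionary shapes, and the `SalesLT` schema are assumptions for illustration.

```python
def build_audit_params(item, copy_output=None):
    """Build stored-procedure parameters for one ForEach item.

    item:        dict with SchemaName and TableName (stands in for @item()).
    copy_output: dict shaped like the Copy activity output on success,
                 or None when the Copy activity failed.
    """
    params = {
        "table_name": item["TableName"],
        "schema_name": item["SchemaName"],
    }
    if copy_output is not None:
        # Success path: forward the metrics and use the 'NA' sentinel.
        params.update(
            rows_read=int(copy_output["rowsRead"]),
            rows_copied=int(copy_output["rowsCopied"]),
            copy_duration=int(copy_output["copyDuration"]),
            error_message="NA",
        )
    else:
        # Failure path: zeroed metrics plus a message naming the table.
        params.update(
            rows_read=0,
            rows_copied=0,
            copy_duration=0,
            error_message=f"Failed: {item['SchemaName']}.{item['TableName']}",
        )
    return params

item = {"SchemaName": "SalesLT", "TableName": "Customer"}  # hypothetical item
ok = build_audit_params(item, {"rowsRead": 847, "rowsCopied": 847, "copyDuration": 12})
bad = build_audit_params(item)
print(ok["error_message"])   # NA
print(bad["error_message"])  # Failed: SalesLT.Customer
```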

Dependency Conditions

| Condition | Arrow | When Child Runs |
|---|---|---|
| Succeeded | Green | Only on success |
| Failed | Red | Only on failure |
| Completed | Blue | Always (success or failure) |
| Skipped | Gray | Only when skipped |
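
The table can be read as a small lookup. The Python sketch below models which handlers fire for a given upstream outcome. It is a simplified model of the table, not of ADF's scheduler, and the names `FIRES` and `handlers_that_run` are made up for this example.

```python
# Which dependency conditions fire for each upstream outcome (per the table).
FIRES = {
    "Succeeded": {"Succeeded", "Completed"},
    "Failed": {"Failed", "Completed"},
    "Skipped": {"Skipped"},
}

def handlers_that_run(upstream_status, handlers):
    """handlers maps a child activity name to its dependency condition."""
    fired = FIRES[upstream_status]
    return sorted(name for name, cond in handlers.items() if cond in fired)

correct = {"Log_Success": "Succeeded", "Log_Failure": "Failed"}
mistaken = {"Log_Success": "Completed", "Log_Failure": "Completed"}

print(handlers_that_run("Succeeded", correct))   # ['Log_Success']
print(handlers_that_run("Failed", correct))      # ['Log_Failure']
print(handlers_that_run("Succeeded", mistaken))  # ['Log_Failure', 'Log_Success']
```

The last line shows why wiring both handlers with Completed is a problem: both log activities run on every iteration, so each copy produces a success row and a failure row.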

The @activity() Output Properties

@activity('Copy').output.rowsRead
@activity('Copy').output.rowsCopied
@activity('Copy').output.copyDuration
@activity('Copy').output.throughput
@activity('Copy').output.dataRead
@activity('Copy').output.dataWritten
@activity('Copy').output.errors[0].Message

What each property tells you:

| Property | Type | What It Means | Example Value |
|---|---|---|---|
| rowsRead | Int | Total rows read from source | 84,752 |
| rowsCopied | Int | Total rows written to sink | 84,752 |
| copyDuration | Int | Duration in seconds | 47 |
| throughput | Float | Speed in KBps | 2,048.5 |
| dataRead | Long | Bytes read from source | 12,582,912 |
| dataWritten | Long | Bytes written to sink (after compression) | 3,145,728 |
| errors[0].Message | String | First error message (if any) | "Invalid column name 'Addr'" |

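Data volume trick: Convert bytes to MB before writing the audit record so the values are readable. The expressions below do this in the pipeline. Note that div() on two integers returns an integer, so loads under 1 MB are logged as 0; if you want the two decimals that DECIMAL(10,2) allows, pass the raw byte counts and divide by 1048576.0 inside the stored procedure instead.

data_read_mb:    @div(activity('Copy_TableData').output.dataRead, 1048576)
data_written_mb: @div(activity('Copy_TableData').output.dataWritten, 1048576)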
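
For comparison, here is a minimal Python sketch of the same bytes-to-MB conversion, rounded to two decimals to match a DECIMAL(10,2) column. The helper name `bytes_to_mb` is an assumption, the first two inputs are the example values from the property table, and the third is an illustrative value chosen to show a fractional result.

```python
from decimal import Decimal, ROUND_HALF_UP

BYTES_PER_MB = 1048576  # 1024 * 1024, the divisor used in the expressions

def bytes_to_mb(num_bytes):
    """Convert a byte count to MB, rounded to two decimals."""
    mb = Decimal(num_bytes) / Decimal(BYTES_PER_MB)
    return mb.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)

print(bytes_to_mb(12582912))    # 12.00
print(bytes_to_mb(3145728))     # 3.00
print(bytes_to_mb(1572864))     # 1.50
print(1572864 // BYTES_PER_MB)  # 1 (integer division drops the fraction)
```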

Useful pipeline-level expressions for audit:

@pipeline().RunId                  ← unique ID for this pipeline run
@pipeline().Pipeline               ← pipeline name (e.g., "PL_Copy_SqlToADLS")
@pipeline().TriggerName            ← which trigger started this run
@pipeline().TriggerTime            ← when the trigger fired
@utcnow()                          ← current UTC timestamp
@pipeline().parameters.ParamName   ← any pipeline parameter value

Adding Pipeline Run ID

ALTER TABLE audit ADD pipeline_run_id VARCHAR(100);

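Pass it from the pipeline as @pipeline().RunId, and add a matching @pipeline_run_id parameter to the stored procedure so the value reaches the INSERT.

This links every audit row to a specific pipeline run.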
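
A short Python sketch of what the run ID buys you: with it, rows from a scheduled run and a manual rerun on the same day can be told apart and totalled separately. The function name `summarize_runs`, the run ID strings, and the row counts are illustrative assumptions.

```python
from collections import defaultdict

def summarize_runs(audit_rows):
    """Group audit rows by pipeline_run_id and total the rows copied per run."""
    totals = defaultdict(int)
    for row in audit_rows:
        totals[row["pipeline_run_id"]] += row["rows_copied"]
    return dict(totals)

# Hypothetical rows from two runs on the same day
rows = [
    {"pipeline_run_id": "run-scheduled", "table_name": "Customer", "rows_copied": 0},
    {"pipeline_run_id": "run-scheduled", "table_name": "Address", "rows_copied": 450},
    {"pipeline_run_id": "run-manual", "table_name": "Customer", "rows_copied": 847},
]
print(summarize_runs(rows))  # {'run-scheduled': 450, 'run-manual': 847}
```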

Building an Audit Dashboard

The audit table becomes powerful when you build queries and dashboards on top of it. The queries below assume the enhanced table, since they filter on the status column:

Daily Load Summary

-- How did last night's load go?
SELECT 
    CAST(load_date AS DATE) AS run_date,
    COUNT(*) AS total_tables,
    SUM(CASE WHEN status = 'Success' THEN 1 ELSE 0 END) AS succeeded,
    SUM(CASE WHEN status = 'Failed' THEN 1 ELSE 0 END) AS failed,
    SUM(rows_copied) AS total_rows,
    MAX(load_date) AS last_completed,
    SUM(copy_duration) AS total_seconds
FROM audit
WHERE CAST(load_date AS DATE) = CAST(GETDATE() AS DATE)
GROUP BY CAST(load_date AS DATE);

Missing Tables Detection

-- Which tables did NOT run today? (compares against metadata)
SELECT m.TableName
FROM dbo.metadata m
LEFT JOIN audit a 
    ON m.TableName = a.table_name 
    AND CAST(a.load_date AS DATE) = CAST(GETDATE() AS DATE)
WHERE a.table_name IS NULL;

Row Count Anomaly Detection

-- Flag tables where today's row count is <50% or >200% of the 7-day average
WITH avg_rows AS (
    SELECT table_name, AVG(CAST(rows_copied AS FLOAT)) AS avg_7day
    FROM audit
    WHERE status = 'Success'
      AND load_date >= DATEADD(day, -7, GETDATE())
    GROUP BY table_name
),
today AS (
    SELECT table_name, rows_copied
    FROM audit
    WHERE status = 'Success'
      AND CAST(load_date AS DATE) = CAST(GETDATE() AS DATE)
)
SELECT t.table_name, t.rows_copied AS today_rows, 
       CAST(a.avg_7day AS INT) AS avg_7day_rows,
       CASE 
           WHEN t.rows_copied < a.avg_7day * 0.5 THEN 'ANOMALY: Too few rows'
           WHEN t.rows_copied > a.avg_7day * 2.0 THEN 'ANOMALY: Too many rows'
           ELSE 'Normal'
       END AS alert
FROM today t
JOIN avg_rows a ON t.table_name = a.table_name
WHERE t.rows_copied < a.avg_7day * 0.5 
   OR t.rows_copied > a.avg_7day * 2.0;
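
The threshold logic in that query can be expressed compactly in Python. This is only a sketch of the classification step, assuming the history list holds row counts from recent successful runs. The function name `classify_row_count` and the sample numbers are hypothetical.

```python
from statistics import mean

def classify_row_count(today_rows, history, low=0.5, high=2.0):
    """Compare today's count with the mean of recent successful loads.

    history:  row counts from recent successful runs (for example, 7 days).
    low/high: the 50% and 200% thresholds used in the query.
    """
    avg = mean(history)
    if today_rows < avg * low:
        return "ANOMALY: Too few rows"
    if today_rows > avg * high:
        return "ANOMALY: Too many rows"
    return "Normal"

history = [1000, 1100, 900, 1000, 1050, 950, 1000]  # mean is 1000
print(classify_row_count(400, history))   # ANOMALY: Too few rows
print(classify_row_count(1020, history))  # Normal
print(classify_row_count(2500, history))  # ANOMALY: Too many rows
```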

Performance Degradation Alert

-- Tables that took 3x longer than their average
SELECT table_name, copy_duration AS today_seconds,
       (SELECT AVG(copy_duration) FROM audit a2 
        WHERE a2.table_name = a.table_name 
          AND a2.load_date >= DATEADD(day, -30, GETDATE())
          AND a2.status = 'Success') AS avg_30day_seconds
FROM audit a
WHERE CAST(load_date AS DATE) = CAST(GETDATE() AS DATE)
  AND status = 'Success'
  AND copy_duration > 3 * (
      SELECT AVG(copy_duration) FROM audit a2 
      WHERE a2.table_name = a.table_name 
        AND a2.load_date >= DATEADD(day, -30, GETDATE())
        AND a2.status = 'Success'
  );

Dashboard options: Connect Power BI directly to the audit table for a visual monitoring dashboard. Or schedule these queries as SQL Agent jobs that send email alerts when anomalies are detected.

Audit Logging in Databricks and Fabric

Audit logging is not just an ADF concept — every platform needs it. Here is how the pattern translates:

Databricks (Delta Table Audit)

from datetime import datetime, timezone

def log_audit(table_name, rows_read, rows_written, status, error_msg=None):
    """Append one audit record to the Delta audit table."""
    audit_record = [{
        "pipeline_name": "bronze_to_silver",
        "table_name": table_name,
        "rows_read": rows_read,
        "rows_written": rows_written,
        "status": status,
        "error_message": error_msg or "NA",
        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated since Python 3.12
        "load_date": datetime.now(timezone.utc).isoformat()
    }]
    df = spark.createDataFrame(audit_record)  # `spark` is the notebook's SparkSession
    df.write.mode("append").format("delta").saveAsTable("audit.pipeline_log")

# Usage in a notebook
try:
    df = spark.read.parquet("abfss://raw@lake.dfs.core.windows.net/customers/")
    cleaned = df.filter(df.CustomerID.isNotNull()).dropDuplicates()
    cleaned.write.mode("overwrite").format("delta").saveAsTable("silver.customers")
    # Each count() runs a Spark job, so call them once, after the write has succeeded
    log_audit("customers", df.count(), cleaned.count(), "Success")
except Exception as e:
    log_audit("customers", 0, 0, "Failed", str(e))
    raise  # re-raise so the job fails visibly

Microsoft Fabric (Notebook Audit)

# Same pattern: Fabric notebooks use the same PySpark API
# Write audit to a Lakehouse Delta table instead of SQL
from datetime import datetime, timezone

# `cleaned` is the transformed DataFrame produced by the step being audited
audit_df = spark.createDataFrame([{
    "table_name": "customers",
    "rows_written": cleaned.count(),
    "status": "Success",
    "load_date": datetime.now(timezone.utc).isoformat()
}])
audit_df.write.mode("append").format("delta").saveAsTable("audit_log")

# Query audit from SQL analytics endpoint:
# SELECT * FROM audit_log WHERE status = 'Failed' ORDER BY load_date DESC

The pattern is identical across all platforms: wrap your transformation in try/except (or Success/Failure dependency), capture row counts and timing, write to a persistent audit store. The only difference is the destination — SQL table (ADF/Synapse), Delta table (Databricks/Fabric).
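
As a platform-neutral sketch of that pattern, the Python wrapper below runs a step, times it, and appends one audit record whether the step succeeds or fails. The names `run_with_audit` and `sink` are assumptions. Here the sink is a plain list; in practice it would be whatever writes to your SQL or Delta audit store.

```python
import time
from datetime import datetime, timezone

def run_with_audit(table_name, step, sink):
    """Run step(), then append one audit record to sink on success or failure.

    step: callable returning (rows_read, rows_written).
    sink: any object with append(); stands in for a SQL or Delta writer.
    """
    started = time.monotonic()
    record = {"table_name": table_name, "rows_read": 0, "rows_written": 0}
    try:
        record["rows_read"], record["rows_written"] = step()
        record.update(status="Success", error_message="NA")
    except Exception as exc:
        record.update(status="Failed", error_message=str(exc))
        raise  # re-raise so the job still fails visibly
    finally:
        # finally runs on both paths, so every run leaves an audit record
        record["duration_seconds"] = round(time.monotonic() - started, 3)
        record["load_date"] = datetime.now(timezone.utc).isoformat()
        sink.append(record)
    return record

audit_log = []
run_with_audit("customers", lambda: (100, 98), audit_log)

def broken_step():
    raise ValueError("source unavailable")

try:
    run_with_audit("orders", broken_step, audit_log)
except ValueError:
    pass  # swallowed here only to keep the demo running

print([(r["table_name"], r["status"]) for r in audit_log])
# [('customers', 'Success'), ('orders', 'Failed')]
```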

Common Mistakes

  1. Completed instead of Succeeded/Failed: both handlers run every time
  2. Missing failure handler: a failed Copy leaves no audit row and marks the whole ForEach as failed
  3. Nested @ in expressions: @concat(@item()...) is wrong; write @concat(item()...)
  4. Wrong parameter types: rowsRead needs Int32, not String
  5. Not testing the failure path: insert a fake table to verify error logging
  6. Not indexing the audit table: 50 tables × 365 days is about 18,250 rows per year for one nightly pipeline, and more pipelines or more frequent runs multiply that. As the table grows, queries without indexes become slow. Add indexes on (table_name, load_date) and (status, load_date) at minimum.
  7. Logging only failures: if you only log errors, you cannot answer "how many rows were copied yesterday" or "what was the average duration last month." Always log both success AND failure for complete visibility.
  8. Not including pipeline_run_id: without it, you cannot distinguish between a scheduled run and a manual rerun on the same day. Always include @pipeline().RunId to tie audit records to specific ADF runs.

Interview Questions

Q: How do you implement audit logging in ADF? A: Stored Procedure activities after Copy — green arrow for success (logs metrics), red arrow for failure (logs errors). Both call the same stored procedure with different parameters.

Q: What’s the difference between green, red, and blue arrows? A: Green = Succeeded only. Red = Failed only. Blue = Completed always. Use green for success logging, red for failure logging.

Q: Why stored procedure instead of direct SQL? A: Encapsulation, security (EXECUTE permission), reusability, and simpler configuration.

Q: How do you detect data anomalies using audit logs? A: Compare today’s row count against the 7-day or 30-day average for each table. If today’s count is less than 50% or greater than 200% of the average, flag it as an anomaly. Similarly, compare copy duration to detect performance degradation. These queries can run as scheduled alerts.

Q: How do you handle audit logging in Databricks? A: Wrap transformations in try/except. On success, write a record to a Delta audit table with row counts, duration, and status. On failure, write the error message. Query the audit table using SQL or Delta time travel. Same pattern as ADF stored procedures, but using PySpark and Delta Lake instead of SQL Server.

Q: Why log both rows_read and rows_copied separately? A: They should match for most loads. If rows_read is 10,000 but rows_copied is 9,500, it means 500 rows failed during transfer — possibly due to data type mismatches, null constraint violations, or sink schema issues. The discrepancy itself is a data quality signal.

Q: How do you prevent the audit table from growing too large? A: Implement a retention policy. Keep detailed records for 90 days, aggregate older data into a summary table (daily totals by table), and delete or archive detail rows older than 90 days. Use a SQL Agent job or ADF pipeline to automate this cleanup weekly.
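
A retention policy like the one in that answer could be sketched as follows. This is an in-memory Python illustration only, assuming a 90-day window and a summary keyed by date and table. The function name `apply_retention` and the sample rows are hypothetical, and a real job would run the equivalent INSERT and DELETE against the audit table.

```python
from collections import defaultdict
from datetime import datetime, timedelta

def apply_retention(audit_rows, now, keep_days=90):
    """Split rows into detail to keep and daily summaries of older rows.

    Returns (kept_rows, summaries), where summaries maps
    (date, table_name) to total rows_copied for rows older than keep_days.
    """
    cutoff = now - timedelta(days=keep_days)
    kept = []
    summaries = defaultdict(int)
    for row in audit_rows:
        if row["load_date"] >= cutoff:
            kept.append(row)
        else:
            key = (row["load_date"].date(), row["table_name"])
            summaries[key] += row["rows_copied"]
    return kept, dict(summaries)

now = datetime(2024, 6, 1)  # illustrative "current" time
rows = [
    {"table_name": "Customer", "rows_copied": 800, "load_date": datetime(2024, 1, 10, 2, 0)},
    {"table_name": "Customer", "rows_copied": 50, "load_date": datetime(2024, 1, 10, 14, 0)},
    {"table_name": "Customer", "rows_copied": 847, "load_date": datetime(2024, 5, 30, 2, 0)},
]
kept, summaries = apply_retention(rows, now)
print(len(kept))  # 1
print(summaries)  # {(datetime.date(2024, 1, 10), 'Customer'): 850}
```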

Wrapping Up

Audit logging transforms your pipeline from a black box into a transparent system. Start simple — table name, rows copied, duration, errors. Grow from there.

Related posts: Synapse Pipeline with Audit Logging (full implementation); Metadata-Driven Pipeline in ADF; Common ADF/Synapse Errors


Naveen Vuppula is a Senior Data Engineering Consultant and app developer based in Ontario, Canada. He writes about Python, SQL, AWS, Azure, and everything data engineering at DriveDataScience.com.
