Data Quality in Azure Databricks: Validation Rules, Quarantine Patterns, and Building a DQ Framework from Scratch

Every pipeline you build has an implicit promise: “The data I produce is correct.” But is it? Are there NULLs where there should not be? Duplicate customer IDs? Negative order amounts? Emails without @ signs? Dates from the year 1900?

Bad data is like a termite infestation — invisible until the damage is done. An analyst builds a dashboard showing $2 million in revenue. The CFO presents it to the board. Then someone discovers 500 test orders were included because the pipeline did not filter them. Trust destroyed. Careers affected. All because nobody checked the data.

Data quality is not optional. It is the difference between a data platform that people trust and one they avoid.

This post builds a practical data quality framework in PySpark — not a theoretical list of principles, but actual code you can run in your Databricks notebooks to validate, quarantine, report, and monitor data quality.

Think of data quality like airport security. Every passenger (row) goes through a checkpoint. Valid passport? (not NULL). Boarding pass matches name? (referential integrity). No prohibited items? (business rule validation). Passengers who pass go to the gate (Silver/Gold). Passengers who fail go to secondary screening (quarantine). Nothing gets through unchecked.

Table of Contents

  • The Six Dimensions of Data Quality
  • Why Data Quality Matters (The Cost of Bad Data)
  • The Quarantine Pattern
  • Building the DQ Framework
  • DQ Check 1: Null Checks
  • DQ Check 2: Duplicate Checks
  • DQ Check 3: Data Type Validation
  • DQ Check 4: Range and Boundary Checks
  • DQ Check 5: Format Validation (Regex)
  • DQ Check 6: Referential Integrity
  • DQ Check 7: Freshness Checks
  • DQ Check 8: Row Count Validation
  • The Complete DQ Framework Class
  • DQ Report Generation
  • Integrating DQ into the Medallion Pipeline
  • DQ in Bronze → Silver (Validation Gate)
  • DQ in Gold (Business Rule Validation)
  • Automated Alerts on DQ Failures
  • Great Expectations vs Custom Framework
  • Common Data Quality Issues in Real Projects
  • Interview Questions
  • Wrapping Up

The Six Dimensions of Data Quality

Dimension    | Question It Answers                      | Example Check
Completeness | Are all required fields filled?          | customer_id is NOT NULL
Uniqueness   | Are records unique where they should be? | No duplicate customer_ids
Validity     | Do values conform to expected formats?   | Email matches regex pattern
Accuracy     | Are values factually correct?            | Country code exists in reference table
Consistency  | Do related values agree?                 | State matches country
Timeliness   | Is the data fresh enough?                | Last update within 24 hours

Real-life analogy: Imagine hiring a new employee. Completeness = resume has all sections filled. Uniqueness = no duplicate employee IDs. Validity = phone number has correct format. Accuracy = listed degree actually exists. Consistency = listed city matches listed state. Timeliness = resume updated this year, not 2015.
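
One way to keep the six dimensions in view while planning checks is a small lookup that ties each dimension to the kind of check that enforces it. The sketch below is plain Python and illustrative only: the check names mirror the ones used by the functions later in this post, while "cross_column_check" and the helper uncovered_dimensions are hypothetical names introduced here.

```python
# Hypothetical mapping from each DQ dimension to the check type that enforces it.
# "cross_column_check" is an assumed name; the post does not build that check.
DIMENSION_CHECKS = {
    "completeness": "null_check",
    "uniqueness": "duplicate_check",
    "validity": "format_check",
    "accuracy": "referential_integrity",
    "consistency": "cross_column_check",
    "timeliness": "freshness_check",
}

def uncovered_dimensions(planned_checks):
    """Return the dimensions that none of the planned checks address."""
    planned = set(planned_checks)
    return sorted(d for d, check in DIMENSION_CHECKS.items() if check not in planned)

# A plan with only structural checks leaves three dimensions unaddressed.
print(uncovered_dimensions(["null_check", "duplicate_check", "format_check"]))
# ['accuracy', 'consistency', 'timeliness']
```

A lookup like this makes gaps visible early: if a table's check plan never touches timeliness, that is a decision you made rather than something you forgot.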

Why Data Quality Matters (The Cost of Bad Data)

Bad data is not just a technical problem — it is a business problem with real financial consequences:

Scenario 1: Duplicate customer records
  → Marketing sends 2 emails to every customer → complaints → unsubscribes → lost revenue

Scenario 2: NULL values in order amounts
  → Revenue dashboard shows $1.2M instead of $1.8M → CFO makes wrong budget decision

Scenario 3: Test orders in production data
  → "500 orders from customer Test_User" inflates KPIs → board gets wrong numbers

Scenario 4: Stale data (pipeline failed silently)
  → Dashboard shows last week's data → executives make decisions on outdated numbers

Estimated cost of bad data:
  • 15-25% of revenue is affected by poor data quality (Gartner)
  • Data scientists spend 60-80% of their time cleaning data instead of analyzing it
  • One bad data incident can take weeks to investigate and fix

The cheapest time to catch bad data is at ingestion. The most expensive time is after the CFO presents it to the board. Every DQ check you add is insurance against these scenarios.

The Quarantine Pattern

Instead of dropping bad records silently, route them to a quarantine table for investigation:

Source Data (100 rows)
      |
  DQ Validation
    /       \
Good (95)   Bad (5)
   |            |
Silver      Quarantine
table       table

Why quarantine instead of dropping?

  • Bad records are evidence of source system issues, so you need to investigate them
  • Dropping silently means you never know data is missing
  • Quarantine lets you fix and reprocess later
  • Audit compliance requires accounting for every record

Real-life analogy: Airport security does not throw away your bag if it has a suspicious item. They put it in secondary screening (quarantine), investigate, and either clear it or flag it. The bag is accounted for either way.
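
A quarantine table is far more useful when each record carries the reason it was rejected. The sketch below shows the routing logic in plain Python (no Spark session needed) so the idea is easy to follow. The function route_rows, the rule labels, and the dq_reasons field are hypothetical names chosen for illustration. In PySpark the same idea would be expressed with column conditions and a filter.

```python
def route_rows(rows, rules):
    """Split rows into (good, quarantined).

    rules maps a reason label to a predicate that returns True when the row is BAD.
    Each quarantined row gets a "dq_reasons" list naming every rule it failed.
    """
    good, quarantined = [], []
    for row in rows:
        reasons = [label for label, is_bad in rules.items() if is_bad(row)]
        if reasons:
            quarantined.append({**row, "dq_reasons": reasons})
        else:
            good.append(row)
    return good, quarantined

# Illustrative rules; the labels are assumptions, not part of the framework below.
rules = {
    "null_customer_id": lambda r: r["customer_id"] is None,
    "negative_salary": lambda r: r["salary"] is not None and r["salary"] < 0,
}

rows = [
    {"customer_id": 1001, "salary": 95000},
    {"customer_id": 1003, "salary": -5000},
    {"customer_id": None, "salary": 70000},
]

good, quarantined = route_rows(rows, rules)
print(f"good={len(good)} quarantined={len(quarantined)}")
for q in quarantined:
    print(q["customer_id"], q["dq_reasons"])
```

Every input row lands in exactly one of the two outputs, which is the property that lets you account for every record during an audit.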

Building the DQ Framework

The Sample Data (Intentionally Messy)

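# Import only the names used in this post. A wildcard import
# (from pyspark.sql.functions import *) would shadow Python's built-in
# round() and sum(), which the checks below rely on.
from pyspark.sql.functions import col, to_date, datediff, current_date, date_sub
from pyspark.sql.types import StructType, StructField, IntegerType, StringType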

data = [
    (1001, "Naveen", "naveen@email.com", "Toronto", "Canada", 95000, "2023-01-15"),
    (1002, "Shrey", "shrey@email.com", "Toronto", "Canada", 88000, "2023-03-20"),
    (1003, None, "carol@email", "London", "UK", -5000, "2023-06-10"),         # NULL name, invalid email, negative salary
    (1004, "Vishnu", None, "12345", "Canada", 91000, "2023-09-01"),            # NULL email, numeric city
    (1005, "Ravi", "ravi@email.com", "Calgary", None, 85000, "1900-01-01"),   # NULL country, suspicious date
    (1001, "Naveen", "naveen@email.com", "Toronto", "Canada", 95000, "2023-01-15"),  # DUPLICATE
    (1006, "Priya", "priya@email.com", "Mumbai", "India", 78000, "2024-03-15"),
    (1007, "", "not-an-email", "", "India", 0, "2026-05-18"),                  # Empty strings, invalid email, zero salary
]

schema = StructType([
    StructField("customer_id", IntegerType()),
    StructField("name", StringType()),
    StructField("email", StringType()),
    StructField("city", StringType()),
    StructField("country", StringType()),
    StructField("salary", IntegerType()),
    StructField("hire_date", StringType()),
])

df = spark.createDataFrame(data, schema).withColumn("hire_date", to_date("hire_date"))
df.show(truncate=False)

DQ Check 1: Null Checks

def check_nulls(df, columns, table_name="table"):
    '''Check for NULL values in specified columns'''
    results = []
    total = df.count()

    for c in columns:
        null_count = df.filter(col(c).isNull() | (col(c) == "")).count()
        pct = round(null_count / total * 100, 2) if total > 0 else 0
        status = "FAIL" if null_count > 0 else "PASS"
        results.append({
            "table": table_name,
            "check": "null_check",
            "column": c,
            "total_rows": total,
            "failed_rows": null_count,
            "fail_pct": pct,
            "status": status
        })
        icon = "FAIL" if status == "FAIL" else "PASS"
        print(f"  [{icon}] {c}: {null_count} nulls ({pct}%)")

    return results

print("=== NULL CHECKS ===")
null_results = check_nulls(df, ["customer_id", "name", "email", "city", "country"], "customers")

DQ Check 2: Duplicate Checks

def check_duplicates(df, key_columns, table_name="table"):
    '''Check for duplicate rows based on key columns'''
    total = df.count()
    distinct = df.dropDuplicates(key_columns).count()
    dupes = total - distinct
    pct = round(dupes / total * 100, 2) if total > 0 else 0
    status = "FAIL" if dupes > 0 else "PASS"

    print(f"  [{status}] Duplicates on {key_columns}: {dupes} ({pct}%)")

    if dupes > 0:
        print("  Duplicate records:")
        df.groupBy(key_columns).count().filter(col("count") > 1).show()

    return [{"table": table_name, "check": "duplicate_check",
             "column": str(key_columns), "total_rows": total,
             "failed_rows": dupes, "fail_pct": pct, "status": status}]

print("=== DUPLICATE CHECKS ===")
dup_results = check_duplicates(df, ["customer_id"], "customers")

DQ Check 3: Data Type Validation

def check_numeric_city(df, column="city", table_name="table"):
    '''Check for numeric values in city column (should be text)'''
    bad = df.filter(col(column).rlike("^[0-9]+$")).count()
    total = df.count()
    pct = round(bad / total * 100, 2) if total > 0 else 0
    status = "FAIL" if bad > 0 else "PASS"
    print(f"  [{status}] Numeric values in {column}: {bad} ({pct}%)")
    return [{"table": table_name, "check": "type_check",
             "column": column, "total_rows": total,
             "failed_rows": bad, "fail_pct": pct, "status": status}]

print("=== TYPE CHECKS ===")
type_results = check_numeric_city(df, "city", "customers")

DQ Check 4: Range and Boundary Checks

def check_range(df, column, min_val, max_val, table_name="table"):
    '''Check if values fall within expected range'''
    bad = df.filter(
        (col(column) < min_val) | (col(column) > max_val) | col(column).isNull()
    ).count()
    total = df.count()
    pct = round(bad / total * 100, 2) if total > 0 else 0
    status = "FAIL" if bad > 0 else "PASS"
    print(f"  [{status}] {column} out of range [{min_val}-{max_val}]: {bad} ({pct}%)")
    return [{"table": table_name, "check": "range_check",
             "column": column, "total_rows": total,
             "failed_rows": bad, "fail_pct": pct, "status": status}]

print("=== RANGE CHECKS ===")
range_results = check_range(df, "salary", 1, 500000, "customers")

DQ Check 5: Format Validation (Regex)

def check_email_format(df, column="email", table_name="table"):
    '''Validate email format using regex'''
    # Raw string so the backslash in \. reaches the regex engine unchanged
    email_regex = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
    bad = df.filter(
        col(column).isNotNull() &
        (col(column) != "") &
        ~col(column).rlike(email_regex)
    ).count()
    total = df.filter(col(column).isNotNull() & (col(column) != "")).count()
    pct = round(bad / total * 100, 2) if total > 0 else 0
    status = "FAIL" if bad > 0 else "PASS"
    print(f"  [{status}] Invalid email format: {bad} of {total} ({pct}%)")
    return [{"table": table_name, "check": "format_check",
             "column": column, "total_rows": total,
             "failed_rows": bad, "fail_pct": pct, "status": status}]

print("=== FORMAT CHECKS ===")
format_results = check_email_format(df, "email", "customers")
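
Before running a pattern across millions of rows, it helps to try it on a handful of known values. The sketch below uses Python's re module with the same pattern and a few emails from the sample data. Two caveats: rlike uses Java's regex engine, so this only approximates Spark's behavior (for a simple anchored pattern like this one the results should agree), and the helper is_valid_email treats None and empty strings as invalid, whereas check_email_format above skips them.

```python
import re

# Same pattern as email_regex above, written as a raw string.
EMAIL_PATTERN = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")

def is_valid_email(value):
    """True when value is a non-empty string that matches the email pattern."""
    return bool(value) and EMAIL_PATTERN.search(value) is not None

# Values taken from the sample data, plus the empty and None cases.
samples = ["naveen@email.com", "carol@email", "not-an-email", "", None]
for s in samples:
    print(repr(s), is_valid_email(s))
```

Only the first sample passes: "carol@email" has no top-level domain and "not-an-email" has no @ sign.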

DQ Check 6: Referential Integrity

def check_referential_integrity(df, column, ref_df, ref_column, table_name="table"):
    '''Check if values exist in a reference table'''
    orphans = df.join(ref_df, df[column] == ref_df[ref_column], "left_anti")
    orphan_count = orphans.count()
    total = df.count()
    pct = round(orphan_count / total * 100, 2) if total > 0 else 0
    status = "FAIL" if orphan_count > 0 else "PASS"
    print(f"  [{status}] Orphan records (no match in reference): {orphan_count} ({pct}%)")
    return [{"table": table_name, "check": "referential_integrity",
             "column": column, "total_rows": total,
             "failed_rows": orphan_count, "fail_pct": pct, "status": status}]

# Reference table of valid countries
valid_countries = spark.createDataFrame(
    [("Canada",), ("India",), ("UK",), ("USA",)], ["country_name"]
)

print("=== REFERENTIAL INTEGRITY ===")
ref_results = check_referential_integrity(df.filter(col("country").isNotNull()),
    "country", valid_countries, "country_name", "customers")

DQ Check 7: Freshness Checks

def check_freshness(df, date_column, max_age_days=365, table_name="table"):
    '''Check if date values are within acceptable freshness'''
    stale = df.filter(
        col(date_column).isNotNull() &
        (datediff(current_date(), col(date_column)) > max_age_days)
    ).count()
    total = df.filter(col(date_column).isNotNull()).count()
    pct = round(stale / total * 100, 2) if total > 0 else 0
    status = "FAIL" if stale > 0 else "PASS"
    print(f"  [{status}] Records older than {max_age_days} days: {stale} ({pct}%)")
    return [{"table": table_name, "check": "freshness_check",
             "column": date_column, "total_rows": total,
             "failed_rows": stale, "fail_pct": pct, "status": status}]

print("=== FRESHNESS CHECKS ===")
fresh_results = check_freshness(df, "hire_date", 1095, "customers")  # 3 years
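
The function above flags individual rows with old dates. The Timeliness dimension from the table earlier ("Last update within 24 hours") is a slightly different, table-level question: how old is the newest record? A minimal sketch of that comparison in plain Python follows. The name is_fresh is hypothetical, and in a notebook latest_update would typically come from the maximum of a load-timestamp column, which the sample data here does not have.

```python
from datetime import datetime, timedelta

def is_fresh(latest_update, now, max_age_hours=24):
    """True when the newest record is no older than max_age_hours."""
    return now - latest_update <= timedelta(hours=max_age_hours)

# Fixed timestamps so the example is reproducible.
now = datetime(2024, 3, 16, 12, 0)
print(is_fresh(datetime(2024, 3, 16, 1, 0), now))   # 11 hours old -> True
print(is_fresh(datetime(2024, 3, 14, 12, 0), now))  # 48 hours old -> False
```

Passing now in as an argument, rather than calling datetime.now() inside the function, keeps the check easy to test.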

DQ Check 8: Row Count Validation

def check_row_count(df, expected_min, expected_max=None, table_name="table"):
    '''Validate row count is within expected range'''
    actual = df.count()
    if expected_max is not None:
        status = "PASS" if expected_min <= actual <= expected_max else "FAIL"
        print(f"  [{status}] Row count: {actual} (expected {expected_min}-{expected_max})")
    else:
        status = "PASS" if actual >= expected_min else "FAIL"
        print(f"  [{status}] Row count: {actual} (expected >= {expected_min})")
    return [{"table": table_name, "check": "row_count",
             "column": "N/A", "total_rows": actual,
             "failed_rows": 0 if status == "PASS" else actual,
             "fail_pct": 0, "status": status}]

print("=== ROW COUNT CHECKS ===")
count_results = check_row_count(df, 5, 1000, "customers")
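
Fixed minimum and maximum bounds work for small tables but are hard to maintain as volumes grow. A common alternative is to compare the current count with the previous run and fail on a large swing. The sketch below is plain Python. The function name row_count_drift and the 20% default tolerance are assumptions for illustration, and the previous count would need to come from wherever you store run history.

```python
def row_count_drift(previous, current, tolerance_pct=20.0):
    """Compare the current row count with the previous run's count.

    Returns (drift_pct, status). drift_pct is signed: negative means fewer rows.
    """
    if previous <= 0:
        # No usable baseline yet; this sketch treats the first run as a pass.
        return 0.0, "PASS"
    drift_pct = round((current - previous) / previous * 100, 2)
    status = "PASS" if abs(drift_pct) <= tolerance_pct else "FAIL"
    return drift_pct, status

print(row_count_drift(1000, 950))  # (-5.0, 'PASS')
print(row_count_drift(1000, 400))  # (-60.0, 'FAIL')
```

A sudden 60% drop like the second call is the typical signature of a partial load or a source extract that failed silently.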

The Complete DQ Framework Class

class DataQualityFramework:
    '''Reusable data quality validation framework for PySpark DataFrames'''

    def __init__(self, df, table_name):
        self.df = df
        self.table_name = table_name
        self.results = []
        self.total_rows = df.count()

    def check_nulls(self, columns):
        for c in columns:
            null_count = self.df.filter(col(c).isNull() | (col(c) == "")).count()
            pct = round(null_count / self.total_rows * 100, 2) if self.total_rows else 0.0
            self.results.append({
                "check": "null", "column": c,
                "failed": null_count, "pct": pct,
                "status": "FAIL" if null_count > 0 else "PASS"
            })
        return self

    def check_duplicates(self, key_columns):
        distinct = self.df.dropDuplicates(key_columns).count()
        dupes = self.total_rows - distinct
        self.results.append({
            "check": "duplicate", "column": str(key_columns),
            "failed": dupes,
            "pct": round(dupes / self.total_rows * 100, 2) if self.total_rows else 0.0,
            "status": "FAIL" if dupes > 0 else "PASS"
        })
        return self

    def check_range(self, column, min_val, max_val):
        bad = self.df.filter((col(column) < min_val) | (col(column) > max_val)).count()
        self.results.append({
            "check": "range", "column": column,
            "failed": bad,
            "pct": round(bad / self.total_rows * 100, 2) if self.total_rows else 0.0,
            "status": "FAIL" if bad > 0 else "PASS"
        })
        return self

    def check_regex(self, column, pattern, description="format"):
        bad = self.df.filter(col(column).isNotNull() & ~col(column).rlike(pattern)).count()
        self.results.append({
            "check": description, "column": column,
            "failed": bad,
            "pct": round(bad / self.total_rows * 100, 2) if self.total_rows else 0.0,
            "status": "FAIL" if bad > 0 else "PASS"
        })
        return self

    def get_quarantine(self, conditions):
        '''Return rows that fail ANY of the conditions'''
        combined = conditions[0]
        for c in conditions[1:]:
            combined = combined | c
        return self.df.filter(combined)

    def report(self):
        '''Print DQ report'''
        print(f"\n{'='*60}")
        print(f"DATA QUALITY REPORT: {self.table_name}")
        print(f"Total rows: {self.total_rows}")
        print(f"{'='*60}")

        passed = sum(1 for r in self.results if r["status"] == "PASS")
        failed = sum(1 for r in self.results if r["status"] == "FAIL")

        for r in self.results:
            icon = "PASS" if r["status"] == "PASS" else "FAIL"
            print(f"  [{icon}] {r['check']:12s} | {r['column']:20s} | "
                  f"Failed: {r['failed']:5d} ({r['pct']}%)")

        print(f"\nSummary: {passed} passed, {failed} failed out of {len(self.results)} checks")
        overall = "PASS" if failed == 0 else "FAIL"
        print(f"Overall: [{overall}]")
        print(f"{'='*60}")

        return self.results

# Usage:
dq = DataQualityFramework(df, "customers")
results = (dq
    .check_nulls(["customer_id", "name", "email", "city", "country"])
    .check_duplicates(["customer_id"])
    .check_range("salary", 1, 500000)
    .check_regex("email", r"^.+@.+\..+$", "email_format")
    .report()
)

Output:

============================================================
DATA QUALITY REPORT: customers
Total rows: 8
============================================================
  [PASS] null         | customer_id          | Failed:     0 (0.0%)
  [FAIL] null         | name                 | Failed:     2 (25.0%)
  [FAIL] null         | email                | Failed:     1 (12.5%)
  [FAIL] null         | city                 | Failed:     1 (12.5%)
  [FAIL] null         | country              | Failed:     1 (12.5%)
  [FAIL] duplicate    | ['customer_id']      | Failed:     1 (12.5%)
  [FAIL] range        | salary               | Failed:     2 (25.0%)
  [FAIL] email_format | email                | Failed:     2 (25.0%)

Summary: 1 passed, 7 failed out of 8 checks
Overall: [FAIL]
============================================================

DQ Report Generation

The report() method in the framework class prints a summary to the console, but in production you need a persistent, queryable DQ report stored as a Delta table:

from datetime import datetime

def save_dq_report(results, table_name, run_id=None):
    '''Save DQ results as a Delta table for historical tracking'''
    run_id = run_id or datetime.now().strftime("%Y%m%d_%H%M%S")
    report_rows = []
    for r in results:
        report_rows.append({
            "run_id": run_id,
            "run_timestamp": datetime.now().isoformat(),
            "table_name": table_name,
            "check_type": r["check"],
            "column_name": r["column"],
            "failed_rows": r["failed"],
            "fail_pct": r["pct"],
            "status": r["status"]
        })

    df_report = spark.createDataFrame(report_rows)
    df_report.write.format("delta").mode("append").saveAsTable("dq_monitoring.dq_results")
    print(f"DQ report saved: {len(report_rows)} checks for {table_name}")

# Usage — after running the framework:
dq = DataQualityFramework(df, "customers")
results = (dq
    .check_nulls(["customer_id", "name", "email"])
    .check_duplicates(["customer_id"])
    .check_range("salary", 1, 500000)
    .report()
)
save_dq_report(results, "customers")

# Query historical DQ trends:
# SELECT table_name, check_type, column_name, fail_pct, run_timestamp
# FROM dq_monitoring.dq_results
# WHERE table_name = 'customers'
# ORDER BY run_timestamp DESC

Over time, this table becomes your data quality dashboard — showing trends like “null rate in email column has been increasing for 3 weeks” before it becomes a crisis.
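
Spotting a trend like that can be automated once the history exists. The sketch below is plain Python and assumes you have already pulled the fail_pct values for one table and column from dq_monitoring.dq_results, ordered oldest to newest. The function name is_steadily_increasing and the default of three consecutive rises are hypothetical choices.

```python
def is_steadily_increasing(fail_pcts, min_runs=3):
    """True when each of the last min_runs values is higher than the one before it.

    fail_pcts must be ordered oldest to newest.
    """
    if len(fail_pcts) < min_runs + 1:
        return False  # not enough history to judge
    recent = fail_pcts[-(min_runs + 1):]
    return all(later > earlier for earlier, later in zip(recent, recent[1:]))

# Null rate (%) for one column across five runs, oldest first (made-up values).
history = [1.0, 0.9, 1.5, 2.4, 3.8]
print(is_steadily_increasing(history))  # True: 0.9 -> 1.5 -> 2.4 -> 3.8
```

This rule is deliberately simple. It ignores the size of each rise, so in practice you may want to combine it with a minimum threshold to avoid flagging noise.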

Integrating DQ into the Medallion Pipeline

DQ in Bronze → Silver (Validation Gate)

# Read Bronze
df_bronze = spark.read.parquet(".../bronze/customers/")

# Run DQ checks
dq = DataQualityFramework(df_bronze, "bronze_customers")
results = (dq
    .check_nulls(["customer_id"])
    .check_duplicates(["customer_id"])
    .check_regex("email", r"^.+@.+\..+$")
    .report()
)

# Separate good and bad
quarantine_conditions = [
    col("customer_id").isNull(),
    col("email").isNotNull() & ~col("email").rlike(r"^.+@.+\..+$")
]

df_bad = dq.get_quarantine(quarantine_conditions)
df_good = df_bronze.exceptAll(df_bad)

# Write good to Silver, bad to Quarantine
df_good.write.format("delta").mode("overwrite").save(".../silver/customers/")
df_bad.write.format("delta").mode("append").save(".../quarantine/customers/")

print(f"Silver: {df_good.count()} rows | Quarantine: {df_bad.count()} rows")

DQ in Gold (Business Rule Validation)

Gold layer DQ checks are different from Bronze→Silver checks. Instead of structural issues (nulls, format), Gold checks validate business rules and aggregation integrity:

# Gold DQ: Business rule validation after building star schema
df_gold = spark.read.format("delta").load(".../gold/fact_orders/")
df_dim_customer = spark.read.format("delta").load(".../gold/dim_customer/")

dq_gold = DataQualityFramework(df_gold, "gold_fact_orders")
results = (dq_gold
    .check_nulls(["order_key", "customer_key", "date_key"])  # Surrogate keys must never be NULL
    .check_duplicates(["order_key"])                           # Fact keys must be unique
    .check_range("total_amount", 0, 1000000)                  # No negative or unrealistic amounts
    .report()
)

# Referential integrity: every customer_key in fact must exist in dim
orphans = df_gold.join(df_dim_customer, "customer_key", "left_anti")
if orphans.count() > 0:
    print(f"FAIL: {orphans.count()} orders reference non-existent customers")
    orphans.write.format("delta").mode("append").save(".../quarantine/orphan_orders/")

# Revenue sanity check: today's total should be within 50% of 30-day average
from pyspark.sql.functions import sum as spark_sum
today_rev = df_gold.filter(col("order_date") == current_date()).agg(spark_sum("total_amount")).collect()[0][0] or 0
avg_rev = df_gold.filter(col("order_date") >= date_sub(current_date(), 30)) \
    .groupBy("order_date").agg(spark_sum("total_amount").alias("daily_rev")) \
    .agg({"daily_rev": "avg"}).collect()[0][0] or 1

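# float() guards against Decimal results when total_amount is a decimal column
if float(today_rev) < float(avg_rev) * 0.5 or float(today_rev) > float(avg_rev) * 1.5:
    print(f"WARN: Today's revenue ${today_rev:,.0f} is more than 50% away from the 30-day avg ${avg_rev:,.0f}")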

The difference: Bronze→Silver DQ catches data defects (broken rows). Gold DQ catches business anomalies (missing references, unrealistic totals, broken star schema integrity).
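
The revenue sanity check boils down to a tolerance band around a baseline, and pulling that arithmetic into a small function makes it reusable and testable. The sketch below is plain Python, and the name within_tolerance is hypothetical. It converts both inputs to float because Spark returns decimal.Decimal for sums over decimal columns, and Python cannot multiply a Decimal by a float directly.

```python
from decimal import Decimal

def within_tolerance(today, baseline, tolerance=0.5):
    """True when today lies within +/- tolerance (a fraction) of baseline."""
    today, baseline = float(today), float(baseline)
    lower = baseline * (1 - tolerance)
    upper = baseline * (1 + tolerance)
    return lower <= today <= upper

# With a 100,000 baseline and 50% tolerance, the accepted band is 50,000 to 150,000.
print(within_tolerance(120000, 100000))              # True
print(within_tolerance(40000, 100000))               # False
print(within_tolerance(Decimal("75000.00"), 100000)) # True
```

The 50% band matches the check above. A tighter or looser tolerance is a business decision that depends on how much day-to-day variation is normal for the metric.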

Automated Alerts on DQ Failures

DQ checks are useless if nobody sees the failures. Set up automated alerts so the team is notified immediately:

def alert_on_dq_failure(results, table_name):
    '''Check DQ results and raise alert if any critical checks failed'''
    failures = [r for r in results if r["status"] == "FAIL"]

    if not failures:
        print(f"All DQ checks passed for {table_name}")
        return

    # Build alert message
    alert_msg = f"DQ ALERT: {len(failures)} checks failed for {table_name}\n"
    for f in failures:
        alert_msg += f"  - {f['check']} on {f['column']}: {f['failed']} rows ({f['pct']}%)\n"

    print(alert_msg)

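    # Option 1: Fail the notebook (pipeline catches it and sends Teams/Outlook alert).
    # Raise an exception here: dbutils.notebook.exit() ends the run as successful,
    # so the calling pipeline's failure path would not fire.
    # check_duplicates stores its column as str(key_columns), hence the "in" test.
    if any(f["check"] in ["null", "duplicate"] and "customer_id" in f["column"] for f in failures):
        # The calling pipeline's red arrow path triggers a Teams notification
        raise Exception(f"CRITICAL DQ FAILURE: {alert_msg}")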

    # Option 2: Write to an alert table (Data Activator or Power BI monitors it)
    alert_df = spark.createDataFrame([{
        "timestamp": datetime.now().isoformat(),
        "table": table_name,
        "failures": len(failures),
        "message": alert_msg
    }])
    alert_df.write.format("delta").mode("append").saveAsTable("dq_monitoring.alerts")

# Usage in pipeline notebook:
results = dq.report()
alert_on_dq_failure(results, "customers")

Alert escalation levels:
  INFO:     Log to dq_results table (all checks)
  WARN:     Log + print warning (non-critical failures like invalid email)
  CRITICAL: Log + fail notebook + pipeline sends Teams alert (null primary key, broken referential integrity)

Pipeline pattern:
  Notebook runs DQ → passes? → continue pipeline
                   → fails?  → raise Exception("DQ FAILED")
                              → pipeline red arrow → Teams notification
                              → on-call engineer investigates
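
The escalation levels above can be encoded as a small classifier over the result dictionaries that DataQualityFramework produces. The sketch below is plain Python. The set of critical columns, the function name classify, and the substring match on the column field are assumptions made for illustration. The substring match is needed because the duplicate check stores its column as a stringified list, but it would also match a column such as customer_id_old.

```python
CRITICAL_COLUMNS = {"customer_id"}        # assumption: the primary-key columns
CRITICAL_CHECKS = {"null", "duplicate"}   # check names used by DataQualityFramework

def classify(result):
    """Map one framework result dict to INFO, WARN, or CRITICAL."""
    if result["status"] == "PASS":
        return "INFO"
    # Substring match: check_duplicates stores the column as "['customer_id']".
    touches_key = any(c in result["column"] for c in CRITICAL_COLUMNS)
    if result["check"] in CRITICAL_CHECKS and touches_key:
        return "CRITICAL"
    return "WARN"

# Result dicts in the same shape the framework's report() returns.
results = [
    {"check": "null", "column": "customer_id", "failed": 0, "pct": 0.0, "status": "PASS"},
    {"check": "duplicate", "column": "['customer_id']", "failed": 1, "pct": 12.5, "status": "FAIL"},
    {"check": "email_format", "column": "email", "failed": 2, "pct": 25.0, "status": "FAIL"},
]
levels = [classify(r) for r in results]
print(levels)  # ['INFO', 'CRITICAL', 'WARN']
```

Keeping the severity rules in one place means the alert function only has to ask whether any result is CRITICAL, instead of repeating the key-column logic inline.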

Great Expectations vs Custom Framework

Great Expectations (GX) is the most popular open-source data quality library. How does it compare to the custom framework we built?

Feature           | Custom Framework (This Post)               | Great Expectations
Setup             | Zero (just PySpark functions)              | Requires pip install + configuration + data context
Learning curve    | Low (standard PySpark)                     | Medium (GX-specific API and YAML configs)
Check types       | 8 (built here, extend as needed)           | 300+ built-in expectations
Documentation     | Your code IS the documentation             | Auto-generated Data Docs (HTML reports)
Schema validation | Manual                                     | Built-in schema expectations
Profiling         | Manual                                     | Auto-profiling generates expectations from data
Spark support     | Native PySpark                             | Supported (with SparkDFExecutionEngine)
Maintenance       | You maintain it                            | Community maintained, regular updates
Best for          | Small-medium teams, full control, learning | Large teams, standardization, 300+ check types needed

The recommendation: Start with the custom framework from this post — it teaches you what DQ checks actually do under the hood. Once your team grows or you need 50+ check types, evaluate Great Expectations. Many production teams use a hybrid: custom PySpark checks for core validation and GX for advanced profiling and documentation.

Common Data Quality Issues in Real Projects

Issue                         | How Often | Impact                                     | Detection
NULL primary keys             | Weekly    | Broken joins, lost data                    | Null check on key columns
Duplicate records             | Daily     | Inflated metrics, double-counting          | Duplicate check on business key
Invalid dates (1900-01-01)    | Monthly   | Wrong age calculations, broken time series | Range check: date > 2000-01-01
Test/dummy data in production | Quarterly | Inflated revenue, wrong KPIs               | Filter: name NOT LIKE 'Test%'
Encoding issues (mojibake)    | Monthly   | Garbled names, broken search               | Regex check for non-printable chars
Schema drift (new columns)    | Weekly    | Pipeline failures, missing data            | Schema comparison with expected
Referential integrity breaks  | Daily     | Orphan records, broken star schema         | Anti-join with reference tables
Negative amounts              | Weekly    | Wrong aggregations                         | Range check: amount >= 0

Interview Questions

Q: What are the six dimensions of data quality?
A: Completeness (no missing values), Uniqueness (no duplicates), Validity (correct format), Accuracy (factually correct), Consistency (related values agree), and Timeliness (data is fresh enough). Each dimension has specific checks: null checks for completeness, dedup for uniqueness, regex for validity.

Q: What is the quarantine pattern?
A: Instead of dropping records that fail validation, route them to a separate quarantine table for investigation. This preserves evidence of source issues, enables reprocessing after fixes, and maintains a complete audit trail. Good records go to Silver, bad records go to Quarantine.

Q: How do you integrate data quality into a Medallion Architecture?
A: DQ checks run at two points: Bronze→Silver (validate raw data, quarantine bad records) and Silver→Gold (validate business rules). Bronze→Silver catches structural issues (nulls, duplicates, format). Silver→Gold catches business issues (referential integrity, KPI accuracy).

Q: How do you handle data quality failures in production?
A: Three levels: WARN (log the issue but continue processing), FAIL (stop the pipeline and alert), QUARANTINE (separate bad records but continue with good ones). Critical checks (null primary keys) should fail the pipeline. Non-critical checks (invalid email) should quarantine and continue.

Wrapping Up

Data quality is not a one-time activity — it is a continuous process built into every layer of your pipeline. Check at Bronze ingestion, validate at Silver transformation, verify at Gold aggregation. Quarantine bad records instead of dropping them. Report everything. Alert on failures.

The framework we built is simple, and it is a solid starting point for production use. The eight check types cover the most common data quality issues you will meet in real projects. The class-based approach makes it reusable across every table in your data lake. And the quarantine pattern ensures no record is silently lost.

Trust in your data platform starts with data quality. Build it in from day one.



Naveen Vuppula is a Senior Data Engineering Consultant and app developer based in Ontario, Canada. He writes about Python, SQL, AWS, Azure, and everything data engineering at DriveDataScience.com.
