Data Quality in Azure Databricks: Validation Rules, Quarantine Patterns, and Building a DQ Framework from Scratch
Every pipeline you build has an implicit promise: “The data I produce is correct.” But is it? Are there NULLs where there should not be? Duplicate customer IDs? Negative order amounts? Emails without @ signs? Dates from the year 1900?
Bad data is like a termite infestation — invisible until the damage is done. An analyst builds a dashboard showing $2 million in revenue. The CFO presents it to the board. Then someone discovers 500 test orders were included because the pipeline did not filter them. Trust destroyed. Careers affected. All because nobody checked the data.
Data quality is not optional. It is the difference between a data platform that people trust and one they avoid.
This post builds a practical data quality framework in PySpark — not a theoretical list of principles, but actual code you can run in your Databricks notebooks to validate, quarantine, report, and monitor data quality.
Think of data quality like airport security. Every passenger (row) goes through a checkpoint. Valid passport? (not NULL). Boarding pass matches name? (referential integrity). No prohibited items? (business rule validation). Passengers who pass go to the gate (Silver/Gold). Passengers who fail go to secondary screening (quarantine). Nothing gets through unchecked.
Table of Contents
- The Six Dimensions of Data Quality
- Why Data Quality Matters (The Cost of Bad Data)
- The Quarantine Pattern
- Building the DQ Framework
- DQ Check 1: Null Checks
- DQ Check 2: Duplicate Checks
- DQ Check 3: Data Type Validation
- DQ Check 4: Range and Boundary Checks
- DQ Check 5: Format Validation (Regex)
- DQ Check 6: Referential Integrity
- DQ Check 7: Freshness Checks
- DQ Check 8: Row Count Validation
- The Complete DQ Framework Class
- DQ Report Generation
- Integrating DQ into the Medallion Pipeline
- DQ in Bronze → Silver (Validation Gate)
- DQ in Gold (Business Rule Validation)
- Automated Alerts on DQ Failures
- Great Expectations vs Custom Framework
- Common Data Quality Issues in Real Projects
- Interview Questions
- Wrapping Up
The Six Dimensions of Data Quality
| Dimension | Question It Answers | Example Check |
|---|---|---|
| Completeness | Are all required fields filled? | customer_id is NOT NULL |
| Uniqueness | Are records unique where they should be? | No duplicate customer_ids |
| Validity | Do values conform to expected formats? | Email matches regex pattern |
| Accuracy | Are values factually correct? | Country code exists in reference table |
| Consistency | Do related values agree? | State matches country |
| Timeliness | Is the data fresh enough? | Last update within 24 hours |
Real-life analogy: Imagine hiring a new employee. Completeness = resume has all sections filled. Uniqueness = no duplicate employee IDs. Validity = phone number has correct format. Accuracy = listed degree actually exists. Consistency = listed city matches listed state. Timeliness = resume updated this year, not 2015.
Why Data Quality Matters (The Cost of Bad Data)
Bad data is not just a technical problem — it is a business problem with real financial consequences:
Scenario 1: Duplicate customer records
→ Marketing sends 2 emails to every customer → complaints → unsubscribes → lost revenue
Scenario 2: NULL values in order amounts
→ Revenue dashboard shows $1.2M instead of $1.8M → CFO makes wrong budget decision
Scenario 3: Test orders in production data
→ "500 orders from customer Test_User" inflates KPIs → board gets wrong numbers
Scenario 4: Stale data (pipeline failed silently)
→ Dashboard shows last week's data → executives make decisions on outdated numbers
Estimated cost of bad data:
• Poor data quality has been estimated to cost most companies 15-25% of revenue (Thomas Redman, MIT Sloan Management Review)
• Surveys often report data scientists spending 60-80% of their time cleaning and preparing data instead of analyzing it
• One bad data incident can take weeks to investigate and fix
The cheapest time to catch bad data is at ingestion. The most expensive time is after the CFO presents it to the board. Every DQ check you add is insurance against these scenarios.
The Quarantine Pattern
Instead of dropping bad records silently, route them to a quarantine table for investigation:
```
        Source Data (100 rows)
                 |
           DQ Validation
            /         \
      Good (95)     Bad (5)
          |             |
       Silver       Quarantine
       table          table
```
Why quarantine instead of dropping?
- Bad records are evidence of source system issues, and you need to investigate them
- Dropping silently means you never know data is missing
- Quarantine lets you fix and reprocess later
- Audit compliance requires accounting for every record
Real-life analogy: Airport security does not throw away your bag if it has a suspicious item. They put it in secondary screening (quarantine), investigate, and either clear it or flag it. The bag is accounted for either way.
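One refinement worth considering: tag each quarantined row with the names of the rules it broke, so whoever investigates does not have to re-run the checks. The sketch below shows the idea in plain Python; `route_records`, the `dq_errors` field and the rule names are hypothetical, and in PySpark the same idea would typically be an array column of failed rule names rather than a loop.

```python
from typing import Callable, Dict, List, Tuple

Row = Dict[str, object]
Rule = Callable[[Row], bool]  # returns True when the row PASSES the rule


def route_records(rows: List[Row], rules: Dict[str, Rule]) -> Tuple[List[Row], List[Row]]:
    """Split rows into (good, quarantined); quarantined rows get a 'dq_errors' list."""
    good, quarantined = [], []
    for row in rows:
        # Collect every rule this row violates, not just the first one
        errors = [name for name, passes in rules.items() if not passes(row)]
        if errors:
            quarantined.append({**row, "dq_errors": errors})
        else:
            good.append(row)
    return good, quarantined


rules = {
    "customer_id_not_null": lambda r: r["customer_id"] is not None,
    "salary_positive": lambda r: r["salary"] is not None and r["salary"] > 0,
}

rows = [
    {"customer_id": 1001, "salary": 95000},
    {"customer_id": 1003, "salary": -5000},
    {"customer_id": None, "salary": 0},
]
good, bad = route_records(rows, rules)
print(len(good), len(bad))   # 1 2
print(bad[1]["dq_errors"])   # ['customer_id_not_null', 'salary_positive']
```

Because each input row lands in exactly one of the two lists, the good and quarantined counts always add up to the input count, which is the "every record accounted for" property described above.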
Building the DQ Framework
The Sample Data (Intentionally Messy)
```python
# Explicit imports: `from pyspark.sql.functions import *` would shadow Python's
# built-in round(), sum(), min() and max(), which the DQ helpers below rely on.
from pyspark.sql.functions import col, to_date, datediff, current_date, date_sub
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

data = [
    (1001, "Naveen", "naveen@email.com", "Toronto", "Canada", 95000, "2023-01-15"),
    (1002, "Shrey", "shrey@email.com", "Toronto", "Canada", 88000, "2023-03-20"),
    (1003, None, "carol@email", "London", "UK", -5000, "2023-06-10"),        # NULL name, invalid email, negative salary
    (1004, "Vishnu", None, "12345", "Canada", 91000, "2023-09-01"),          # NULL email, numeric city
    (1005, "Ravi", "ravi@email.com", "Calgary", None, 85000, "1900-01-01"),  # NULL country, suspicious date
    (1001, "Naveen", "naveen@email.com", "Toronto", "Canada", 95000, "2023-01-15"),  # DUPLICATE
    (1006, "Priya", "priya@email.com", "Mumbai", "India", 78000, "2024-03-15"),
    (1007, "", "not-an-email", "", "India", 0, "2026-05-18"),                # Empty strings, invalid email, zero salary
]

schema = StructType([
    StructField("customer_id", IntegerType()),
    StructField("name", StringType()),
    StructField("email", StringType()),
    StructField("city", StringType()),
    StructField("country", StringType()),
    StructField("salary", IntegerType()),
    StructField("hire_date", StringType()),
])

# hire_date arrives as a string; convert it to a proper DATE column
df = spark.createDataFrame(data, schema).withColumn("hire_date", to_date("hire_date"))
df.show(truncate=False)
```
DQ Check 1: Null Checks
```python
def check_nulls(df, columns, table_name="table"):
    '''Check for NULL values (and empty strings on text columns) in the given columns'''
    results = []
    total = df.count()
    dtypes = dict(df.dtypes)  # column name -> Spark type name, e.g. "string" or "int"
    for c in columns:
        missing = col(c).isNull()
        # Compare with "" only on string columns: comparing an INT column with a
        # string literal can raise a cast error when ANSI mode is enabled
        if dtypes[c] == "string":
            missing = missing | (col(c) == "")
        null_count = df.filter(missing).count()
        pct = round(null_count / total * 100, 2) if total > 0 else 0
        status = "FAIL" if null_count > 0 else "PASS"
        results.append({
            "table": table_name,
            "check": "null_check",
            "column": c,
            "total_rows": total,
            "failed_rows": null_count,
            "fail_pct": pct,
            "status": status
        })
        print(f"  [{status}] {c}: {null_count} nulls ({pct}%)")
    return results

print("=== NULL CHECKS ===")
null_results = check_nulls(df, ["customer_id", "name", "email", "city", "country"], "customers")
```
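The check above counts `NULL` and `""` as missing, but a value made only of spaces still passes. If a source pads its fields, a stricter definition may be safer. A minimal plain-Python sketch of that rule (the helper names are hypothetical); a close PySpark counterpart of the condition is `col(c).isNull() | (trim(col(c)) == "")`.

```python
from typing import Iterable, Optional


def is_blank(value: Optional[str]) -> bool:
    """Treat None, '' and whitespace-only strings as missing."""
    return value is None or value.strip() == ""


def count_missing(values: Iterable[Optional[str]]) -> int:
    """Count values that are NULL, empty, or whitespace-only."""
    return len([v for v in values if is_blank(v)])


names = ["Naveen", None, "", "   ", "Priya"]
print(count_missing(names))  # 3
```

On this list the NULL-or-empty definition would count 2 missing values; the stricter one counts 3.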
DQ Check 2: Duplicate Checks
```python
def check_duplicates(df, key_columns, table_name="table"):
    '''Check for duplicate rows based on key columns'''
    total = df.count()
    distinct = df.dropDuplicates(key_columns).count()
    dupes = total - distinct
    pct = round(dupes / total * 100, 2) if total > 0 else 0
    status = "FAIL" if dupes > 0 else "PASS"
    print(f"  [{status}] Duplicates on {key_columns}: {dupes} ({pct}%)")
    if dupes > 0:
        print("  Duplicate records:")
        df.groupBy(key_columns).count().filter(col("count") > 1).show()
    return [{"table": table_name, "check": "duplicate_check",
             "column": str(key_columns), "total_rows": total,
             "failed_rows": dupes, "fail_pct": pct, "status": status}]

print("=== DUPLICATE CHECKS ===")
dup_results = check_duplicates(df, ["customer_id"], "customers")
```
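Note what the number means: `total - distinct` counts extra copies, so a key that appears three times contributes 2, not 1. In a report it can help to show both the extra copies and how many distinct keys are affected. A plain-Python sketch with a hypothetical helper:

```python
from collections import Counter
from typing import Hashable, Iterable, Tuple


def duplicate_stats(keys: Iterable[Hashable]) -> Tuple[int, int]:
    """Return (extra_copies, duplicated_keys) for a sequence of business keys."""
    counts = Counter(keys)
    extra_copies = sum(n - 1 for n in counts.values() if n > 1)
    duplicated_keys = sum(1 for n in counts.values() if n > 1)
    return extra_copies, duplicated_keys


ids = [1001, 1002, 1001, 1003, 1001, 1002]
print(duplicate_stats(ids))  # (3, 2)
```

The first number always equals `len(ids) - len(set(ids))`, the same arithmetic the PySpark function performs with `count()` and `dropDuplicates()`.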
DQ Check 3: Data Type Validation
```python
def check_numeric_city(df, column="city", table_name="table"):
    '''Check for purely numeric values in a text column such as city'''
    bad = df.filter(col(column).rlike("^[0-9]+$")).count()
    total = df.count()
    pct = round(bad / total * 100, 2) if total > 0 else 0
    status = "FAIL" if bad > 0 else "PASS"
    print(f"  [{status}] Numeric values in {column}: {bad} ({pct}%)")
    return [{"table": table_name, "check": "type_check",
             "column": column, "total_rows": total,
             "failed_rows": bad, "fail_pct": pct, "status": status}]

print("=== TYPE CHECKS ===")
type_results = check_numeric_city(df, "city", "customers")
```
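The numeric-city rule catches one symptom. A more general type check asks whether each non-null value can be converted to the type the column should have. In Spark this is commonly done by casting and counting rows where the original value is non-null but the cast result is NULL (a failed cast returns NULL when ANSI mode is off). The same logic in plain Python, with a hypothetical helper name:

```python
from datetime import date
from typing import Callable, Iterable, List, Optional


def invalid_for_type(values: Iterable[Optional[str]], parse: Callable[[str], object]) -> List[str]:
    """Return non-null values that `parse` rejects (NULLs belong to the null check)."""
    bad = []
    for v in values:
        if v is None:
            continue
        try:
            parse(v)
        except ValueError:
            bad.append(v)
    return bad


print(invalid_for_type(["95000", "abc", None, "12.5"], int))               # ['abc', '12.5']
print(invalid_for_type(["2023-01-15", "2023-13-01"], date.fromisoformat))  # ['2023-13-01']
```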
DQ Check 4: Range and Boundary Checks
```python
def check_range(df, column, min_val, max_val, table_name="table"):
    '''Check if values fall within expected range (NULLs count as failures)'''
    bad = df.filter(
        (col(column) < min_val) | (col(column) > max_val) | col(column).isNull()
    ).count()
    total = df.count()
    pct = round(bad / total * 100, 2) if total > 0 else 0
    status = "FAIL" if bad > 0 else "PASS"
    print(f"  [{status}] {column} out of range [{min_val}-{max_val}]: {bad} ({pct}%)")
    return [{"table": table_name, "check": "range_check",
             "column": column, "total_rows": total,
             "failed_rows": bad, "fail_pct": pct, "status": status}]

print("=== RANGE CHECKS ===")
range_results = check_range(df, "salary", 1, 500000, "customers")
```
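The same boundary idea works for dates, which is how a sentinel such as the `1900-01-01` hire date in the sample gets caught. A minimal plain-Python sketch, assuming a 2000-01-01 lower bound (a hypothetical cutoff; pick one that fits your domain) and treating dates after "today" as invalid. `today` is passed in explicitly so the result is reproducible. In PySpark the filter would look like `(col("hire_date") < lit(min_date)) | (col("hire_date") > current_date())`.

```python
from datetime import date
from typing import Iterable, List, Optional


def out_of_range_dates(values: Iterable[Optional[date]], min_date: date, today: date) -> List[date]:
    """Return dates before min_date (sentinels like 1900-01-01) or after today (future)."""
    return [d for d in values if d is not None and (d < min_date or d > today)]


hire_dates = [date(2023, 1, 15), date(1900, 1, 1), None, date(2026, 5, 18)]
print(out_of_range_dates(hire_dates, min_date=date(2000, 1, 1), today=date(2025, 1, 1)))
# [datetime.date(1900, 1, 1), datetime.date(2026, 5, 18)]
```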
DQ Check 5: Format Validation (Regex)
```python
def check_email_format(df, column="email", table_name="table"):
    '''Validate email format using regex (NULL/empty values are left to the null check)'''
    # Raw string so "\." reaches the regex engine unchanged (no invalid-escape warning)
    email_regex = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
    populated = col(column).isNotNull() & (col(column) != "")
    bad = df.filter(populated & ~col(column).rlike(email_regex)).count()
    total = df.filter(populated).count()
    pct = round(bad / total * 100, 2) if total > 0 else 0
    status = "FAIL" if bad > 0 else "PASS"
    print(f"  [{status}] Invalid email format: {bad} of {total} ({pct}%)")
    return [{"table": table_name, "check": "format_check",
             "column": column, "total_rows": total,
             "failed_rows": bad, "fail_pct": pct, "status": status}]

print("=== FORMAT CHECKS ===")
format_results = check_email_format(df, "email", "customers")
```
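The post uses two email patterns: the strict one above and the looser `^.+@.+\..+$` in the framework class. `rlike` uses Java regular expressions, but for simple patterns like these Python's `re` behaves the same way, so you can sanity-check them locally. `re.search` mirrors `rlike`, which matches anywhere in the string unless the pattern is anchored. The sample addresses are made up:

```python
import re

STRICT = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
LOOSE = r"^.+@.+\..+$"

samples = ["naveen@email.com", "carol@email", "not-an-email", "a@b.c", "two@@signs.com"]

# (strict_ok, loose_ok) for each sample address
verdicts = {s: (bool(re.search(STRICT, s)), bool(re.search(LOOSE, s))) for s in samples}
for s, (strict_ok, loose_ok) in verdicts.items():
    print(f"{s:18s} strict={strict_ok} loose={loose_ok}")
```

The loose pattern accepts `two@@signs.com` and `a@b.c`, which is worth knowing before choosing it as a quarantine rule.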
DQ Check 6: Referential Integrity
```python
def check_referential_integrity(df, column, ref_df, ref_column, table_name="table"):
    '''Check if values exist in a reference table'''
    # left_anti keeps only rows of df that have no match in ref_df
    orphans = df.join(ref_df, df[column] == ref_df[ref_column], "left_anti")
    orphan_count = orphans.count()
    total = df.count()
    pct = round(orphan_count / total * 100, 2) if total > 0 else 0
    status = "FAIL" if orphan_count > 0 else "PASS"
    print(f"  [{status}] Orphan records (no match in reference): {orphan_count} ({pct}%)")
    return [{"table": table_name, "check": "referential_integrity",
             "column": column, "total_rows": total,
             "failed_rows": orphan_count, "fail_pct": pct, "status": status}]

# Reference table of valid countries
valid_countries = spark.createDataFrame(
    [("Canada",), ("India",), ("UK",), ("USA",)], ["country_name"]
)

print("=== REFERENTIAL INTEGRITY ===")
# NULL countries are excluded here because the null check already reports them
ref_results = check_referential_integrity(df.filter(col("country").isNotNull()),
                                          "country", valid_countries, "country_name", "customers")
```
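The `isNotNull()` filter before the call matters. In a left anti join a NULL key never equals anything, so rows with a NULL `country` would come back as orphans and be counted a second time on top of the null check. A plain-Python sketch of the same semantics (the helper is hypothetical):

```python
from typing import Iterable, List, Optional, Set


def find_orphans(values: Iterable[Optional[str]], reference: Set[str],
                 skip_nulls: bool = True) -> List[Optional[str]]:
    """Mimic a left anti join: keep values with no match in `reference`."""
    result = []
    for v in values:
        if v is None:
            # NULL = anything is never true in SQL, so a NULL key is always an orphan
            if not skip_nulls:
                result.append(v)
            continue
        if v not in reference:
            result.append(v)
    return result


countries = ["Canada", "UK", None, "India", "Canda"]
valid = {"Canada", "India", "UK", "USA"}
print(find_orphans(countries, valid))                    # ['Canda']
print(find_orphans(countries, valid, skip_nulls=False))  # [None, 'Canda']
```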
DQ Check 7: Freshness Checks
```python
def check_freshness(df, date_column, max_age_days=365, table_name="table"):
    '''Check if date values are within acceptable freshness'''
    stale = df.filter(
        col(date_column).isNotNull() &
        (datediff(current_date(), col(date_column)) > max_age_days)
    ).count()
    total = df.filter(col(date_column).isNotNull()).count()
    pct = round(stale / total * 100, 2) if total > 0 else 0
    status = "FAIL" if stale > 0 else "PASS"
    print(f"  [{status}] Records older than {max_age_days} days: {stale} ({pct}%)")
    return [{"table": table_name, "check": "freshness_check",
             "column": date_column, "total_rows": total,
             "failed_rows": stale, "fail_pct": pct, "status": status}]

print("=== FRESHNESS CHECKS ===")
fresh_results = check_freshness(df, "hire_date", 1095, "customers")  # 3 years
```
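`check_freshness` measures the age of individual rows, which suits a business date like `hire_date`. The Timeliness example in the dimensions table ("last update within 24 hours") is usually a table-level question instead: is the newest load recent enough? A minimal sketch, assuming you can obtain the latest load timestamp, for example with `df.agg(F.max("_ingested_at")).collect()[0][0]`, where `F` is `pyspark.sql.functions` and `_ingested_at` is a hypothetical column your ingestion would have to add:

```python
from datetime import datetime, timedelta
from typing import Optional


def is_fresh(latest_load: Optional[datetime], now: datetime, max_age: timedelta) -> bool:
    """True if the newest load is no older than max_age; an empty table is never fresh."""
    if latest_load is None:
        return False
    return now - latest_load <= max_age


now = datetime(2024, 6, 2, 9, 0)
print(is_fresh(datetime(2024, 6, 2, 1, 30), now, timedelta(hours=24)))   # True
print(is_fresh(datetime(2024, 5, 30, 23, 0), now, timedelta(hours=24)))  # False
print(is_fresh(None, now, timedelta(hours=24)))                          # False
```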
DQ Check 8: Row Count Validation
```python
def check_row_count(df, expected_min, expected_max=None, table_name="table"):
    '''Validate row count is within expected range'''
    actual = df.count()
    if expected_max is not None:  # "is not None" so an explicit 0 is still honored
        status = "PASS" if expected_min <= actual <= expected_max else "FAIL"
        print(f"  [{status}] Row count: {actual} (expected {expected_min}-{expected_max})")
    else:
        status = "PASS" if actual >= expected_min else "FAIL"
        print(f"  [{status}] Row count: {actual} (expected >= {expected_min})")
    return [{"table": table_name, "check": "row_count",
             "column": "N/A", "total_rows": actual,
             "failed_rows": 0 if status == "PASS" else actual,
             "fail_pct": 0, "status": status}]

print("=== ROW COUNT CHECKS ===")
count_results = check_row_count(df, 5, 1000, "customers")
```
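A fixed range such as 5 to 1000 catches empty or runaway loads but not a source that quietly sends half its usual volume. A common complement is to compare the current count with the previous run's. A plain-Python sketch; the 30% tolerance and the function name are assumptions, and the previous count would come from wherever you persist run history:

```python
from typing import Optional


def row_count_change_ok(current: int, previous: Optional[int], max_change: float = 0.30) -> bool:
    """Pass if the count moved by at most max_change (a fraction) versus the previous run."""
    if previous is None:   # first run: nothing to compare against
        return True
    if previous == 0:      # any rows after an empty run is treated as a suspicious jump
        return current == 0
    return abs(current - previous) / previous <= max_change


print(row_count_change_ok(9_500, 10_000))  # True  (5% drop)
print(row_count_change_ok(5_000, 10_000))  # False (50% drop)
print(row_count_change_ok(120, None))      # True  (no history yet)
```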
The Complete DQ Framework Class
```python
class DataQualityFramework:
    '''Reusable data quality validation framework for PySpark DataFrames'''

    def __init__(self, df, table_name):
        self.df = df
        self.table_name = table_name
        self.results = []
        self.total_rows = df.count()
        self._dtypes = dict(df.dtypes)  # column name -> Spark type name

    def _pct(self, n):
        # Guard against division by zero on an empty DataFrame
        return round(n / self.total_rows * 100, 2) if self.total_rows else 0.0

    def _record(self, check, column, failed):
        self.results.append({
            "check": check, "column": column,
            "failed": failed, "pct": self._pct(failed),
            "status": "FAIL" if failed > 0 else "PASS"
        })

    def check_nulls(self, columns):
        for c in columns:
            missing = col(c).isNull()
            if self._dtypes[c] == "string":  # empty string counts as missing on text columns
                missing = missing | (col(c) == "")
            self._record("null", c, self.df.filter(missing).count())
        return self

    def check_duplicates(self, key_columns):
        distinct = self.df.dropDuplicates(key_columns).count()
        self._record("duplicate", str(key_columns), self.total_rows - distinct)
        return self

    def check_range(self, column, min_val, max_val):
        bad = self.df.filter((col(column) < min_val) | (col(column) > max_val)).count()
        self._record("range", column, bad)
        return self

    def check_regex(self, column, pattern, description="format"):
        bad = self.df.filter(col(column).isNotNull() & ~col(column).rlike(pattern)).count()
        self._record(description, column, bad)
        return self

    def get_quarantine(self, conditions):
        '''Return rows that fail ANY of the conditions'''
        combined = conditions[0]
        for c in conditions[1:]:
            combined = combined | c
        return self.df.filter(combined)

    def report(self):
        '''Print DQ report'''
        print(f"\n{'='*60}")
        print(f"DATA QUALITY REPORT: {self.table_name}")
        print(f"Total rows: {self.total_rows}")
        print(f"{'='*60}")
        passed = sum(1 for r in self.results if r["status"] == "PASS")
        failed = sum(1 for r in self.results if r["status"] == "FAIL")
        for r in self.results:
            print(f"  [{r['status']}] {r['check']:12s} | {r['column']:20s} | "
                  f"Failed: {r['failed']:5d} ({r['pct']}%)")
        print(f"\nSummary: {passed} passed, {failed} failed out of {len(self.results)} checks")
        overall = "PASS" if failed == 0 else "FAIL"
        print(f"Overall: [{overall}]")
        print(f"{'='*60}")
        return self.results

# Usage:
dq = DataQualityFramework(df, "customers")
results = (dq
    .check_nulls(["customer_id", "name", "email", "city", "country"])
    .check_duplicates(["customer_id"])
    .check_range("salary", 1, 500000)
    .check_regex("email", r"^.+@.+\..+$", "email_format")
    .report()
)
```
Output:
```
============================================================
DATA QUALITY REPORT: customers
Total rows: 8
============================================================
  [PASS] null         | customer_id          | Failed:     0 (0.0%)
  [FAIL] null         | name                 | Failed:     2 (25.0%)
  [FAIL] null         | email                | Failed:     1 (12.5%)
  [FAIL] null         | city                 | Failed:     1 (12.5%)
  [FAIL] null         | country              | Failed:     1 (12.5%)
  [FAIL] duplicate    | ['customer_id']      | Failed:     1 (12.5%)
  [FAIL] range        | salary               | Failed:     2 (25.0%)
  [FAIL] email_format | email                | Failed:     2 (25.0%)

Summary: 1 passed, 7 failed out of 8 checks
Overall: [FAIL]
============================================================
```
DQ Report Generation
The report() method in the framework class prints a summary to the console, but in production you need a persistent, queryable DQ report stored as a Delta table:
```python
from datetime import datetime

def save_dq_report(results, table_name, run_id=None):
    '''Save DQ results as a Delta table for historical tracking'''
    if not results:
        return  # nothing to save; Spark cannot infer a schema from an empty list
    run_ts = datetime.now()  # one timestamp shared by every check in this run
    run_id = run_id or run_ts.strftime("%Y%m%d_%H%M%S")
    report_rows = [{
        "run_id": run_id,
        "run_timestamp": run_ts.isoformat(),
        "table_name": table_name,
        "check_type": r["check"],
        "column_name": r["column"],
        "failed_rows": r["failed"],
        "fail_pct": float(r["pct"]),  # always a double, so appended runs keep one schema
        "status": r["status"]
    } for r in results]
    df_report = spark.createDataFrame(report_rows)
    df_report.write.format("delta").mode("append").saveAsTable("dq_monitoring.dq_results")
    print(f"DQ report saved: {len(report_rows)} checks for {table_name}")

# Usage: after running the framework
dq = DataQualityFramework(df, "customers")
results = (dq
    .check_nulls(["customer_id", "name", "email"])
    .check_duplicates(["customer_id"])
    .check_range("salary", 1, 500000)
    .report()
)
save_dq_report(results, "customers")

# Query historical DQ trends:
# SELECT table_name, check_type, column_name, fail_pct, run_timestamp
# FROM dq_monitoring.dq_results
# WHERE table_name = 'customers'
# ORDER BY run_timestamp DESC
```
Over time, this table becomes your data quality dashboard — showing trends like “null rate in email column has been increasing for 3 weeks” before it becomes a crisis.
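Spotting "rising for 3 weeks" can be as simple as checking that the last few `fail_pct` values for one check are strictly increasing. A minimal sketch, assuming you have already pulled the history for one table, check and column from `dq_monitoring.dq_results` into a Python list ordered oldest to newest; the function name and sample numbers are hypothetical:

```python
from typing import Sequence


def rising_for(values: Sequence[float], periods: int) -> bool:
    """True if each of the last `periods` values increased over the one before it."""
    if len(values) < periods + 1:
        return False
    window = values[-(periods + 1):]
    return all(later > earlier for earlier, later in zip(window, window[1:]))


weekly_email_null_pct = [0.4, 0.5, 0.5, 0.9, 1.6, 2.8]
print(rising_for(weekly_email_null_pct, periods=3))  # True
print(rising_for([0.4, 0.9, 0.7, 1.1], periods=3))   # False
```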
Integrating DQ into the Medallion Pipeline
DQ in Bronze → Silver (Validation Gate)
```python
from pyspark.sql.functions import col

EMAIL_PATTERN = r"^.+@.+\..+$"  # raw string so "\." is passed to the regex unchanged

# Read Bronze
df_bronze = spark.read.parquet(".../bronze/customers/")

# Run DQ checks
dq = DataQualityFramework(df_bronze, "bronze_customers")
results = (dq
    .check_nulls(["customer_id"])
    .check_duplicates(["customer_id"])
    .check_regex("email", EMAIL_PATTERN)
    .report()
)

# Separate good and bad (both conditions evaluate to TRUE/FALSE, never NULL)
quarantine_conditions = [
    col("customer_id").isNull(),
    col("email").isNotNull() & ~col("email").rlike(EMAIL_PATTERN)
]
df_bad = dq.get_quarantine(quarantine_conditions)
df_good = df_bronze.exceptAll(df_bad)  # every Bronze row ends up in exactly one output

# Write good to Silver, bad to Quarantine
df_good.write.format("delta").mode("overwrite").save(".../silver/customers/")
df_bad.write.format("delta").mode("append").save(".../quarantine/customers/")
print(f"Silver: {df_good.count()} rows | Quarantine: {df_bad.count()} rows")
```
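Why `exceptAll` rather than filtering Bronze with the negated condition? Spark follows SQL three-valued logic: a comparison with NULL yields NULL, `filter` keeps only rows where the condition is TRUE, and NOT NULL is still NULL. A row whose quarantine condition evaluates to NULL would therefore appear in neither output with `~condition`, whereas `exceptAll` sends it to Silver. The conditions above use `isNull`/`isNotNull` and never evaluate to NULL, but a rule like `col("salary") < 1` can. The plain-Python model below uses `None` for SQL NULL; all names are hypothetical. In PySpark, `coalesce(condition, lit(True))` is one way to route NULL outcomes to quarantine explicitly.

```python
from typing import Callable, List, Optional

Bool3 = Optional[bool]  # None stands in for SQL NULL


def sql_lt(a: Optional[int], b: int) -> Bool3:
    """SQL '<': any comparison involving NULL is NULL."""
    return None if a is None else a < b


def sql_not(x: Bool3) -> Bool3:
    """SQL NOT: NOT NULL is still NULL."""
    return None if x is None else not x


def keep_true(rows: List[dict], cond: Callable[[dict], Bool3]) -> List[dict]:
    """Like DataFrame.filter: keep only rows where the condition is exactly True."""
    return [r for r in rows if cond(r) is True]


def is_bad(r: dict) -> Bool3:
    return sql_lt(r["salary"], 1)


def is_bad_safe(r: dict) -> Bool3:
    # Python analogue of coalesce(cond, lit(True)): a NULL outcome counts as bad
    result = is_bad(r)
    return True if result is None else result


rows = [{"id": 1, "salary": 95000}, {"id": 2, "salary": -5000}, {"id": 3, "salary": None}]

bad = keep_true(rows, is_bad)
naive_good = keep_true(rows, lambda r: sql_not(is_bad(r)))
print([r["id"] for r in bad], [r["id"] for r in naive_good])  # [2] [1]  (id 3 is in neither)

bad_safe = keep_true(rows, is_bad_safe)
good_safe = keep_true(rows, lambda r: sql_not(is_bad_safe(r)))
print([r["id"] for r in bad_safe], [r["id"] for r in good_safe])  # [2, 3] [1]
```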
DQ in Gold (Business Rule Validation)
Gold layer DQ checks are different from Bronze→Silver checks. Instead of structural issues (nulls, format), Gold checks validate business rules and aggregation integrity:
```python
from pyspark.sql.functions import col, current_date, date_sub, sum as spark_sum

# Gold DQ: Business rule validation after building star schema
df_gold = spark.read.format("delta").load(".../gold/fact_orders/")
df_dim_customer = spark.read.format("delta").load(".../gold/dim_customer/")

dq_gold = DataQualityFramework(df_gold, "gold_fact_orders")
results = (dq_gold
    .check_nulls(["order_key", "customer_key", "date_key"])  # Surrogate keys must never be NULL
    .check_duplicates(["order_key"])                          # Fact keys must be unique
    .check_range("total_amount", 0, 1000000)                  # No negative or unrealistic amounts
    .report()
)

# Referential integrity: every customer_key in fact must exist in dim
orphans = df_gold.join(df_dim_customer, "customer_key", "left_anti")
orphan_count = orphans.count()  # count once instead of re-running the join
if orphan_count > 0:
    print(f"FAIL: {orphan_count} orders reference non-existent customers")
    orphans.write.format("delta").mode("append").save(".../quarantine/orphan_orders/")

# Revenue sanity check: today's total should be within 50% of the average daily
# revenue over the previous 30 days (today is excluded from its own baseline)
today_rev = (df_gold.filter(col("order_date") == current_date())
             .agg(spark_sum("total_amount")).collect()[0][0])
avg_rev = (df_gold.filter((col("order_date") >= date_sub(current_date(), 30)) &
                          (col("order_date") < current_date()))
           .groupBy("order_date").agg(spark_sum("total_amount").alias("daily_rev"))
           .agg({"daily_rev": "avg"}).collect()[0][0])
# Sums of DECIMAL columns come back as Python Decimal; convert before mixing with floats
today_rev = float(today_rev or 0)
avg_rev = float(avg_rev or 0)
if avg_rev > 0 and not (avg_rev * 0.5 <= today_rev <= avg_rev * 1.5):
    print(f"WARN: Today's revenue ${today_rev:,.0f} is more than 50% away from the 30-day avg ${avg_rev:,.0f}")
```
The difference: Bronze→Silver DQ catches data defects (broken rows). Gold DQ catches business anomalies (missing references, unrealistic totals, broken star schema integrity).
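The revenue check reduces to a tolerance band around a baseline. A plain-Python sketch of that rule; it builds the baseline from previous days only, on the assumption that today's partial total should not dilute its own baseline, and the 50% tolerance mirrors the code above while the function name is hypothetical:

```python
from statistics import mean
from typing import Sequence


def revenue_in_band(today: float, previous_days: Sequence[float], tolerance: float = 0.5) -> bool:
    """True if today's revenue is within +/- tolerance of the average of previous_days."""
    if not previous_days:
        return True  # no baseline yet, nothing to compare against
    baseline = mean(previous_days)
    return baseline * (1 - tolerance) <= today <= baseline * (1 + tolerance)


history = [100_000.0, 120_000.0, 110_000.0]  # baseline = 110,000
print(revenue_in_band(90_000.0, history))    # True
print(revenue_in_band(40_000.0, history))    # False
```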
Automated Alerts on DQ Failures
DQ checks are useless if nobody sees the failures. Set up automated alerts so the team is notified immediately:
```python
from datetime import datetime

def alert_on_dq_failure(results, table_name):
    '''Check DQ results and raise alert if any critical checks failed'''
    failures = [r for r in results if r["status"] == "FAIL"]
    if not failures:
        print(f"All DQ checks passed for {table_name}")
        return

    # Build alert message
    alert_msg = f"DQ ALERT: {len(failures)} checks failed for {table_name}\n"
    for f in failures:
        alert_msg += f"  - {f['check']} on {f['column']}: {f['failed']} rows ({f['pct']}%)\n"
    print(alert_msg)

    # Write to an alert table (Data Activator or Power BI monitors it).
    # This runs first so the alert is recorded even when the run is failed below.
    alert_df = spark.createDataFrame([{
        "timestamp": datetime.now().isoformat(),
        "table": table_name,
        "failures": len(failures),
        "message": alert_msg
    }])
    alert_df.write.format("delta").mode("append").saveAsTable("dq_monitoring.alerts")

    # Fail the notebook on critical failures so the calling pipeline's failure path
    # (red arrow) triggers the Teams/Outlook alert. Raise an exception here:
    # dbutils.notebook.exit() ends the run as successful, so it would not trip that path.
    # Duplicate checks store the column as str(key_columns), e.g. "['customer_id']".
    critical_columns = {"customer_id", "['customer_id']"}
    if any(f["check"] in ("null", "duplicate") and f["column"] in critical_columns
           for f in failures):
        raise RuntimeError(f"CRITICAL DQ FAILURE: {alert_msg}")

# Usage in pipeline notebook:
results = dq.report()
alert_on_dq_failure(results, "customers")
```
Alert escalation levels:
INFO: Log to dq_results table (all checks)
WARN: Log + print warning (non-critical failures like invalid email)
CRITICAL: Log + fail notebook + pipeline sends Teams alert (null primary key, broken referential integrity)
Pipeline pattern:
Notebook runs DQ → passes? → continue pipeline
→ fails? → raise an exception (the notebook run fails)
→ pipeline red arrow → Teams notification
→ on-call engineer investigates
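The escalation levels above can be encoded as a single lookup, so every check result is mapped to INFO, WARN or CRITICAL in one place. A plain-Python sketch; the result dictionaries mirror the framework class's output (`check`, `column`, `status`), while the choice of which checks count as critical is an assumption:

```python
from typing import Dict, Iterable

# (check, column) pairs that must stop the pipeline; any other failure is a WARN.
CRITICAL_CHECKS = {("null", "customer_id"), ("duplicate", "['customer_id']")}
LEVEL_ORDER = {"INFO": 0, "WARN": 1, "CRITICAL": 2}


def severity(result: Dict) -> str:
    if result["status"] == "PASS":
        return "INFO"
    if (result["check"], result["column"]) in CRITICAL_CHECKS:
        return "CRITICAL"
    return "WARN"


def overall_severity(results: Iterable[Dict]) -> str:
    """Highest severity across all results; INFO when there are none."""
    return max((severity(r) for r in results), key=LEVEL_ORDER.__getitem__, default="INFO")


sample_results = [
    {"check": "null", "column": "customer_id", "status": "PASS"},
    {"check": "email_format", "column": "email", "status": "FAIL"},
    {"check": "duplicate", "column": "['customer_id']", "status": "FAIL"},
]
print([severity(r) for r in sample_results])  # ['INFO', 'WARN', 'CRITICAL']
print(overall_severity(sample_results))       # CRITICAL
```

A CRITICAL outcome would then raise an exception so the pipeline's failure path fires, while WARN is only logged.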
Great Expectations vs Custom Framework
Great Expectations (GX) is one of the most widely used open-source data quality libraries. How does it compare to the custom framework we built?
| Feature | Custom Framework (This Post) | Great Expectations |
|---|---|---|
| Setup | Zero — just PySpark functions | Requires pip install + configuration + data context |
| Learning curve | Low — standard PySpark | Medium — GX-specific API and YAML configs |
| Check types | 8 (built here, extend as needed) | 300+ in the Expectations Gallery (core plus community-contributed) |
| Documentation | Your code IS the documentation | Auto-generated Data Docs (HTML reports) |
| Schema validation | Manual | Built-in schema expectations |
| Profiling | Manual | Auto-profiling generates expectations from data |
| Spark support | Native PySpark | Supported (with SparkDFExecutionEngine) |
| Maintenance | You maintain it | Community maintained, regular updates |
| Best for | Small-medium teams, full control, learning | Large teams, standardization, a large catalog of ready-made checks |
The recommendation: Start with the custom framework from this post — it teaches you what DQ checks actually do under the hood. Once your team grows or you need 50+ check types, evaluate Great Expectations. Many production teams use a hybrid: custom PySpark checks for core validation and GX for advanced profiling and documentation.
Common Data Quality Issues in Real Projects
| Issue | How Often | Impact | Detection |
|---|---|---|---|
| NULL primary keys | Weekly | Broken joins, lost data | Null check on key columns |
| Duplicate records | Daily | Inflated metrics, double-counting | Duplicate check on business key |
| Invalid dates (1900-01-01) | Monthly | Wrong age calculations, broken time series | Range check: date > 2000-01-01 |
| Test/dummy data in production | Quarterly | Inflated revenue, wrong KPIs | Filter: name NOT LIKE 'Test%' |
| Encoding issues (mojibake) | Monthly | Garbled names, broken search | Regex check for the replacement character (U+FFFD) or typical mis-decoding sequences such as Ã© |
| Schema drift (new columns) | Weekly | Pipeline failures, missing data | Schema comparison with expected |
| Referential integrity breaks | Daily | Orphan records, broken star schema | Anti-join with reference tables |
| Negative amounts | Weekly | Wrong aggregations | Range check: amount >= 0 |
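Two detections in the table are easy to get subtly wrong. A plain-Python sketch of both; the test-name prefixes and mojibake markers are assumptions to adapt to your sources. `Ã` followed by another character and `â€` are what UTF-8 text typically looks like after being mis-decoded as Windows-1252, and `\ufffd` is the Unicode replacement character.

```python
import re

# Hypothetical patterns; tune them to the naming conventions of your source systems.
TEST_NAME = re.compile(r"^(test|dummy)", re.IGNORECASE)
MOJIBAKE = re.compile("\ufffd|Ã.|â€")


def looks_like_test(name: str) -> bool:
    return bool(TEST_NAME.search(name or ""))


def looks_garbled(text: str) -> bool:
    return bool(MOJIBAKE.search(text or ""))


print(looks_like_test("Test_User"), looks_like_test("Priya"))  # True False
# "José" encoded as UTF-8 but decoded as Windows-1252 becomes "JosÃ©"
broken = "José".encode("utf-8").decode("cp1252")
print(looks_garbled(broken), looks_garbled("José"))            # True False
```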
Interview Questions
Q: What are the six dimensions of data quality? A: Completeness (no missing values), Uniqueness (no duplicates), Validity (correct format), Accuracy (factually correct), Consistency (related values agree), and Timeliness (data is fresh enough). Each dimension has specific checks: null checks for completeness, dedup for uniqueness, regex for validity.
Q: What is the quarantine pattern? A: Instead of dropping records that fail validation, route them to a separate quarantine table for investigation. This preserves evidence of source issues, enables reprocessing after fixes, and maintains a complete audit trail. Good records go to Silver, bad records go to Quarantine.
Q: How do you integrate data quality into a Medallion Architecture? A: DQ checks run at two points: Bronze→Silver (validate raw data, quarantine bad records) and Silver→Gold (validate business rules). Bronze→Silver catches structural issues (nulls, duplicates, format). Silver→Gold catches business issues (referential integrity, KPI accuracy).
Q: How do you handle data quality failures in production? A: Three levels: WARN (log the issue but continue processing), FAIL (stop the pipeline and alert), QUARANTINE (separate bad records but continue with good ones). Critical checks (null primary keys) should fail the pipeline. Non-critical checks (invalid email) should quarantine and continue.
Wrapping Up
Data quality is not a one-time activity — it is a continuous process built into every layer of your pipeline. Check at Bronze ingestion, validate at Silver transformation, verify at Gold aggregation. Quarantine bad records instead of dropping them. Report everything. Alert on failures.
The framework we built is simple, but it covers a lot of ground. Eight check types handle the most common real-world data quality issues listed above. The class-based approach makes it reusable across every table in your data lake. And the quarantine pattern ensures no record is silently lost.
Trust in your data platform starts with data quality. Build it in from day one.
Related posts:
- Project 4: Data Quality + SCD
- Medallion Architecture
- PySpark Transformations Cookbook
- Delta Lake Deep Dive
Naveen Vuppula is a Senior Data Engineering Consultant and app developer based in Ontario, Canada. He writes about Python, SQL, AWS, Azure, and everything data engineering at DriveDataScience.com.