Data Quality in Azure Databricks: Validation Rules, Quarantine Patterns, and Building a DQ Framework from Scratch
Every pipeline you build has an implicit promise: “The data I produce is correct.” But is it? Are there NULLs where there should not be? Duplicate customer IDs? Negative order amounts? Emails without @ signs? Dates from the year 1900?
Bad data is like a termite infestation — invisible until the damage is done. An analyst builds a dashboard showing $2 million in revenue. The CFO presents it to the board. Then someone discovers 500 test orders were included because the pipeline did not filter them. Trust destroyed. Careers affected. All because nobody checked the data.
Data quality is not optional. It is the difference between a data platform that people trust and one they avoid.
This post builds a practical data quality framework in PySpark — not a theoretical list of principles, but actual code you can run in your Databricks notebooks to validate, quarantine, report, and monitor data quality.
Think of data quality like airport security. Every passenger (row) goes through a checkpoint. Valid passport? (not NULL). Boarding pass matches name? (referential integrity). No prohibited items? (business rule validation). Passengers who pass go to the gate (Silver/Gold). Passengers who fail go to secondary screening (quarantine). Nothing gets through unchecked.
Table of Contents
- The Six Dimensions of Data Quality
- The Quarantine Pattern
- Building the DQ Framework
- DQ Check 1: Null Checks
- DQ Check 2: Duplicate Checks
- DQ Check 3: Data Type Validation
- DQ Check 4: Range and Boundary Checks
- DQ Check 5: Format Validation (Regex)
- DQ Check 6: Referential Integrity
- DQ Check 7: Freshness Checks
- DQ Check 8: Row Count Validation
- The Complete DQ Framework Class
- Integrating DQ into the Medallion Pipeline
- Bronze → Silver (Validation Gate)
- Common Data Quality Issues in Real Projects
- Interview Questions
- Wrapping Up
The Six Dimensions of Data Quality
| Dimension | Question It Answers | Example Check |
|---|---|---|
| Completeness | Are all required fields filled? | customer_id is NOT NULL |
| Uniqueness | Are records unique where they should be? | No duplicate customer_ids |
| Validity | Do values conform to expected formats? | Email matches regex pattern |
| Accuracy | Are values factually correct? | Country code exists in reference table |
| Consistency | Do related values agree? | State matches country |
| Timeliness | Is the data fresh enough? | Last update within 24 hours |
Real-life analogy: Imagine hiring a new employee. Completeness = resume has all sections filled. Uniqueness = no duplicate employee IDs. Validity = phone number has correct format. Accuracy = listed degree actually exists. Consistency = listed city matches listed state. Timeliness = resume updated this year, not 2015.
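None of the eight checks below targets the Consistency dimension directly. A minimal sketch, assuming a hypothetical reference mapping `CITY_TO_COUNTRY` of known city/country pairs: flag rows whose country disagrees with the country the reference expects for that city. Plain Python keeps the rule visible; on a DataFrame the same rule would become a join against a reference table followed by a filter.

```python
# Hypothetical reference data: the country each known city belongs to
CITY_TO_COUNTRY = {
    "Toronto": "Canada",
    "Calgary": "Canada",
    "London": "UK",
    "Mumbai": "India",
}

def find_inconsistent(rows, city_key="city", country_key="country"):
    """Return rows whose country contradicts the reference country for their city.

    Rows with an unknown city or a missing country are skipped: those are
    validity and completeness problems, not consistency problems.
    """
    inconsistent = []
    for row in rows:
        expected = CITY_TO_COUNTRY.get(row.get(city_key))
        actual = row.get(country_key)
        if expected is not None and actual is not None and actual != expected:
            inconsistent.append(row)
    return inconsistent

# Illustrative rows (1008 is a hypothetical record that contradicts the reference)
rows = [
    {"id": 1001, "city": "Toronto", "country": "Canada"},
    {"id": 1003, "city": "London", "country": "UK"},
    {"id": 1008, "city": "Mumbai", "country": "Canada"},
    {"id": 1005, "city": "Calgary", "country": None},  # missing, not inconsistent
]
print([r["id"] for r in find_inconsistent(rows)])  # [1008]
```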
The Quarantine Pattern
Instead of dropping bad records silently, route them to a quarantine table for investigation:
```
        Source Data (100 rows)
                 |
           DQ Validation
            /         \
      Good (95)     Bad (5)
          |             |
       Silver      Quarantine
        table         table
```
Why quarantine instead of dropping?
- Bad records are evidence of source-system issues that you need to investigate.
- Dropping silently means you never know data is missing.
- Quarantine lets you fix and reprocess records later.
- Audit compliance requires accounting for every record.
Real-life analogy: Airport security does not throw away your bag if it has a suspicious item. They put it in secondary screening (quarantine), investigate, and either clear it or flag it. The bag is accounted for either way.
Building the DQ Framework
The Sample Data (Intentionally Messy)
```python
# Explicit imports: a wildcard import from pyspark.sql.functions would shadow
# Python built-ins such as round() and sum() that the checks below rely on
from pyspark.sql.functions import col, current_date, datediff, to_date
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

data = [
    (1001, "Naveen", "naveen@email.com", "Toronto", "Canada", 95000, "2023-01-15"),
    (1002, "Shrey", "shrey@email.com", "Toronto", "Canada", 88000, "2023-03-20"),
    (1003, None, "carol@email", "London", "UK", -5000, "2023-06-10"),  # NULL name, invalid email, negative salary
    (1004, "Vishnu", None, "12345", "Canada", 91000, "2023-09-01"),  # NULL email, numeric city
    (1005, "Ravi", "ravi@email.com", "Calgary", None, 85000, "1900-01-01"),  # NULL country, suspicious date
    (1001, "Naveen", "naveen@email.com", "Toronto", "Canada", 95000, "2023-01-15"),  # DUPLICATE
    (1006, "Priya", "priya@email.com", "Mumbai", "India", 78000, "2024-03-15"),
    (1007, "", "not-an-email", "", "India", 0, "2026-05-18"),  # Empty strings, invalid email, zero salary
]

schema = StructType([
    StructField("customer_id", IntegerType()),
    StructField("name", StringType()),
    StructField("email", StringType()),
    StructField("city", StringType()),
    StructField("country", StringType()),
    StructField("salary", IntegerType()),
    StructField("hire_date", StringType()),
])

# `spark` is the SparkSession that Databricks notebooks provide automatically
df = spark.createDataFrame(data, schema).withColumn("hire_date", to_date("hire_date"))
df.show(truncate=False)
```
DQ Check 1: Null Checks
```python
def check_nulls(df, columns, table_name="table"):
    '''Count NULLs (and empty strings, for string columns) in each column'''
    results = []
    total = df.count()
    dtypes = dict(df.dtypes)  # column name -> type string, e.g. "string", "int"
    for c in columns:
        is_missing = col(c).isNull()
        # Only compare string columns to "": comparing an INT column to "" relies on
        # an implicit string-to-int cast, which can raise an error under ANSI mode
        if dtypes[c] == "string":
            is_missing = is_missing | (col(c) == "")
        null_count = df.filter(is_missing).count()
        pct = round(null_count / total * 100, 2) if total > 0 else 0
        status = "FAIL" if null_count > 0 else "PASS"
        results.append({
            "table": table_name,
            "check": "null_check",
            "column": c,
            "total_rows": total,
            "failed_rows": null_count,
            "fail_pct": pct,
            "status": status
        })
        print(f"  [{status}] {c}: {null_count} nulls ({pct}%)")
    return results

print("=== NULL CHECKS ===")
null_results = check_nulls(df, ["customer_id", "name", "email", "city", "country"], "customers")
```
DQ Check 2: Duplicate Checks
```python
def check_duplicates(df, key_columns, table_name="table"):
    '''Check for duplicate rows based on key columns'''
    total = df.count()
    distinct = df.dropDuplicates(key_columns).count()
    dupes = total - distinct
    pct = round(dupes / total * 100, 2) if total > 0 else 0
    status = "FAIL" if dupes > 0 else "PASS"
    print(f"  [{status}] Duplicates on {key_columns}: {dupes} ({pct}%)")
    if dupes > 0:
        print("  Duplicate records:")
        df.groupBy(key_columns).count().filter(col("count") > 1).show()
    return [{"table": table_name, "check": "duplicate_check",
             "column": str(key_columns), "total_rows": total,
             "failed_rows": dupes, "fail_pct": pct, "status": status}]

print("=== DUPLICATE CHECKS ===")
dup_results = check_duplicates(df, ["customer_id"], "customers")
```
DQ Check 3: Data Type Validation
```python
def check_numeric_city(df, column="city", table_name="table"):
    '''Flag purely numeric values in a column that should contain text (e.g. city)'''
    bad = df.filter(col(column).rlike("^[0-9]+$")).count()
    total = df.count()
    pct = round(bad / total * 100, 2) if total > 0 else 0
    status = "FAIL" if bad > 0 else "PASS"
    print(f"  [{status}] Numeric values in {column}: {bad} ({pct}%)")
    return [{"table": table_name, "check": "type_check",
             "column": column, "total_rows": total,
             "failed_rows": bad, "fail_pct": pct, "status": status}]

print("=== TYPE CHECKS ===")
type_results = check_numeric_city(df, "city", "customers")
```
DQ Check 4: Range and Boundary Checks
```python
def check_range(df, column, min_val, max_val, table_name="table"):
    '''Check if values fall within expected range (NULLs also count as failures)'''
    bad = df.filter(
        (col(column) < min_val) | (col(column) > max_val) | col(column).isNull()
    ).count()
    total = df.count()
    pct = round(bad / total * 100, 2) if total > 0 else 0
    status = "FAIL" if bad > 0 else "PASS"
    print(f"  [{status}] {column} out of range [{min_val}-{max_val}]: {bad} ({pct}%)")
    return [{"table": table_name, "check": "range_check",
             "column": column, "total_rows": total,
             "failed_rows": bad, "fail_pct": pct, "status": status}]

print("=== RANGE CHECKS ===")
range_results = check_range(df, "salary", 1, 500000, "customers")
```
DQ Check 5: Format Validation (Regex)
```python
def check_email_format(df, column="email", table_name="table"):
    '''Validate email format using regex (NULL and empty emails are left to the null check)'''
    # Raw string: in a normal string "\." is an invalid escape sequence (a warning in recent Python)
    email_regex = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
    bad = df.filter(
        col(column).isNotNull() &
        (col(column) != "") &
        ~col(column).rlike(email_regex)
    ).count()
    total = df.filter(col(column).isNotNull() & (col(column) != "")).count()
    pct = round(bad / total * 100, 2) if total > 0 else 0
    status = "FAIL" if bad > 0 else "PASS"
    print(f"  [{status}] Invalid email format: {bad} of {total} ({pct}%)")
    return [{"table": table_name, "check": "format_check",
             "column": column, "total_rows": total,
             "failed_rows": bad, "fail_pct": pct, "status": status}]

print("=== FORMAT CHECKS ===")
format_results = check_email_format(df, "email", "customers")
```
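Spark's `rlike` is a find-anywhere test, so the `^` and `$` anchors are what force the whole value to match. A cheap way to sanity-check a pattern before running it over millions of rows is to try it on a few known-good and known-bad strings locally. This sketch uses Python's `re` module; for a simple pattern like this one, Python and Java regex syntax (Spark's `rlike` uses `java.util.regex`) behave the same, but that does not hold for every pattern, so treat it as a quick sanity check rather than proof. `is_valid_email` is a hypothetical helper, not part of the framework.

```python
import re

# Same pattern as check_email_format, as a raw string so "\." stays a literal dot
EMAIL_REGEX = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"

def is_valid_email(value):
    # re.search mirrors rlike's find-anywhere behaviour; the anchors do the rest
    return value is not None and re.search(EMAIL_REGEX, value) is not None

samples = {
    "naveen@email.com": True,
    "carol@email": False,          # no dot-separated top-level domain
    "not-an-email": False,         # no @ at all
    "a.b+tag@sub.example.co": True,
}
for value, expected in samples.items():
    assert is_valid_email(value) == expected, value
print("all email samples behave as expected")
```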
DQ Check 6: Referential Integrity
```python
def check_referential_integrity(df, column, ref_df, ref_column, table_name="table"):
    '''Check if values exist in a reference table'''
    # left_anti keeps only the rows of df with no match in ref_df (the orphans).
    # NULL keys never match, so they would also be reported as orphans.
    orphans = df.join(ref_df, df[column] == ref_df[ref_column], "left_anti")
    orphan_count = orphans.count()
    total = df.count()
    pct = round(orphan_count / total * 100, 2) if total > 0 else 0
    status = "FAIL" if orphan_count > 0 else "PASS"
    print(f"  [{status}] Orphan records (no match in reference): {orphan_count} ({pct}%)")
    return [{"table": table_name, "check": "referential_integrity",
             "column": column, "total_rows": total,
             "failed_rows": orphan_count, "fail_pct": pct, "status": status}]

# Reference table of valid countries
valid_countries = spark.createDataFrame(
    [("Canada",), ("India",), ("UK",), ("USA",)], ["country_name"]
)

print("=== REFERENTIAL INTEGRITY ===")
# NULL countries are excluded here because the null check already reports them
ref_results = check_referential_integrity(df.filter(col("country").isNotNull()),
                                          "country", valid_countries, "country_name", "customers")
```
DQ Check 7: Freshness Checks
```python
def check_freshness(df, date_column, max_age_days=365, table_name="table"):
    '''Check if date values are within acceptable freshness'''
    stale = df.filter(
        col(date_column).isNotNull() &
        (datediff(current_date(), col(date_column)) > max_age_days)
    ).count()
    total = df.filter(col(date_column).isNotNull()).count()
    pct = round(stale / total * 100, 2) if total > 0 else 0
    status = "FAIL" if stale > 0 else "PASS"
    print(f"  [{status}] Records older than {max_age_days} days: {stale} ({pct}%)")
    return [{"table": table_name, "check": "freshness_check",
             "column": date_column, "total_rows": total,
             "failed_rows": stale, "fail_pct": pct, "status": status}]

print("=== FRESHNESS CHECKS ===")
fresh_results = check_freshness(df, "hire_date", 1095, "customers")  # 3 years
```
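The check above measures the age of individual records, which is good at catching sentinel dates such as 1900-01-01. The Timeliness example in the dimensions table ("last update within 24 hours") is a table-level question instead: how old is the newest record? A minimal sketch of that decision, assuming a hypothetical load or update timestamp column; `check_table_freshness` is an illustrative helper, not part of the framework.

```python
from datetime import datetime, timedelta

def check_table_freshness(latest_ts, max_age=timedelta(hours=24), now=None):
    """Table-level timeliness: is the newest timestamp recent enough?

    latest_ts: newest value of the table's update timestamp (None if the table is empty)
    now: injectable for testing; defaults to naive local time, which matches
         the naive datetimes PySpark's collect() returns for timestamp columns
    """
    now = now or datetime.now()
    if latest_ts is None:
        # An empty table has no fresh data at all
        return {"check": "table_freshness", "age": None, "status": "FAIL"}
    age = now - latest_ts
    status = "PASS" if age <= max_age else "FAIL"
    return {"check": "table_freshness", "age": age, "status": status}

now = datetime(2024, 6, 2, 12, 0)
print(check_table_freshness(datetime(2024, 6, 2, 1, 0), now=now)["status"])   # PASS (11 hours old)
print(check_table_freshness(datetime(2024, 5, 30, 9, 0), now=now)["status"])  # FAIL (over 3 days old)
```

In a notebook, `latest_ts` could come from `df.agg(F.max("updated_at")).collect()[0][0]`, where `updated_at` is the hypothetical column and `F` is `pyspark.sql.functions` imported as a module so its `max` does not clash with Python's.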
DQ Check 8: Row Count Validation
```python
def check_row_count(df, expected_min, expected_max=None, table_name="table"):
    '''Validate row count is within expected range'''
    actual = df.count()
    if expected_max is not None:  # "if expected_max:" would silently ignore an upper bound of 0
        status = "PASS" if expected_min <= actual <= expected_max else "FAIL"
        print(f"  [{status}] Row count: {actual} (expected {expected_min}-{expected_max})")
    else:
        status = "PASS" if actual >= expected_min else "FAIL"
        print(f"  [{status}] Row count: {actual} (expected >= {expected_min})")
    return [{"table": table_name, "check": "row_count",
             "column": "N/A", "total_rows": actual,
             "failed_rows": 0 if status == "PASS" else actual,
             "fail_pct": 0, "status": status}]

print("=== ROW COUNT CHECKS ===")
count_results = check_row_count(df, 5, 1000, "customers")
```
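A fixed range such as 5 to 1,000 only catches gross failures. A common variant, sketched here as an assumption rather than part of the framework, compares the current count with the previous run's and fails when it moves by more than a tolerance. That catches a half-loaded file whose count still sits inside the fixed range. The previous count has to be stored somewhere between runs (for example a DQ history table), and the 20% default is an arbitrary placeholder.

```python
def check_row_count_change(actual, previous, max_change_pct=20.0, table_name="table"):
    """Fail when the row count moves more than max_change_pct percent versus the last run."""
    if previous is None or previous == 0:
        # No usable baseline yet (first run or empty previous load): record and pass
        change_pct, status = None, "PASS"
    else:
        change_pct = round((actual - previous) * 100 / previous, 2)
        status = "PASS" if abs(change_pct) <= max_change_pct else "FAIL"
    return {"table": table_name, "check": "row_count_change",
            "total_rows": actual, "previous_rows": previous,
            "change_pct": change_pct, "status": status}

for today in (980, 450):
    result = check_row_count_change(today, previous=1000)
    print(today, result["change_pct"], result["status"])
# 980 -2.0 PASS
# 450 -55.0 FAIL
```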
The Complete DQ Framework Class
```python
class DataQualityFramework:
    '''Reusable data quality validation framework for PySpark DataFrames'''

    def __init__(self, df, table_name):
        # Every check triggers its own Spark job; for large inputs consider df.cache() first
        self.df = df
        self.table_name = table_name
        self.results = []
        self.total_rows = df.count()

    def _pct(self, n):
        # Guard against division by zero on an empty DataFrame
        return round(n / self.total_rows * 100, 2) if self.total_rows > 0 else 0.0

    def _record(self, check, column, failed):
        self.results.append({
            "check": check, "column": column,
            "failed": failed, "pct": self._pct(failed),
            "status": "FAIL" if failed > 0 else "PASS"
        })

    def check_nulls(self, columns):
        dtypes = dict(self.df.dtypes)
        for c in columns:
            is_missing = col(c).isNull()
            # Empty strings only apply to string columns; comparing an INT column to ""
            # relies on an implicit cast that can raise an error under ANSI mode
            if dtypes[c] == "string":
                is_missing = is_missing | (col(c) == "")
            self._record("null", c, self.df.filter(is_missing).count())
        return self

    def check_duplicates(self, key_columns):
        # Extra copies beyond the first occurrence of each key
        distinct = self.df.dropDuplicates(key_columns).count()
        self._record("duplicate", str(key_columns), self.total_rows - distinct)
        return self

    def check_range(self, column, min_val, max_val):
        # NULLs are not counted here; they are check_nulls' job
        bad = self.df.filter((col(column) < min_val) | (col(column) > max_val)).count()
        self._record("range", column, bad)
        return self

    def check_regex(self, column, pattern, description="format"):
        bad = self.df.filter(col(column).isNotNull() & ~col(column).rlike(pattern)).count()
        self._record(description, column, bad)
        return self

    def get_quarantine(self, conditions):
        '''Return rows that fail ANY of the conditions'''
        combined = conditions[0]
        for c in conditions[1:]:
            combined = combined | c
        return self.df.filter(combined)

    def report(self):
        '''Print the DQ report and return the list of result dicts'''
        print(f"\n{'=' * 60}")
        print(f"DATA QUALITY REPORT: {self.table_name}")
        print(f"Total rows: {self.total_rows}")
        print('=' * 60)
        passed = len([r for r in self.results if r["status"] == "PASS"])
        failed = len(self.results) - passed
        for r in self.results:
            print(f"  [{r['status']}] {r['check']:12s} | {r['column']:20s} | "
                  f"Failed: {r['failed']:5d} ({r['pct']}%)")
        print(f"\nSummary: {passed} passed, {failed} failed out of {len(self.results)} checks")
        overall = "PASS" if failed == 0 else "FAIL"
        print(f"Overall: [{overall}]")
        print('=' * 60)
        return self.results

# Usage:
dq = DataQualityFramework(df, "customers")
results = (dq
    .check_nulls(["customer_id", "name", "email", "city", "country"])
    .check_duplicates(["customer_id"])
    .check_range("salary", 1, 500000)
    .check_regex("email", r"^.+@.+\..+$", "email_format")
    .report()
)
```
Output:
```
============================================================
DATA QUALITY REPORT: customers
Total rows: 8
============================================================
  [PASS] null         | customer_id          | Failed:     0 (0.0%)
  [FAIL] null         | name                 | Failed:     2 (25.0%)
  [FAIL] null         | email                | Failed:     1 (12.5%)
  [FAIL] null         | city                 | Failed:     1 (12.5%)
  [FAIL] null         | country              | Failed:     1 (12.5%)
  [FAIL] duplicate    | ['customer_id']      | Failed:     1 (12.5%)
  [FAIL] range        | salary               | Failed:     2 (25.0%)
  [FAIL] email_format | email                | Failed:     2 (25.0%)

Summary: 1 passed, 7 failed out of 8 checks
Overall: [FAIL]
============================================================
```
Integrating DQ into the Medallion Pipeline
Bronze → Silver (Validation Gate)
```python
# Read Bronze
df_bronze = spark.read.parquet(".../bronze/customers/")

# Run DQ checks (raw string keeps "\." a literal dot in the regex)
EMAIL_PATTERN = r"^.+@.+\..+$"
dq = DataQualityFramework(df_bronze, "bronze_customers")
results = (dq
    .check_nulls(["customer_id"])
    .check_duplicates(["customer_id"])
    .check_regex("email", EMAIL_PATTERN, "email_format")
    .report()
)

# Separate good and bad: a row is quarantined if it fails ANY condition
quarantine_conditions = [
    col("customer_id").isNull(),
    col("email").isNotNull() & ~col("email").rlike(EMAIL_PATTERN),
]
df_bad = dq.get_quarantine(quarantine_conditions)
# exceptAll (unlike subtract) respects duplicate rows, so each Bronze row lands in exactly one output.
# Duplicate customer_ids are only reported above; dedupe explicitly if Silver must be unique.
df_good = df_bronze.exceptAll(df_bad)

# Write good to Silver, bad to Quarantine
df_good.write.format("delta").mode("overwrite").save(".../silver/customers/")
df_bad.write.format("delta").mode("append").save(".../quarantine/customers/")
print(f"Silver: {df_good.count()} rows | Quarantine: {df_bad.count()} rows")
```
Common Data Quality Issues in Real Projects
| Issue | How Often | Impact | Detection |
|---|---|---|---|
| NULL primary keys | Weekly | Broken joins, lost data | Null check on key columns |
| Duplicate records | Daily | Inflated metrics, double-counting | Duplicate check on business key |
| Invalid dates (1900-01-01) | Monthly | Wrong age calculations, broken time series | Range check: date > 2000-01-01 |
| Test/dummy data in production | Quarterly | Inflated revenue, wrong KPIs | Filter: name NOT LIKE 'Test%' |
| Encoding issues (mojibake) | Monthly | Garbled names, broken search | Regex check for non-printable chars |
| Schema drift (new columns) | Weekly | Pipeline failures, missing data | Schema comparison with expected |
| Referential integrity breaks | Daily | Orphan records, broken star schema | Anti-join with reference tables |
| Negative amounts | Weekly | Wrong aggregations | Range check: amount >= 0 |
Interview Questions
Q: What are the six dimensions of data quality? A: Completeness (no missing values), Uniqueness (no duplicates), Validity (correct format), Accuracy (factually correct), Consistency (related values agree), and Timeliness (data is fresh enough). Each dimension has specific checks: null checks for completeness, dedup for uniqueness, regex for validity.
Q: What is the quarantine pattern? A: Instead of dropping records that fail validation, route them to a separate quarantine table for investigation. This preserves evidence of source issues, enables reprocessing after fixes, and maintains a complete audit trail. Good records go to Silver, bad records go to Quarantine.
Q: How do you integrate data quality into a Medallion Architecture? A: DQ checks run at two points: Bronze→Silver (validate raw data, quarantine bad records) and Silver→Gold (validate business rules). Bronze→Silver catches structural issues (nulls, duplicates, format). Silver→Gold catches business issues (referential integrity, KPI accuracy).
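For the Silver→Gold gate, one inexpensive business check that catches a lot is reconciliation: a KPI stored in Gold should equal the same aggregate computed from Silver detail rows. A minimal sketch of the comparison, assuming both totals have already been computed (for example with an `agg` sum on each table) and that a small relative tolerance is acceptable for floating-point amounts; the 0.01% default and `reconcile_totals` itself are illustrative.

```python
import math

def reconcile_totals(silver_total, gold_total, rel_tol=1e-4, metric="revenue"):
    """Compare a KPI computed from Silver detail rows with the value stored in Gold."""
    ok = math.isclose(silver_total, gold_total, rel_tol=rel_tol)
    return {"check": "reconciliation", "metric": metric,
            "silver": silver_total, "gold": gold_total,
            "difference": gold_total - silver_total,
            "status": "PASS" if ok else "FAIL"}

print(reconcile_totals(2_000_000.00, 2_000_000.00)["status"])  # PASS
print(reconcile_totals(2_000_000.00, 2_045_000.00)["status"])  # FAIL: Gold has rows Silver does not
```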
Q: How do you handle data quality failures in production? A: Three levels: WARN (log the issue but continue processing), FAIL (stop the pipeline and alert), QUARANTINE (separate bad records but continue with good ones). Critical checks (null primary keys) should fail the pipeline. Non-critical checks (invalid email) should quarantine and continue.
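The three-level policy in that answer can be expressed as a small wrapper over the results list that `DataQualityFramework.report()` returns. A minimal sketch, assuming a hypothetical per-table `SEVERITY` mapping from (check, column) to a level; anything unmapped defaults to WARN. FAIL-level failures raise an exception, which fails the notebook run or job task; QUARANTINE and WARN failures are returned so the pipeline can split or log and carry on.

```python
# Hypothetical per-table policy: (check, column) -> severity level
SEVERITY = {
    ("null", "customer_id"): "FAIL",          # broken primary key: stop the pipeline
    ("email_format", "email"): "QUARANTINE",  # bad email: isolate the rows, keep going
}

class DataQualityError(Exception):
    """Raised when a FAIL-level check fails."""

def enforce(results, severity=SEVERITY, default="WARN"):
    """Group failed checks by severity; raise if any FAIL-level check failed."""
    actions = {"FAIL": [], "QUARANTINE": [], "WARN": []}
    for r in results:
        if r["status"] != "FAIL":
            continue
        level = severity.get((r["check"], r["column"]), default)
        actions[level].append(r)
    for r in actions["WARN"]:
        print(f"  [WARN] {r['check']} on {r['column']}: {r['failed']} rows")
    if actions["FAIL"]:
        names = ", ".join(f"{r['check']}:{r['column']}" for r in actions["FAIL"])
        raise DataQualityError(f"Critical DQ checks failed: {names}")
    return actions

# Sample results in the shape report() returns
results = [
    {"check": "null", "column": "customer_id", "failed": 0, "pct": 0.0, "status": "PASS"},
    {"check": "email_format", "column": "email", "failed": 2, "pct": 25.0, "status": "FAIL"},
    {"check": "range", "column": "salary", "failed": 2, "pct": 25.0, "status": "FAIL"},
]
actions = enforce(results)  # prints the salary warning; email goes to actions["QUARANTINE"]
```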
Wrapping Up
Data quality is not a one-time activity — it is a continuous process built into every layer of your pipeline. Check at Bronze ingestion, validate at Silver transformation, verify at Gold aggregation. Quarantine bad records instead of dropping them. Report everything. Alert on failures.
The framework we built is simple, but it is a solid foundation for production use. Its eight check types cover the issues that come up most often in real projects, the class-based approach makes it reusable across every table in your data lake, and the quarantine pattern ensures no record is silently lost.
Trust in your data platform starts with data quality. Build it in from day one.
Related posts:
- Project 4: Data Quality + SCD
- Medallion Architecture
- PySpark Transformations Cookbook
- Delta Lake Deep Dive
Naveen Vuppula is a Senior Data Engineering Consultant and app developer based in Ontario, Canada. He writes about Python, SQL, AWS, Azure, and everything data engineering at DriveDataScience.com.