PySpark Data Cleaning and Validation: Null Handling, Deduplication, Regex, Type Casting, Data Quality Checks, and Production Cleaning Patterns
Raw data is messy. CSVs have missing values, API responses have inconsistent formats, database extracts have duplicates from failed reloads. Before any analytics or ML model can use this data, it needs to be cleaned — nulls handled, duplicates removed, types corrected, and values standardized.
This post covers the core data cleaning techniques in PySpark, from basic null handling to production-grade validation patterns, with code you can copy directly into your Databricks notebooks.
Think of data cleaning like food preparation in a restaurant kitchen. Raw ingredients (data) arrive from different suppliers (sources) — some bruised (nulls), some mislabeled (wrong types), some duplicated (extra shipments). The prep cook (Data Engineer) must wash, trim, sort, and validate everything before the chef (analytics/ML) can use it. Skip this step, and the dish (insight) will taste terrible.
Table of Contents
- Why Data Cleaning Matters
- Null Handling
  - Checking for Nulls
  - Dropping Nulls
  - Filling Nulls
  - Coalesce for Multi-Column Fallback
- Deduplication
  - dropDuplicates
  - Window-Based Deduplication (Keep Latest)
- Type Casting and Conversion
- String Cleaning
  - Trim, Upper, Lower, InitCap
  - Regex Replace and Extract
  - Standardizing Formats
- Date Parsing and Standardization
- Data Validation Patterns
  - Column-Level Validation
  - Row-Level Quality Flags
  - Quarantine Pattern (Good vs Bad Rows)
- Production Cleaning Pipeline
- Common Mistakes
- Interview Questions
- Wrapping Up
Why Data Cleaning Matters
Raw data reality:
10,000 rows from CRM export
→ 342 rows have NULL email (3.4%)
→ 89 duplicate customer_ids (from a failed reload)
→ 156 phone numbers in 5 different formats
→ 23 dates as "MM/DD/YYYY" instead of "YYYY-MM-DD"
→ 12 city values are numeric ("12345" instead of a city name)
→ 2 rows have negative ages (-5, -12)
If you skip cleaning:
→ JOIN on customer_id produces 89 extra rows (duplicates)
→ GROUP BY email counts NULLs as one group (wrong aggregation)
→ Date comparisons fail (string "01/15/2026" != date 2026-01-15)
→ ML model trains on garbage data → garbage predictions
Null Handling
Checking for Nulls
from pyspark.sql.functions import col, isnan, when, count, sum as spark_sum
# Count nulls per column
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()
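# Count nulls + NaN (float/double columns only; isnan() fails on date and timestamp columns)
float_cols = [c for c, t in df.dtypes if t in ("float", "double")]
df.select([count(when(col(c).isNull() | isnan(c), c)).alias(c)
           for c in float_cols]).show()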
# Percentage of nulls per column
total = df.count()
null_pcts = df.select([
(spark_sum(col(c).isNull().cast("int")) / total * 100).alias(c)
for c in df.columns
])
null_pcts.show()
Dropping Nulls
# Drop rows where ANY column is null
df_clean = df.dropna()
# Drop rows where SPECIFIC columns are null
df_clean = df.dropna(subset=["customer_id", "email"])
# Drop rows where ALL specified columns are null
df_clean = df.dropna(how="all", subset=["phone", "email", "address"])
# Drop rows with more than 3 null columns
df_clean = df.dropna(thresh=len(df.columns) - 3)
Filling Nulls
from pyspark.sql.functions import col, when, lit, mean, coalesce, concat, lower
# Fill with static values
df_clean = df.fillna({
"city": "Unknown",
"country": "India",
"salary": 0,
"is_active": True
})
# Fill numeric column with mean
avg_salary = df.select(mean("salary")).first()[0]
df_clean = df.fillna({"salary": avg_salary})
# Fill with column-specific logic
df_clean = df.withColumn("email",
when(col("email").isNull(),
concat(lower(col("first_name")), lit("@placeholder.com")))
.otherwise(col("email"))
)
Coalesce for Multi-Column Fallback
# Use the first non-null value from multiple columns
from pyspark.sql.functions import coalesce
df_clean = df.withColumn("contact_email",
coalesce(col("work_email"), col("personal_email"), lit("no_email@unknown.com"))
)
# If work_email is null, try personal_email. If both null, use default.
Deduplication
dropDuplicates
# Drop exact duplicate rows (all columns match)
df_clean = df.dropDuplicates()
# Drop duplicates based on specific columns (keeps one arbitrary row per key, not necessarily the first)
df_clean = df.dropDuplicates(["customer_id"])
# Drop duplicates on composite key
df_clean = df.dropDuplicates(["customer_id", "order_date"])
Window-Based Deduplication (Keep Latest)
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc
# Keep only the LATEST record per customer_id (by updated_date)
window = Window.partitionBy("customer_id").orderBy(desc("updated_date"))
df_ranked = df.withColumn("row_num", row_number().over(window))
df_clean = df_ranked.filter(col("row_num") == 1).drop("row_num")
# This is the production pattern — dropDuplicates keeps an arbitrary row,
# window-based dedup keeps the MOST RECENT (or whichever you sort by)
Type Casting and Conversion
from pyspark.sql.types import IntegerType, DoubleType, DateType, TimestampType
# Cast string to integer
df_clean = df.withColumn("age", col("age").cast(IntegerType()))
# Cast string to double (handles decimals)
df_clean = df.withColumn("salary", col("salary").cast(DoubleType()))
# Cast string to date
df_clean = df.withColumn("hire_date", col("hire_date").cast(DateType()))
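# Safe casting: with ANSI mode off (spark.sql.ansi.enabled=false), invalid values become NULL instead of an error
df_clean = df.withColumn("age_safe", col("age_string").cast(IntegerType()))
# "25" → 25, "abc" → null, "25.5" → 25 (fractional part is truncated)
# With ANSI mode on, an invalid cast raises an error instead of returning null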
String Cleaning
Trim, Upper, Lower, InitCap
from pyspark.sql.functions import trim, ltrim, rtrim, upper, lower, initcap
df_clean = (df
.withColumn("name", initcap(trim(col("name")))) # " ravi " → "Ravi"
.withColumn("email", lower(trim(col("email")))) # " Ravi@Gmail.COM " → "ravi@gmail.com"
.withColumn("country_code", upper(trim(col("country")))) # " india " → "INDIA"
)
Regex Replace and Extract
from pyspark.sql.functions import regexp_replace, regexp_extract
# Remove non-numeric characters from phone
df_clean = df.withColumn("phone_clean", regexp_replace(col("phone"), r"[^0-9]", ""))
# "(416) 555-1234" → "4165551234"
# Extract area code from phone
df_clean = df.withColumn("area_code", regexp_extract(col("phone"), r"\(?(\d{3})\)?", 1))
# "(416) 555-1234" → "416"
# Remove special characters from names
df_clean = df.withColumn("name_clean", regexp_replace(col("name"), r"[^a-zA-Z\s]", ""))
# "O'Brien Jr." → "OBrien Jr"
# Validate email format (flag invalid)
df_clean = df.withColumn("email_valid",
col("email").rlike(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")
)
Standardizing Formats
# Standardize Yes/No to boolean
df_clean = df.withColumn("is_active",
when(upper(col("status")).isin("Y", "YES", "TRUE", "1", "ACTIVE"), True)
.when(upper(col("status")).isin("N", "NO", "FALSE", "0", "INACTIVE"), False)
.otherwise(None) # Unrecognized values → null
)
# Standardize country names
df_clean = df.withColumn("country",
when(upper(col("country")).isin("US", "USA", "UNITED STATES"), "United States")
.when(upper(col("country")).isin("UK", "GB", "UNITED KINGDOM"), "United Kingdom")
.when(upper(col("country")).isin("IN", "IND", "INDIA"), "India")
.otherwise(initcap(col("country")))
)
Date Parsing and Standardization
from pyspark.sql.functions import to_date, to_timestamp, date_format, coalesce
# Parse multiple date formats into standard YYYY-MM-DD
df_clean = df.withColumn("standard_date",
coalesce(
to_date(col("date_str"), "yyyy-MM-dd"), # 2026-01-15
to_date(col("date_str"), "MM/dd/yyyy"), # 01/15/2026
to_date(col("date_str"), "dd-MM-yyyy"), # 15-01-2026
to_date(col("date_str"), "MMM dd, yyyy"), # Jan 15, 2026
to_date(col("date_str"), "yyyyMMdd") # 20260115
)
)
# Tries each format in order — first successful parse wins
# Unrecognized formats → null
# Format output date (date_format returns a string; build on df_clean, which has standard_date)
df_clean = df_clean.withColumn("formatted", date_format(col("standard_date"), "yyyy-MM-dd"))
Data Validation Patterns
Column-Level Validation
# Build a validation summary
validations = df.select(
count("*").alias("total_rows"),
spark_sum(col("customer_id").isNull().cast("int")).alias("null_customer_ids"),
spark_sum(col("email").isNull().cast("int")).alias("null_emails"),
spark_sum((col("age") < 0).cast("int")).alias("negative_ages"),
spark_sum((col("age") > 120).cast("int")).alias("impossible_ages"),
spark_sum(col("email").rlike(r"^[^@]+@[^@]+\.[^@]+$").cast("int")).alias("valid_emails"),
spark_sum(col("phone_clean").rlike(r"^\d{10}$").cast("int")).alias("valid_phones"),
)
validations.show(truncate=False)
Row-Level Quality Flags
# Add quality flags to each row
# rlike and comparisons return null on null input, so coalesce each flag to False;
# otherwise a single null flag would make the whole quality_score null
df_flagged = (df
    .withColumn("has_email", col("email").isNotNull())
    .withColumn("valid_email", coalesce(col("email").rlike(r"^[^@]+@[^@]+\.[^@]+$"), lit(False)))
    .withColumn("valid_age", coalesce((col("age") >= 0) & (col("age") <= 120), lit(False)))
    .withColumn("valid_phone", coalesce(col("phone_clean").rlike(r"^\d{10}$"), lit(False)))
    .withColumn("quality_score",
        col("has_email").cast("int") +
        col("valid_email").cast("int") +
        col("valid_age").cast("int") +
        col("valid_phone").cast("int")
    )
)
# quality_score: 4 = perfect, 0 = all checks failed
# Filter: df_flagged.filter(col("quality_score") >= 3) → high-quality rows only
Quarantine Pattern (Good vs Bad Rows)
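# Split data into clean and quarantine
# Wrap the rule in coalesce: a null age makes the condition null, and then both
# filter(is_valid) and filter(~is_valid) would silently drop that row
is_valid = coalesce(
    col("customer_id").isNotNull() &
    col("email").isNotNull() &
    (col("age") >= 0) &
    (col("age") <= 120),
    lit(False)
)
df_clean = df.filter(is_valid)
df_quarantine = df.filter(~is_valid)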
# Write both
df_clean.write.format("delta").mode("append").saveAsTable("silver.customers_clean")
df_quarantine.write.format("delta").mode("append").saveAsTable("silver.customers_quarantine")
print(f"Clean: {df_clean.count()} rows → silver.customers_clean")
print(f"Quarantine: {df_quarantine.count()} rows → silver.customers_quarantine")
# Quarantine rows are reviewed manually or flagged for source system fix
Production Cleaning Pipeline
def clean_customer_data(df):
    """Production cleaning function for customer data."""
    from pyspark.sql.functions import (
        col, trim, lower, initcap, upper, when, lit,
        regexp_replace, coalesce, to_date, current_timestamp,
        row_number, desc
    )
    from pyspark.sql.window import Window

    # Step 1: Null handling
    df = df.fillna({
        "city": "Unknown",
        "country": "Unknown",
    })
    df = df.withColumn("email",
        when(col("email").isNull(), lit("noemail@placeholder.com"))
        .otherwise(col("email"))
    )

    # Step 2: String cleaning
    df = (df
        .withColumn("first_name", initcap(trim(col("first_name"))))
        .withColumn("last_name", initcap(trim(col("last_name"))))
        .withColumn("email", lower(trim(col("email"))))
        .withColumn("city", initcap(trim(col("city"))))
    )

    # Step 3: Validation
    df = df.withColumn("city",
        when(col("city").rlike(r"^\d+$"), lit("Unknown"))
        .otherwise(col("city"))
    )

    # Step 4: Deduplication (keep latest)
    window = Window.partitionBy("customer_id").orderBy(desc("updated_date"))
    df = df.withColumn("_rn", row_number().over(window))
    df = df.filter(col("_rn") == 1).drop("_rn")

    # Step 5: Audit columns
    df = df.withColumn("cleaned_at", current_timestamp())
    return df
# Usage:
raw_df = spark.table("bronze.customers")
clean_df = clean_customer_data(raw_df)
clean_df.write.format("delta").mode("overwrite").saveAsTable("silver.customers_clean")
Common Mistakes
- Using dropna() without subset: this drops rows where ANY column is null, which on wide tables can remove most of your data. Always specify which columns matter.
- Forgetting that cast() silently returns null: with ANSI mode off, lit("abc").cast(IntegerType()) returns null, not an error. Always check for nulls after casting.
- Using dropDuplicates without window ranking: dropDuplicates(["customer_id"]) keeps an arbitrary row. In production, use window + row_number to keep the latest or most complete row.
- Cleaning after joining: duplicate keys multiply rows during a join, and null keys silently fail to match. Always clean BEFORE joining to avoid row explosion and lost rows.
- Not validating after cleaning: run a count check and null check after every cleaning step. If your cleaning removed 50% of rows, something is wrong.
Interview Questions
Q: How do you handle null values in PySpark?
A: Use fillna() to replace nulls with defaults, dropna(subset=[...]) to remove rows with nulls in critical columns, coalesce() to pick the first non-null value from multiple columns, and when().otherwise() for conditional null replacement. The approach depends on the business rule — some nulls should be filled, others should cause the row to be quarantined.
Q: How do you deduplicate data in PySpark while keeping the most recent record?
A: Use a Window function: partition by the business key (e.g., customer_id), order by the timestamp descending, assign row_number(), and filter to row_number == 1. This keeps exactly one row per entity — the most recent one. This is more reliable than dropDuplicates(), which keeps an arbitrary row.
Q: What is the quarantine pattern in data engineering?
A: Split incoming data into “clean” rows (pass all validation rules) and “quarantine” rows (fail at least one rule). Clean rows proceed to Silver/Gold layers. Quarantine rows are written to a separate table for investigation. This ensures bad data never reaches dashboards while preserving it for debugging rather than silently dropping it.
Wrapping Up
Data cleaning is not glamorous, but it is where data engineering delivers the most value. Raw data is never ready for analytics. Every pipeline needs null handling, deduplication, type casting, string standardization, and validation. Build these as reusable functions, test them with edge cases, and always validate the output. Clean data in, clean insights out.
Naveen Vuppula is a Senior Data Engineering Consultant and app developer based in Ontario, Canada. He writes about Python, SQL, AWS, Azure, and everything data engineering at DriveDataScience.com.