PySpark Data Cleaning and Validation: Null Handling, Deduplication, Regex, Type Casting, Data Quality Checks, and Production Cleaning Patterns

Raw data is messy. CSVs have missing values, API responses have inconsistent formats, database extracts have duplicates from failed reloads. Before any analytics or ML model can use this data, it needs to be cleaned — nulls handled, duplicates removed, types corrected, and values standardized.

This post covers the core data cleaning techniques in PySpark, from basic null handling to production-grade validation patterns, with code you can copy directly into your Databricks notebooks.

Think of data cleaning like food preparation in a restaurant kitchen. Raw ingredients (data) arrive from different suppliers (sources) — some bruised (nulls), some mislabeled (wrong types), some duplicated (extra shipments). The prep cook (Data Engineer) must wash, trim, sort, and validate everything before the chef (analytics/ML) can use it. Skip this step, and the dish (insight) will taste terrible.

Table of Contents

  • Why Data Cleaning Matters
  • Null Handling
      • Checking for Nulls
      • Dropping Nulls
      • Filling Nulls
      • Coalesce for Multi-Column Fallback
  • Deduplication
      • dropDuplicates
      • Window-Based Deduplication (Keep Latest)
  • Type Casting and Conversion
  • String Cleaning
      • Trim, Upper, Lower, InitCap
      • Regex Replace and Extract
      • Standardizing Formats
  • Date Parsing and Standardization
  • Data Validation Patterns
      • Column-Level Validation
      • Row-Level Quality Flags
      • Quarantine Pattern (Good vs Bad Rows)
  • Production Cleaning Pipeline
  • Common Mistakes
  • Interview Questions
  • Wrapping Up

Why Data Cleaning Matters

Raw data reality:
  10,000 rows from CRM export
    → 342 rows have NULL email (3.4%)
    → 89 duplicate customer_ids (from a failed reload)
    → 156 phone numbers in 5 different formats
    → 23 dates as "MM/DD/YYYY" instead of "YYYY-MM-DD"
    → 12 city values are numeric ("12345" instead of a city name)
    → 2 rows have negative ages (-5, -12)

If you skip cleaning:
    → JOIN on customer_id produces 89 extra rows (duplicates)
    → GROUP BY email counts NULLs as one group (wrong aggregation)
    → Date comparisons fail (string "01/15/2026" != date 2026-01-15)
    → ML model trains on garbage data → garbage predictions

Null Handling

Checking for Nulls

from pyspark.sql.functions import col, isnan, when, count, sum as spark_sum

# Count nulls per column
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

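# Count nulls + NaN (isnan applies only to float/double columns,
# so restrict the check to those instead of looping over every column)
float_cols = [f.name for f in df.schema.fields
              if f.dataType.typeName() in ("float", "double")]
df.select([count(when(col(c).isNull() | isnan(col(c)), c)).alias(c)
           for c in float_cols]).show()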

# Percentage of nulls per column
total = df.count()
null_pcts = df.select([
    (spark_sum(col(c).isNull().cast("int")) / total * 100).alias(c) 
    for c in df.columns
])
null_pcts.show()

Dropping Nulls

# Drop rows where ANY column is null
df_clean = df.dropna()

# Drop rows where SPECIFIC columns are null
df_clean = df.dropna(subset=["customer_id", "email"])

# Drop rows where ALL specified columns are null
df_clean = df.dropna(how="all", subset=["phone", "email", "address"])

# Drop rows with more than 3 null columns
df_clean = df.dropna(thresh=len(df.columns) - 3)

Filling Nulls

from pyspark.sql.functions import col, when, lit, mean, coalesce, concat, lower

# Fill with static values
df_clean = df.fillna({
    "city": "Unknown",
    "country": "India",
    "salary": 0,
    "is_active": True
})

# Fill numeric column with mean
avg_salary = df.select(mean("salary")).first()[0]
df_clean = df.fillna({"salary": avg_salary})

# Fill with column-specific logic
df_clean = df.withColumn("email",
    when(col("email").isNull(), 
         concat(lower(col("first_name")), lit("@placeholder.com")))
    .otherwise(col("email"))
)

Coalesce for Multi-Column Fallback

# Use the first non-null value from multiple columns
from pyspark.sql.functions import coalesce

df_clean = df.withColumn("contact_email",
    coalesce(col("work_email"), col("personal_email"), lit("no_email@unknown.com"))
)
# If work_email is null, try personal_email. If both null, use default.

Deduplication

dropDuplicates

# Drop exact duplicate rows (all columns match)
df_clean = df.dropDuplicates()

# Drop duplicates based on specific columns (keep first occurrence)
df_clean = df.dropDuplicates(["customer_id"])

# Drop duplicates on composite key
df_clean = df.dropDuplicates(["customer_id", "order_date"])

Window-Based Deduplication (Keep Latest)

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc

# Keep only the LATEST record per customer_id (by updated_date)
window = Window.partitionBy("customer_id").orderBy(desc("updated_date"))
df_ranked = df.withColumn("row_num", row_number().over(window))
df_clean = df_ranked.filter(col("row_num") == 1).drop("row_num")

# This is the production pattern — dropDuplicates keeps an arbitrary row,
# window-based dedup keeps the MOST RECENT (or whichever you sort by)

Type Casting and Conversion

from pyspark.sql.types import IntegerType, DoubleType, DateType, TimestampType

# Cast string to integer
df_clean = df.withColumn("age", col("age").cast(IntegerType()))

# Cast string to double (handles decimals)
df_clean = df.withColumn("salary", col("salary").cast(DoubleType()))

# Cast string to date
df_clean = df.withColumn("hire_date", col("hire_date").cast(DateType()))

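# Lenient casting: with ANSI mode off (spark.sql.ansi.enabled=false),
# invalid values become NULL instead of raising an error
df_clean = df.withColumn("age_safe", col("age_string").cast(IntegerType()))
# "25" → 25, "abc" → null, "25.5" → 25 (the fractional part is truncated, not rejected)
# With ANSI mode on (the default in newer Spark releases), an invalid cast raises an error instead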

String Cleaning

Trim, Upper, Lower, InitCap

from pyspark.sql.functions import trim, ltrim, rtrim, upper, lower, initcap

df_clean = (df
    .withColumn("name", initcap(trim(col("name"))))           # "  ravi  " → "Ravi"
    .withColumn("email", lower(trim(col("email"))))            # " Ravi@Gmail.COM " → "ravi@gmail.com"
    .withColumn("country_code", upper(trim(col("country"))))   # " india " → "INDIA"
)

Regex Replace and Extract

from pyspark.sql.functions import regexp_replace, regexp_extract

# Remove non-numeric characters from phone
df_clean = df.withColumn("phone_clean", regexp_replace(col("phone"), r"[^0-9]", ""))
# "(416) 555-1234" → "4165551234"

# Extract area code from phone
df_clean = df.withColumn("area_code", regexp_extract(col("phone"), r"\(?(\d{3})\)?", 1))
# "(416) 555-1234" → "416"

# Remove special characters from names
df_clean = df.withColumn("name_clean", regexp_replace(col("name"), r"[^a-zA-Z\s]", ""))
# "O'Brien Jr." → "OBrien Jr"

# Validate email format (flag invalid)
df_clean = df.withColumn("email_valid",
    col("email").rlike(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")
)
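
Before running these patterns over a full table, it can be worth checking them against a handful of strings locally. The sketch below uses Python's re module as a stand-in. Spark evaluates patterns with Java's regex engine, so this relies on the assumption that the two engines agree for simple patterns like these; confirm anything more exotic on a small Spark DataFrame.

```python
import re

# The same patterns used in the Spark examples above
PHONE_STRIP = r"[^0-9]"
AREA_CODE = r"\(?(\d{3})\)?"
NAME_STRIP = r"[^a-zA-Z\s]"
EMAIL_OK = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"

phone_clean = re.sub(PHONE_STRIP, "", "(416) 555-1234")
area_code = re.search(AREA_CODE, "(416) 555-1234").group(1)
name_clean = re.sub(NAME_STRIP, "", "O'Brien Jr.")

# Edge case: the ASCII-only name pattern also strips accented letters
accented_name = re.sub(NAME_STRIP, "", "José")

email_checks = {
    e: bool(re.search(EMAIL_OK, e))
    for e in ["ravi@gmail.com", "ravi@gmail", "no-at-sign.com"]
}

print(phone_clean)    # 4165551234
print(area_code)      # 416
print(name_clean)     # OBrien Jr
print(accented_name)  # Jos
print(email_checks)
```

The accented-name result shows that the name pattern is lossy for non-English names. If your data contains them, a pattern based on Unicode letter classes may be a better fit.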

Standardizing Formats

# Standardize Yes/No to boolean
df_clean = df.withColumn("is_active",
    when(upper(col("status")).isin("Y", "YES", "TRUE", "1", "ACTIVE"), True)
    .when(upper(col("status")).isin("N", "NO", "FALSE", "0", "INACTIVE"), False)
    .otherwise(None)  # Unrecognized values → null
)

# Standardize country names
df_clean = df.withColumn("country",
    when(upper(col("country")).isin("US", "USA", "UNITED STATES"), "United States")
    .when(upper(col("country")).isin("UK", "GB", "UNITED KINGDOM"), "United Kingdom")
    .when(upper(col("country")).isin("IN", "IND", "INDIA"), "India")
    .otherwise(initcap(col("country")))
)

Date Parsing and Standardization

from pyspark.sql.functions import to_date, to_timestamp, date_format, coalesce

# Parse multiple date formats into standard YYYY-MM-DD
df_clean = df.withColumn("standard_date",
    coalesce(
        to_date(col("date_str"), "yyyy-MM-dd"),       # 2026-01-15
        to_date(col("date_str"), "MM/dd/yyyy"),        # 01/15/2026
        to_date(col("date_str"), "dd-MM-yyyy"),        # 15-01-2026
        to_date(col("date_str"), "MMM dd, yyyy"),      # Jan 15, 2026
        to_date(col("date_str"), "yyyyMMdd")           # 20260115
    )
)
# Tries each format in order — first successful parse wins
# Unrecognized formats → null

# Format output date (applied to df_clean, which holds the standard_date column)
df_clean = df_clean.withColumn("formatted", date_format(col("standard_date"), "yyyy-MM-dd"))

Data Validation Patterns

Column-Level Validation

# Build a validation summary
validations = df.select(
    count("*").alias("total_rows"),
    spark_sum(col("customer_id").isNull().cast("int")).alias("null_customer_ids"),
    spark_sum(col("email").isNull().cast("int")).alias("null_emails"),
    spark_sum((col("age") < 0).cast("int")).alias("negative_ages"),
    spark_sum((col("age") > 120).cast("int")).alias("impossible_ages"),
    spark_sum(col("email").rlike(r"^[^@]+@[^@]+\.[^@]+$").cast("int")).alias("valid_emails"),
    spark_sum(col("phone_clean").rlike(r"^\d{10}$").cast("int")).alias("valid_phones"),
)
validations.show(truncate=False)

Row-Level Quality Flags

from pyspark.sql.functions import coalesce, lit

# Add quality flags to each row
# rlike and comparisons return null on null input, so coalesce each flag to False;
# otherwise a single null flag would make the whole quality_score null
df_flagged = (df
    .withColumn("has_email", col("email").isNotNull())
    .withColumn("valid_email", coalesce(col("email").rlike(r"^[^@]+@[^@]+\.[^@]+$"), lit(False)))
    .withColumn("valid_age", coalesce((col("age") >= 0) & (col("age") <= 120), lit(False)))
    .withColumn("valid_phone", coalesce(col("phone_clean").rlike(r"^\d{10}$"), lit(False)))
    .withColumn("quality_score",
        col("has_email").cast("int") +
        col("valid_email").cast("int") +
        col("valid_age").cast("int") +
        col("valid_phone").cast("int")
    )
)
# quality_score: 4 = perfect, 0 = all checks failed
# Filter: df_flagged.filter(col("quality_score") >= 3) → high-quality rows only

Quarantine Pattern (Good vs Bad Rows)

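from pyspark.sql.functions import coalesce, lit

# Split data into clean and quarantine
# A null age makes the age checks evaluate to null, and filter() drops null
# conditions from BOTH sides of the split. Coalescing to False routes those
# rows to quarantine instead of losing them.
is_valid = coalesce(
    col("customer_id").isNotNull() &
    col("email").isNotNull() &
    (col("age") >= 0) &
    (col("age") <= 120),
    lit(False)
)

df_clean = df.filter(is_valid)
df_quarantine = df.filter(~is_valid)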

# Write both
df_clean.write.format("delta").mode("append").saveAsTable("silver.customers_clean")
df_quarantine.write.format("delta").mode("append").saveAsTable("silver.customers_quarantine")

print(f"Clean: {df_clean.count()} rows → silver.customers_clean")
print(f"Quarantine: {df_quarantine.count()} rows → silver.customers_quarantine")
# Quarantine rows are reviewed manually or flagged for source system fix

Production Cleaning Pipeline

def clean_customer_data(df):
    """Production cleaning function for customer data."""
    from pyspark.sql.functions import (
        col, trim, lower, initcap, upper, when, lit,
        regexp_replace, coalesce, to_date, current_timestamp,
        row_number, desc
    )
    from pyspark.sql.window import Window

    # Step 1: Null handling
    df = df.fillna({
        "city": "Unknown",
        "country": "Unknown",
    })
    df = df.withColumn("email",
        when(col("email").isNull(), lit("noemail@placeholder.com"))
        .otherwise(col("email"))
    )

    # Step 2: String cleaning
    df = (df
        .withColumn("first_name", initcap(trim(col("first_name"))))
        .withColumn("last_name", initcap(trim(col("last_name"))))
        .withColumn("email", lower(trim(col("email"))))
        .withColumn("city", initcap(trim(col("city"))))
    )

    # Step 3: Validation
    df = df.withColumn("city",
        when(col("city").rlike(r"^\d+$"), lit("Unknown"))
        .otherwise(col("city"))
    )

    # Step 4: Deduplication (keep latest)
    window = Window.partitionBy("customer_id").orderBy(desc("updated_date"))
    df = df.withColumn("_rn", row_number().over(window))
    df = df.filter(col("_rn") == 1).drop("_rn")

    # Step 5: Audit columns
    df = df.withColumn("cleaned_at", current_timestamp())

    return df

# Usage:
raw_df = spark.table("bronze.customers")
clean_df = clean_customer_data(raw_df)
clean_df.write.format("delta").mode("overwrite").saveAsTable("silver.customers_clean")

Common Mistakes

  1. Using dropna() without subset: this drops rows where ANY column is null, which can remove most of your data when a few optional columns are sparse. Always specify which columns matter.
  2. Forgetting that cast() silently returns null: with ANSI mode off, lit("abc").cast(IntegerType()) returns null, not an error. Always check for nulls after casting.
  3. Using dropDuplicates without window ranking: dropDuplicates(["customer_id"]) keeps an arbitrary row. In production, use window + row_number to keep the latest or most complete row.
  4. Cleaning after joining: duplicate keys on either side of a join multiply the matching rows, and null keys never match. Always clean BEFORE joining to avoid row explosion and silently unmatched rows.
  5. Not validating after cleaning: run a count check and null check after every cleaning step. If a step removed 50% of rows, something is probably wrong.
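
The count check from the last item can be wrapped in a small guard that runs after each step. This is a plain-Python sketch: the function name check_cleaning_step and the 50% default threshold are illustrative assumptions, and the counts would come from df.count() before and after the step.

```python
def check_cleaning_step(step_name, rows_before, rows_after, max_drop_ratio=0.5):
    """Hypothetical guard: fail loudly if a cleaning step drops too many rows.

    Returns the fraction of rows dropped so it can be logged.
    """
    if rows_before < 0 or rows_after < 0:
        raise ValueError(f"{step_name}: row counts must be non-negative")
    if rows_after > rows_before:
        # A cleaning step should never add rows; this usually means an accidental join fan-out
        raise ValueError(f"{step_name}: row count grew from {rows_before} to {rows_after}")

    dropped = rows_before - rows_after
    drop_ratio = dropped / rows_before if rows_before else 0.0

    if drop_ratio > max_drop_ratio:
        raise ValueError(
            f"{step_name}: dropped {dropped} of {rows_before} rows "
            f"({drop_ratio:.1%}), above the allowed {max_drop_ratio:.0%}"
        )
    return drop_ratio


# Usage with the numbers from the CRM example: 89 duplicates removed from 10,000 rows
ratio = check_cleaning_step("dedup", rows_before=10_000, rows_after=9_911)
print(f"dedup dropped {ratio:.2%} of rows")
```

Each df.count() triggers a Spark job, so on large tables it may be worth caching the DataFrame or running the check only at key checkpoints.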

Interview Questions

Q: How do you handle null values in PySpark?
A: Use fillna() to replace nulls with defaults, dropna(subset=[...]) to remove rows with nulls in critical columns, coalesce() to pick the first non-null value from multiple columns, and when().otherwise() for conditional null replacement. The approach depends on the business rule: some nulls should be filled, others should cause the row to be quarantined.

Q: How do you deduplicate data in PySpark while keeping the most recent record?
A: Use a Window function: partition by the business key (e.g., customer_id), order by the timestamp descending, assign row_number(), and filter to row_number == 1. This keeps exactly one row per entity, the most recent one. This is more reliable than dropDuplicates(), which keeps an arbitrary row.

Q: What is the quarantine pattern in data engineering?
A: Split incoming data into “clean” rows (pass all validation rules) and “quarantine” rows (fail at least one rule). Clean rows proceed to Silver/Gold layers. Quarantine rows are written to a separate table for investigation. This ensures bad data never reaches dashboards while preserving it for debugging rather than silently dropping it.

Wrapping Up

Data cleaning is not glamorous, but it is where data engineering delivers the most value. Raw data is never ready for analytics. Every pipeline needs null handling, deduplication, type casting, string standardization, and validation. Build these as reusable functions, test them with edge cases, and always validate the output. Clean data in, clean insights out.

Naveen Vuppula is a Senior Data Engineering Consultant and app developer based in Ontario, Canada. He writes about Python, SQL, AWS, Azure, and everything data engineering at DriveDataScience.com.
