PySpark DataFrame Transformations in Azure Databricks: The Complete Cookbook

You can read files into Databricks. You can connect to storage. But between reading and writing lives the real work — transformations. Cleaning names, fixing dates, handling nulls, joining tables, deduplicating records, building aggregations, and reshaping data.

This post is your cookbook. Every transformation function you will use in Databricks, organized by category, with real data examples and the exact code you can paste into your notebook. No theory — just recipes.

Think of this as the difference between knowing how to turn on the stove (reading files) and actually cooking a meal (transforming data). The previous posts taught you the stove. This one teaches you every cooking technique.

Table of Contents

  • The Sample Data
  • Column Operations (Select, Rename, Drop, Reorder, Cast)
  • Filtering Rows
  • Adding and Modifying Columns (withColumn)
  • Conditional Logic (when, otherwise, CASE WHEN)
  • String Functions
  • Date and Timestamp Functions
  • Null Handling
  • Type Casting
  • Aggregations (GroupBy, Pivot, Cube, Rollup)
  • Window Functions
  • Joins
  • Deduplication
  • Working with Complex Types (Arrays, Structs, Maps)
  • Flattening Nested JSON
  • User-Defined Functions (UDFs)
  • Schema Operations
  • Sorting and Limiting
  • Union and Set Operations
  • display() vs show() in Databricks
  • Chaining Transformations (The Pipeline Pattern)
  • Complete Real-World Transformation Pipeline
  • Common Mistakes
  • Interview Questions
  • Wrapping Up

The Sample Data

I will use this employee dataset throughout:

from pyspark.sql.types import *
from pyspark.sql.functions import *

data = [
    (1, "  Alice ", "Johnson", "alice@email.com", "Engineering", "Toronto", "Canada", 95000, "2022-03-15", None),
    (2, "bob", "SMITH", "bob@email.com", "Sales", "Mumbai", "India", 72000, "2023-01-10", "2026-01-15"),
    (3, "Carol", None, "carol@email", "Marketing", "London", "UK", 68000, "2024-02-01", None),
    (4, " Dave", "Brown", None, "Engineering", "12345", "Canada", 91000, "2022-11-30", "2025-06-20"),
    (5, "Eve", "Taylor", "eve@email.com", "Sales", None, "India", 85000, "2021-08-22", None),
    (6, "Frank", "Wilson", "frank@email.com", "Engineering", "Toronto", "Canada", 95000, "2022-03-15", None),
    (1, "Alice", "Johnson", "alice.new@email.com", "Engineering", "Mumbai", "India", 98000, "2022-03-15", "2026-04-01"),
]

schema = StructType([
    StructField("emp_id", IntegerType()),
    StructField("first_name", StringType()),
    StructField("last_name", StringType()),
    StructField("email", StringType()),
    StructField("department", StringType()),
    StructField("city", StringType()),
    StructField("country", StringType()),
    StructField("salary", IntegerType()),
    StructField("hire_date", StringType()),
    StructField("last_promotion", StringType()),
])

df = spark.createDataFrame(data, schema)
df.show(truncate=False)

Notice the intentional data quality issues: leading/trailing spaces, inconsistent casing, NULL values, invalid email, numeric city, duplicate emp_id.

Column Operations (Select, Rename, Drop, Reorder, Cast)

Select Specific Columns

# Select by name
df.select("emp_id", "first_name", "salary").show()

# Select with column expressions
df.select(col("first_name"), col("salary") * 12).show()

# Select all except specific columns
df.select([c for c in df.columns if c not in ["last_promotion"]]).show()

Rename Columns

# Rename single column
df.withColumnRenamed("first_name", "FirstName").show()

# Rename multiple columns
renamed_df = df
for old, new in {"first_name": "FirstName", "last_name": "LastName", "emp_id": "EmployeeID"}.items():
    renamed_df = renamed_df.withColumnRenamed(old, new)
renamed_df.show()

# Rename all columns to uppercase
df.toDF(*[c.upper() for c in df.columns]).show()

# Rename all columns: trim, lowercase, and replace spaces with underscores
df.toDF(*[c.strip().replace(" ", "_").lower() for c in df.columns]).show()

Drop Columns

# Drop single column
df.drop("last_promotion").show()

# Drop multiple columns
df.drop("last_promotion", "country").show()

Reorder Columns

# Reorder by specifying the desired order
desired_order = ["emp_id", "first_name", "last_name", "email", "department", "city", "country", "salary", "hire_date"]
df.select(desired_order).show()

Cast (Change Data Types)

# Cast string to date
df.withColumn("hire_date", to_date(col("hire_date"), "yyyy-MM-dd")).printSchema()

# Cast salary to double
df.withColumn("salary", col("salary").cast("double")).printSchema()

# Cast multiple columns
df.withColumn("hire_date", to_date("hire_date"))   .withColumn("last_promotion", to_date("last_promotion"))   .withColumn("salary", col("salary").cast(DoubleType()))   .printSchema()

Real-life analogy: Column operations are like organizing a spreadsheet. Select = choosing which columns to show. Rename = changing header labels. Drop = hiding columns. Cast = changing the cell format (text to number, text to date).

Filtering Rows

# Basic filter
df.filter(col("salary") > 80000).show()
df.filter(col("department") == "Engineering").show()

# Multiple conditions (AND)
df.filter((col("salary") > 70000) & (col("country") == "Canada")).show()

# Multiple conditions (OR)
df.filter((col("department") == "Engineering") | (col("department") == "Sales")).show()

# NOT
df.filter(~(col("department") == "Marketing")).show()

# IN list
df.filter(col("department").isin("Engineering", "Sales")).show()

# NOT IN
df.filter(~col("department").isin("Marketing")).show()

# LIKE (pattern matching)
df.filter(col("email").like("%@email.com")).show()

# Contains
df.filter(col("first_name").contains("li")).show()

# StartsWith / EndsWith
df.filter(col("email").startswith("alice")).show()
df.filter(col("email").endswith(".com")).show()

# IS NULL / IS NOT NULL
df.filter(col("last_name").isNull()).show()
df.filter(col("last_name").isNotNull()).show()

# BETWEEN
df.filter(col("salary").between(70000, 90000)).show()

# Regex
df.filter(col("email").rlike("^[a-zA-Z0-9+_.-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")).show()

Adding and Modifying Columns (withColumn)

# Add new column with constant value
df.withColumn("company", lit("DriveDataScience")).show()

# Add calculated column
df.withColumn("annual_bonus", col("salary") * 0.10).show()

# Add column from existing columns
df.withColumn("full_name", concat(col("first_name"), lit(" "), col("last_name"))).show()

# Modify existing column (overwrite)
df.withColumn("salary", col("salary") + 5000).show()

# Add current timestamp
df.withColumn("processed_at", current_timestamp()).show()

# Add current date
df.withColumn("load_date", current_date()).show()

# Add row number (monotonically increasing ID — NOT guaranteed sequential)
df.withColumn("row_id", monotonically_increasing_id()).show()

# Add literal NULL column
df.withColumn("notes", lit(None).cast(StringType())).show()

Conditional Logic (when, otherwise, CASE WHEN)

Simple if/else

# Single condition
df.withColumn("salary_level",
    when(col("salary") > 90000, "Senior")
    .otherwise("Standard")
).show()

Multiple conditions (chained when)

# Multiple conditions — evaluated top to bottom, first match wins
df.withColumn("salary_band",
    when(col("salary") >= 95000, "Band A - Executive")
    .when(col("salary") >= 85000, "Band B - Senior")
    .when(col("salary") >= 70000, "Band C - Mid")
    .otherwise("Band D - Junior")
).show()

Nested conditions

# Combine with other columns
df.withColumn("location_type",
    when((col("country") == "Canada") & (col("city") == "Toronto"), "HQ")
    .when(col("country") == "Canada", "Canada Branch")
    .when(col("country") == "India"), "India Office")
    .otherwise("International")
).show()

CASE WHEN in Spark SQL

df.createOrReplaceTempView("employees")

spark.sql(
    "SELECT emp_id, first_name, salary, "
    "CASE WHEN salary >= 95000 THEN 'Executive' "
    "WHEN salary >= 85000 THEN 'Senior' "
    "WHEN salary >= 70000 THEN 'Mid' "
    "ELSE 'Junior' END as salary_band "
    "FROM employees"
).show()

Real-life analogy: when/otherwise is like a traffic signal. Green (salary > 95K) → go to Executive lane. Yellow (salary > 85K) → Senior lane. Red (everything else) → Junior lane. Each car (row) checks the signals in order and takes the first matching lane.

String Functions

# Trim whitespace
df.withColumn("first_name", trim(col("first_name"))).show()
df.withColumn("first_name", ltrim(col("first_name"))).show()  # Left trim only
df.withColumn("first_name", rtrim(col("first_name"))).show()  # Right trim only

# Case conversion
df.withColumn("first_name", upper(col("first_name"))).show()      # ALICE
df.withColumn("first_name", lower(col("first_name"))).show()      # alice
df.withColumn("first_name", initcap(col("first_name"))).show()    # Alice

# Concatenate
df.withColumn("full_name", concat(col("first_name"), lit(" "), col("last_name"))).show()
df.withColumn("full_name", concat_ws(" ", col("first_name"), col("last_name"))).show()  # Handles NULLs better

# Substring
df.withColumn("name_initial", substring(col("first_name"), 1, 1)).show()  # First character

# Length
df.withColumn("name_length", length(col("first_name"))).show()

# Replace
df.withColumn("email", regexp_replace(col("email"), "@email$", "@email.com")).show()

# Split into array
df.withColumn("email_parts", split(col("email"), "@")).show()

# Extract domain from email
df.withColumn("domain", split(col("email"), "@").getItem(1)).show()

# Pad
df.withColumn("emp_code", lpad(col("emp_id").cast("string"), 5, "0")).show()  # 00001, 00002

# Reverse
df.withColumn("reversed", reverse(col("first_name"))).show()

# Repeat
df.withColumn("stars", repeat(lit("*"), col("salary") / 10000)).show()

# Translate (character replacement)
df.withColumn("clean", translate(col("city"), "0123456789", "")).show()  # Remove digits

Date and Timestamp Functions

# Convert string to date
df.withColumn("hire_date", to_date(col("hire_date"), "yyyy-MM-dd")).show()

# Convert string to timestamp
df.withColumn("hire_ts", to_timestamp(col("hire_date"), "yyyy-MM-dd")).show()

# Current date and timestamp
df.withColumn("today", current_date())   .withColumn("now", current_timestamp()).show()

# Extract parts
df_dates = df.withColumn("hire_date", to_date("hire_date"))
df_dates.withColumn("hire_year", year("hire_date"))         .withColumn("hire_month", month("hire_date"))         .withColumn("hire_day", dayofmonth("hire_date"))         .withColumn("hire_quarter", quarter("hire_date"))         .withColumn("hire_weekday", dayofweek("hire_date"))         .withColumn("hire_day_name", date_format("hire_date", "EEEE"))         .show()

# Date arithmetic
df_dates.withColumn("one_year_later", date_add("hire_date", 365))         .withColumn("30_days_ago", date_sub(current_date(), 30))         .withColumn("months_employed", months_between(current_date(), col("hire_date")).cast("int"))         .withColumn("days_employed", datediff(current_date(), col("hire_date")))         .show()

# Format date as string
df_dates.withColumn("formatted", date_format("hire_date", "dd-MMM-yyyy")).show()  # 15-Mar-2022
df_dates.withColumn("formatted", date_format("hire_date", "MMMM yyyy")).show()     # March 2022

# First and last day of month
df_dates.withColumn("month_start", trunc("hire_date", "month"))         .withColumn("month_end", last_day("hire_date"))         .show()

# Add months
df_dates.withColumn("six_months_later", add_months("hire_date", 6)).show()

Null Handling

# Check for nulls
df.filter(col("last_name").isNull()).show()
df.filter(col("last_name").isNotNull()).show()

# Count nulls per column
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

# Replace NULLs with default value (specific columns)
df.withColumn("last_name", coalesce(col("last_name"), lit("Unknown"))).show()
df.withColumn("city", coalesce(col("city"), lit("Unknown"))).show()
df.withColumn("email", coalesce(col("email"), lit("noemail@placeholder.com"))).show()

# fillna — replace NULLs across multiple columns
df.fillna({"last_name": "Unknown", "city": "Unknown", "email": "noemail@placeholder.com"}).show()

# fillna all string columns with empty string
df.fillna("").show()

# Drop rows with ANY null
df.dropna().show()

# Drop rows where specific columns are null
df.dropna(subset=["email", "last_name"]).show()

# Drop rows where ALL specified columns are null
df.dropna(how="all", subset=["email", "last_name"]).show()

# Replace specific values with NULL
df.withColumn("city",
    when(col("city").rlike("^[0-9]+$"), None).otherwise(col("city"))
).show()

# coalesce — first non-null value from multiple columns
df.withColumn("contact", coalesce(col("email"), col("first_name"), lit("No Contact"))).show()

Real-life analogy: Null handling is like a form where someone left fields blank. coalesce fills in the blank with a default. dropna throws away incomplete forms. fillna fills ALL blanks with the same default. You choose based on whether incomplete data is acceptable.

Aggregations (GroupBy, Pivot, Cube, Rollup)

Basic GroupBy

# Count per department
df.groupBy("department").count().show()

# Multiple aggregations
df.groupBy("department").agg(
    count("*").alias("emp_count"),
    avg("salary").alias("avg_salary"),
    sum("salary").alias("total_salary"),
    min("salary").alias("min_salary"),
    max("salary").alias("max_salary"),
    round(stddev("salary"), 2).alias("stddev_salary")
).show()

# Group by multiple columns
df.groupBy("department", "country").agg(
    count("*").alias("emp_count"),
    avg("salary").alias("avg_salary")
).orderBy("department", "country").show()

# Count distinct
df.groupBy("department").agg(
    countDistinct("country").alias("unique_countries")
).show()

# Collect values into list
df.groupBy("department").agg(
    collect_list("first_name").alias("employees"),
    collect_set("country").alias("unique_countries")
).show(truncate=False)

Pivot (Cross-Tab)

# Pivot: departments as rows, countries as columns, salary as values
df.groupBy("department")   .pivot("country")   .agg(sum("salary"))   .show()

# Output:
# department  | Canada | India  | UK
# Engineering | 186000 | 98000  | null
# Sales       | null   | 157000 | null
# Marketing   | null   | null   | 68000

Cube and Rollup (Subtotals)

# Rollup: hierarchical subtotals (department → country → total)
df.rollup("department", "country")   .agg(sum("salary").alias("total_salary"), count("*").alias("count"))   .orderBy("department", "country")   .show()
# Shows: per department-country, per department (subtotal), and grand total (all nulls)

# Cube: all combinations of subtotals
df.cube("department", "country")   .agg(sum("salary").alias("total_salary"))   .orderBy("department", "country")   .show()
# Shows: every combination including per-country-only totals

Window Functions

from pyspark.sql.window import Window

# Define window partitioned by department, ordered by salary descending
window_dept = Window.partitionBy("department").orderBy(col("salary").desc())

# Row number (unique rank within each department)
df.withColumn("rank_in_dept", row_number().over(window_dept)).show()

# Rank (with gaps for ties)
df.withColumn("rank", rank().over(window_dept)).show()

# Dense rank (no gaps)
df.withColumn("dense_rank", dense_rank().over(window_dept)).show()

# Top 2 earners per department
df.withColumn("rn", row_number().over(window_dept))   .filter(col("rn") <= 2)   .show()

# Previous salary (LAG)
df.withColumn("prev_salary", lag("salary", 1).over(window_dept)).show()

# Next salary (LEAD)
df.withColumn("next_salary", lead("salary", 1).over(window_dept)).show()

# Running total
window_running = Window.partitionBy("department").orderBy("hire_date") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("running_total", sum("salary").over(window_running)).show()

# Percentage of department total
window_total = Window.partitionBy("department")
df.withColumn("dept_total", sum("salary").over(window_total))   .withColumn("pct_of_dept", round(col("salary") / col("dept_total") * 100, 2))   .show()

# First and last value in window (the frame must cover the whole partition for last() to be correct)
window_dept_full = window_dept.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn("highest_paid", first("first_name").over(window_dept_full)) \
  .withColumn("lowest_paid", last("first_name").over(window_dept_full)).show()

# NTILE (divide into buckets)
df.withColumn("salary_quartile", ntile(4).over(Window.orderBy(col("salary").desc()))).show()

Joins

# Create a department reference table
dept_data = [("Engineering", "ENG", "Building A"), ("Sales", "SAL", "Building B"),
             ("Marketing", "MKT", "Building C"), ("HR", "HR", "Building D")]
dept_df = spark.createDataFrame(dept_data, ["dept_name", "dept_code", "location"])

# Inner join
df.join(dept_df, df.department == dept_df.dept_name, "inner").show()

# Left join (keep all employees)
df.join(dept_df, df.department == dept_df.dept_name, "left").show()

# Right join (keep all departments)
df.join(dept_df, df.department == dept_df.dept_name, "right").show()

# Full outer join
df.join(dept_df, df.department == dept_df.dept_name, "full").show()

# Anti join (employees in departments NOT in the reference table)
df.join(dept_df, df.department == dept_df.dept_name, "left_anti").show()

# Semi join (employees in departments that ARE in the reference — no dept columns added)
df.join(dept_df, df.department == dept_df.dept_name, "left_semi").show()

# Cross join
df.crossJoin(dept_df).show()

# Broadcast join (for small lookup tables)
df.join(broadcast(dept_df), df.department == dept_df.dept_name, "left").show()

# Self join (find employees with same salary)
df_a = df.alias("a")
df_b = df.alias("b")
df_a.join(df_b,
    (col("a.salary") == col("b.salary")) & (col("a.emp_id") < col("b.emp_id")),
    "inner"
).select("a.first_name", "b.first_name", "a.salary").show()

Deduplication

# Drop exact duplicate rows
df.dropDuplicates().show()

# Drop duplicates based on specific columns (keep first occurrence)
df.dropDuplicates(["emp_id"]).show()

# Keep the LATEST record per emp_id (using window function)
window_latest = Window.partitionBy("emp_id").orderBy(col("last_promotion").desc_nulls_last())
df_deduped = df.withColumn("rn", row_number().over(window_latest)) \
               .filter(col("rn") == 1) \
               .drop("rn")
df_deduped.show()

Working with Complex Types (Arrays, Structs, Maps)

Arrays

# Create array from columns
df.withColumn("name_parts", array(col("first_name"), col("last_name"))).show(truncate=False)

# Array contains
df.withColumn("skills", array(lit("Python"), lit("SQL"), lit("Spark")))   .filter(array_contains(col("skills"), "Python")).show()

# Explode array to rows
df_with_skills = df.withColumn("skills", array(lit("Python"), lit("SQL"), lit("Spark")))
df_with_skills.select("emp_id", "first_name", explode("skills").alias("skill")).show()

# Collect into array (reverse of explode)
df.groupBy("department").agg(collect_list("first_name").alias("team")).show(truncate=False)

# Array size
df_with_skills.withColumn("num_skills", size(col("skills"))).show()

# Sort array
df_with_skills.withColumn("sorted_skills", sort_array("skills")).show(truncate=False)

Structs (Nested Objects)

# Create struct
df.withColumn("address", struct(col("city"), col("country"))).show(truncate=False)

# Access struct fields
df_struct = df.withColumn("address", struct(col("city"), col("country")))
df_struct.select("emp_id", "address.city", "address.country").show()

Maps (Key-Value Pairs)

# Create map
df.withColumn("metadata", create_map(
    lit("department"), col("department"),
    lit("country"), col("country")
)).show(truncate=False)

# Access map values
df_map = df.withColumn("metadata", create_map(lit("dept"), col("department")))
df_map.withColumn("dept_value", col("metadata").getItem("dept")).show()

Flattening Nested JSON

# Read nested JSON
json_data = [
    (1, "Alice", {"city": "Toronto", "zip": "M5V"}, [{"skill": "Python", "level": 5}, {"skill": "SQL", "level": 4}]),
    (2, "Bob", {"city": "Mumbai", "zip": "400001"}, [{"skill": "Java", "level": 3}])
]
json_schema = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
    StructField("address", StructType([
        StructField("city", StringType()),
        StructField("zip", StringType())
    ])),
    StructField("skills", ArrayType(StructType([
        StructField("skill", StringType()),
        StructField("level", IntegerType())
    ])))
])

df_nested = spark.createDataFrame(json_data, json_schema)

# Flatten struct
df_flat = df_nested.select(
    "id", "name",
    col("address.city").alias("city"),
    col("address.zip").alias("zip_code")
)

# Flatten array of structs (one row per skill)
df_exploded = df_nested.select(
    "id", "name",
    col("address.city").alias("city"),
    explode("skills").alias("skill_info")
).select(
    "id", "name", "city",
    col("skill_info.skill").alias("skill"),
    col("skill_info.level").alias("skill_level")
)
df_exploded.show()
# Result: Alice has 2 rows (Python, SQL), Bob has 1 row (Java)

User-Defined Functions (UDFs)

Python UDF (Slower — Row by Row)

# Define a Python function
def categorize_salary(salary):
    if salary is None:
        return "Unknown"
    elif salary >= 95000:
        return "Executive"
    elif salary >= 80000:
        return "Senior"
    elif salary >= 65000:
        return "Mid"
    else:
        return "Junior"

# Register as UDF
salary_udf = udf(categorize_salary, StringType())

# Use in withColumn
df.withColumn("level", salary_udf(col("salary"))).show()

Pandas UDF (Faster — Vectorized)

from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf(StringType())
def clean_name(names: pd.Series) -> pd.Series:
    return names.str.strip().str.title()

df.withColumn("clean_first", clean_name(col("first_name"))).show()

When to Use Built-in vs UDF

# ❌ UDF (slow — serializes to Python, row by row)
email_udf = udf(lambda x: x.lower().strip() if x else None, StringType())
df.withColumn("email", email_udf(col("email"))).show()

# ✅ Built-in function (fast — runs in JVM, vectorized)
df.withColumn("email", lower(trim(col("email")))).show()

Rule: Always check pyspark.sql.functions first. Built-in functions are 10-100x faster than UDFs. Only use UDFs for logic that has no built-in equivalent.

Schema Operations

# Print schema (tree format)
df.printSchema()

# Get schema as StructType (programmable)
schema = df.schema
print(schema)

# Get column names
print(df.columns)

# Get column data types
print(df.dtypes)

# Number of rows and columns
print(f"Rows: {df.count()}, Columns: {len(df.columns)}")

# Describe (summary statistics)
df.describe().show()
df.describe("salary").show()  # Single column

# Check for specific column
if "salary" in df.columns:
    print("salary column exists")

Sorting and Limiting

# Sort ascending
df.orderBy("salary").show()
df.orderBy(col("salary").asc()).show()

# Sort descending
df.orderBy(col("salary").desc()).show()

# Sort by multiple columns
df.orderBy(col("department").asc(), col("salary").desc()).show()

# Nulls first / last
df.orderBy(col("last_promotion").asc_nulls_first()).show()
df.orderBy(col("last_promotion").desc_nulls_last()).show()

# Limit (top N rows)
df.orderBy(col("salary").desc()).limit(3).show()

# Take (returns list of Row objects to driver)
top_3 = df.orderBy(col("salary").desc()).take(3)
for row in top_3:
    print(row.first_name, row.salary)

Union and Set Operations

# Create second DataFrame
new_employees = spark.createDataFrame([
    (7, "Grace", "Lee", "grace@email.com", "HR", "Vancouver", "Canada", 78000, "2026-01-01", None),
    (8, "Henry", "Park", "henry@email.com", "Sales", "Seoul", "Korea", 82000, "2025-06-15", None),
], schema)

# Union (combine rows — MUST have same schema)
combined = df.union(new_employees)
combined.show()

# Union by name (matches columns by name, not position)
combined = df.unionByName(new_employees)

# Distinct after union (remove duplicates)
combined.distinct().show()

# Intersect (rows in BOTH DataFrames)
df.intersect(new_employees).show()

# Except (rows in df but NOT in new_employees)
df.exceptAll(new_employees).show()

display() vs show() in Databricks

# show() — plain text output, truncated by default
df.show()           # 20 rows, truncated columns
df.show(50)         # 50 rows
df.show(truncate=False)  # Full column width
df.show(10, False)  # 10 rows, full width

# display() — Databricks-specific, rich HTML table with sorting/filtering
display(df)         # Interactive table with click-to-sort, search, chart options
display(df.limit(100))  # Display first 100 rows

# display() advantages in Databricks:
# - Click column headers to sort
# - Search within results
# - Switch to chart view (bar, line, scatter)
# - Download as CSV
# - Pagination for large results

Rule: Use display() in Databricks notebooks for exploration. Use show() in scripts and unit tests (works everywhere, no Databricks dependency).

Chaining Transformations (The Pipeline Pattern)

# Chain multiple transformations in a readable pipeline
df_clean = (
    df
    # Clean names
    .withColumn("first_name", initcap(trim(col("first_name"))))
    .withColumn("last_name", coalesce(initcap(trim(col("last_name"))), lit("Unknown")))

    # Fix email
    .withColumn("email", 
        coalesce(
            when(col("email").rlike("^.+@.+\..+$"), lower(trim(col("email")))),
            lit("noemail@placeholder.com")
        )
    )

    # Clean city
    .withColumn("city",
        when(col("city").rlike("^[0-9]+$"), lit("Unknown"))
        .when(col("city").isNull(), lit("Unknown"))
        .otherwise(initcap(trim(col("city"))))
    )

    # Add calculated columns
    .withColumn("hire_date", to_date("hire_date"))
    .withColumn("years_employed", round(datediff(current_date(), col("hire_date")) / 365, 1))
    .withColumn("salary_band",
        when(col("salary") >= 95000, "Executive")
        .when(col("salary") >= 80000, "Senior")
        .when(col("salary") >= 65000, "Mid")
        .otherwise("Junior")
    )

    # Deduplicate
    .withColumn("rn", row_number().over(
        Window.partitionBy("emp_id").orderBy(col("last_promotion").desc_nulls_last())
    ))
    .filter(col("rn") == 1)
    .drop("rn", "last_promotion")
)

display(df_clean)

Common Mistakes

  1. Using UDFs when built-in functions exist: upper(), trim(), coalesce() are 10-100x faster than Python UDFs. Always check built-ins first.

  2. Forgetting col() in expressions: df.filter("salary" > 80000) tries to compare the string "salary" with 80000, which fails with a TypeError in Python 3 before Spark ever sees it. Use df.filter(col("salary") > 80000).

  3. Modifying a column without reassigning: df.withColumn("name", upper("name")) returns a NEW DataFrame. The original df is unchanged. You must do df = df.withColumn(...).

  4. Using collect() on large data — brings everything to the driver. Use show(), display(), take(), or write().

  5. Not handling NULLs in concat(): concat("Alice", NULL) returns NULL. Use concat_ws() or coalesce() first (see the sketch after this list).

  6. Wrong date format in to_date(): to_date("15/03/2022", "yyyy-MM-dd") returns NULL because the format does not match. Use "dd/MM/yyyy".

  7. Forgetting alias() on aggregated columns: avg("salary") creates a column literally named avg(salary), parentheses included. Always add .alias("avg_salary").
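
Here is a quick sketch of fixes for mistakes 5 and 6, reusing the sample employee DataFrame from the top of the post (where hire_date is stored as a yyyy-MM-dd string):

# Mistake 5: concat() returns NULL if ANY input is NULL; concat_ws() skips NULL inputs
df.withColumn("bad_name", concat(col("first_name"), lit(" "), col("last_name"))) \
  .withColumn("good_name", concat_ws(" ", col("first_name"), col("last_name"))) \
  .select("first_name", "last_name", "bad_name", "good_name").show()

# Mistake 6: the pattern must match the stored format; a mismatch yields NULL
# (or an error if ANSI mode is enabled on your cluster)
df.withColumn("wrong_parse", to_date(col("hire_date"), "dd/MM/yyyy")) \
  .withColumn("right_parse", to_date(col("hire_date"), "yyyy-MM-dd")) \
  .select("hire_date", "wrong_parse", "right_parse").show()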

Interview Questions

Q: What is the difference between withColumn and select for adding columns? A: withColumn adds or replaces ONE column while keeping all others. select returns ONLY the columns you specify. Use withColumn for adding/modifying individual columns. Use select for choosing a subset or major restructuring.
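
A minimal illustration of that difference, assuming the same employee df used throughout this post:

# withColumn: keeps every existing column and adds or replaces exactly one
print(df.withColumn("annual_salary", col("salary") * 12).columns)   # all original columns + annual_salary

# select: returns ONLY the columns you list, so you re-specify anything you want to keep
print(df.select("emp_id", "first_name", (col("salary") * 12).alias("annual_salary")).columns)  # just 3 columns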

Q: What is the difference between dropDuplicates and Window + row_number for dedup? A: dropDuplicates keeps an arbitrary first occurrence. Window + row_number lets you control WHICH duplicate to keep (e.g., latest by date). For deterministic deduplication, always use the Window approach.

Q: Why are built-in functions faster than UDFs? A: Built-in functions run in the JVM (Spark’s native engine) and operate on entire columns vectorized. Python UDFs serialize each row to Python, process it, and serialize back — crossing the JVM-Python boundary for every row. This overhead makes UDFs 10-100x slower.

Q: What does coalesce() do and when do you use it? A: coalesce() returns the first non-null value from a list of columns or expressions. Use it for null handling: coalesce(col("email"), lit("unknown")) returns the email if it exists, or “unknown” if it is NULL.

Q: How do you flatten nested JSON in PySpark? A: Use col("struct_field.nested_field") to access struct fields. Use explode() to expand arrays into rows. Combine both for arrays of structs: explode("array_column") then col("exploded.field").

Wrapping Up

This cookbook covers every transformation function you will use in Databricks — from basic column operations to complex window functions, from string cleaning to nested JSON flattening. The key principle: always use built-in functions first, resort to UDFs only when no built-in exists, and chain transformations in a readable pipeline pattern.

Bookmark this post. You will come back to it every time you think “how do I do X in PySpark?”

Related posts:

  • Azure Databricks Introduction and dbutils
  • Reading and Writing File Formats in Databricks
  • Apache Spark and PySpark Architecture
  • SQL Window Functions
  • Data Flows in ADF/Synapse


Naveen Vuppula is a Senior Data Engineering Consultant and app developer based in Ontario, Canada. He writes about Python, SQL, AWS, Azure, and everything data engineering at DriveDataScience.com.
