Delta Lake and PySpark Optimization in Azure Databricks: OPTIMIZE, Z-ORDER, VACUUM, AQE, Broadcast Joins, and the Production Playbook

Your pipeline runs. The data lands in Delta tables. Queries return correct results. Everything works. But it takes 45 minutes instead of 5 minutes. Storage costs climb. Dashboards time out. Analysts complain.

The pipeline is not broken — it is slow. And slow in production means expensive, unreliable, and frustrating.

Most performance problems in Databricks come from the same handful of issues: too many small files, queries scanning too much data, joins shuffling terabytes across the cluster, and old files bloating storage. The good news is that each of these has a specific, well-known fix.

This post covers every optimization technique you need as a data engineer — from Delta Lake file management (OPTIMIZE, Z-ORDER, VACUUM) to Spark query tuning (AQE, broadcast joins, caching, partitioning). Think of it as the performance playbook that turns 45-minute pipelines into 5-minute pipelines.

Table of Contents

  • Why Delta Tables Slow Down Over Time
  • The Small File Problem Explained
  • OPTIMIZE — Compact Small Files
  • Z-ORDER — Physical Data Co-Location
  • Conditional OPTIMIZE — Optimize Only Recent Data
  • VACUUM — Clean Up Old Files
  • VACUUM + Time Travel Interaction
  • Partitioning Strategy
  • Adaptive Query Execution (AQE)
  • Broadcast Joins
  • Join Optimization Best Practices
  • Caching and Persistence
  • Coalesce vs Repartition
  • Filter Pushdown and Column Pruning
  • File Format Optimization
  • Monitoring and Diagnosing Performance
  • The Production Optimization Checklist
  • Before and After: Real Performance Gains
  • Common Mistakes
  • Interview Questions
  • Wrapping Up

Why Delta Tables Slow Down Over Time

Every INSERT, UPDATE, DELETE, and MERGE on a Delta table creates new Parquet files without modifying old ones. This is how Delta provides ACID transactions and time travel — by never altering existing files.

But over time, this creates a problem:

Day 1:   Pipeline inserts 100K rows → 2 files (50K rows each, ~200 MB each)
Day 30:  Pipeline ran 30 times → 60 files (some 50K rows, some 1K rows)
Day 90:  Pipeline + updates + deletes → 300 files (many under 1 MB)
Day 180: 1,000+ tiny files, queries take 10x longer

The query engine must open each file, read its metadata, and process it. Opening 1,000 small files is dramatically slower than opening 10 large files, even if the total data size is the same.

Real-life analogy: Imagine reading a 300-page novel. You can read it as one book (1 large file) in 3 hours. Now imagine the same novel split across 300 individual pages scattered across 300 envelopes. Finding, opening, and reading 300 envelopes takes much longer — even though the content is identical. That is the small file problem.

The Small File Problem Explained

What Creates Small Files

| Operation | Why It Creates Small Files |
|---|---|
| Frequent INSERT (hourly or every 15 minutes) | Each insert creates new files, often small if the batch has few rows |
| UPDATE on individual rows | Rewrites each affected file as a new file containing the updated and unchanged rows, and marks the old file as removed |
| DELETE | Rewrites each affected file as a new file without the deleted rows |
| MERGE (upsert) | Creates new files for both matched (updated) and unmatched (inserted) rows |
| Streaming writes | Micro-batches create many tiny files |

The Impact

| Files | Query Time (same data) | Why |
|---|---|---|
| 10 files x 100 MB | ~5 seconds | 10 file opens, efficient reads |
| 100 files x 10 MB | ~15 seconds | 100 file opens, more metadata overhead |
| 1,000 files x 1 MB | ~60 seconds | 1,000 file opens, massive metadata overhead |
| 10,000 files x 0.1 MB | ~300 seconds | Unacceptable; system thrashing |

Rule of thumb: Each Delta table partition should have files in the 128 MB to 1 GB range. Anything under 10 MB is a “small file.”

OPTIMIZE — Compact Small Files

What It Does

OPTIMIZE reads all the small files in a Delta table and rewrites them into fewer, larger files (target ~1 GB each):

-- Compact all files in the table
OPTIMIZE delta_demo.employees;

Before and After

Before OPTIMIZE:
  _delta_log/
  part-00000.parquet  (2 MB)
  part-00001.parquet  (500 KB)
  part-00002.parquet  (1.5 MB)
  part-00003.parquet  (800 KB)
  part-00004.parquet  (3 MB)
  ... (720 small files from months of hourly inserts)
  Total: 720 files, ~500 MB

After OPTIMIZE:
  _delta_log/
  part-00000.parquet  (250 MB)   ← compacted
  part-00001.parquet  (250 MB)   ← compacted
  Total: 2 files, ~500 MB (same data, fewer files)
  Old 720 files still exist on disk (until VACUUM removes them)

PySpark API

from delta.tables import DeltaTable

dt = DeltaTable.forName(spark, "delta_demo.employees")
dt.optimize().executeCompaction()

How It Works Internally

  1. Spark selects the files that are smaller than the target size
  2. Groups them into bins of roughly the target size (~1 GB) and reads them bin by bin, so the whole table never has to fit in memory
  3. Writes each bin out as a new, larger Parquet file
  4. Commits to the Delta transaction log so the table points to the new files
  5. Old small files are marked as “removed” but NOT deleted (VACUUM does that)
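
The grouping step can be pictured as a bin-packing problem. The sketch below is a minimal pure-Python illustration, assuming a simple first-fit strategy over files sorted by size; `plan_compaction` and the 1 GB constant are hypothetical names chosen for this example, and the actual Databricks implementation may group files differently.

```python
from typing import List

MB = 1024 ** 2
TARGET_BYTES = 1024 * MB  # ~1 GB target output file (assumed, mirrors the text)


def plan_compaction(file_sizes: List[int], target: int = TARGET_BYTES) -> List[List[int]]:
    """Group small files into bins whose combined size stays at or below target.

    Hypothetical helper: each returned bin lists the input file sizes that
    would be rewritten together as one larger output file.
    """
    bins: List[List[int]] = []
    totals: List[int] = []
    # Files already at or above the target are left alone.
    candidates = sorted((s for s in file_sizes if s < target), reverse=True)
    for size in candidates:
        for i, total in enumerate(totals):
            if total + size <= target:  # first bin that still has room
                bins[i].append(size)
                totals[i] += size
                break
        else:
            # No existing bin has room, so start a new one.
            bins.append([size])
            totals.append(size)
    return bins


# 720 made-up small files of 1-3 MB each (about 1.4 GB in total).
small_files = [2 * MB, 1 * MB, 3 * MB] * 240
plan = plan_compaction(small_files)
print(len(small_files), "input files ->", len(plan), "output file(s)")
```

No data is lost or duplicated in the plan: the sizes across all bins add up to the original total, which matches the "same data, fewer files" behaviour described above.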

Real-life analogy: OPTIMIZE is like defragmenting a hard drive. Your data is scattered across hundreds of small pieces. OPTIMIZE collects them and rewrites them into a few large, contiguous chunks. Same data, dramatically faster access.

Z-ORDER — Physical Data Co-Location

What It Does

Z-ORDER physically rearranges data within files so that rows with similar values in the Z-ORDER column are stored near each other. This enables file skipping — when you filter on a Z-ORDERED column, Spark can skip entire files based on min/max statistics.

-- Compact AND co-locate data by city and country
OPTIMIZE delta_demo.employees
ZORDER BY (city, country);

How File Skipping Works

Without Z-ORDER, city values are scattered randomly across files:

File 1: [Toronto, Mumbai, London, Toronto, Delhi, Vancouver]
File 2: [Mumbai, Calgary, Toronto, Ottawa, London, Mumbai]
File 3: [Delhi, Toronto, Vancouver, Mumbai, Calgary, London]

Query: WHERE city = 'Toronto'
Spark must read: ALL 3 files (Toronto is in every file)

With Z-ORDER by city:

File 1: [Calgary, Calgary, Delhi, Delhi, London, London]           min=Calgary, max=London
File 2: [London, Mumbai, Mumbai, Mumbai, Mumbai, Ottawa]           min=London, max=Ottawa
File 3: [Toronto, Toronto, Toronto, Toronto, Vancouver, Vancouver] min=Toronto, max=Vancouver

Query: WHERE city = 'Toronto'
Spark reads: ONLY File 3 (min/max statistics show Toronto is only in File 3)
Files skipped: File 1 and File 2 (67% of data skipped!)
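
The min/max check can be reproduced in a few lines of plain Python. This is a sketch only: it sorts on a single column, which is a simplification of real Z-ordering (Z-ordering interleaves several columns), and `collect_stats` and `files_to_read` are hypothetical helper names, not Delta Lake APIs.

```python
from typing import Dict, List, Tuple

# The unclustered layout from the example: city values scattered across files.
unclustered: Dict[str, List[str]] = {
    "file_1": ["Toronto", "Mumbai", "London", "Toronto", "Delhi", "Vancouver"],
    "file_2": ["Mumbai", "Calgary", "Toronto", "Ottawa", "London", "Mumbai"],
    "file_3": ["Delhi", "Toronto", "Vancouver", "Mumbai", "Calgary", "London"],
}

# Simulate clustering: sort every row by city, then cut into files of 6 rows.
all_rows = sorted(city for rows in unclustered.values() for city in rows)
clustered: Dict[str, List[str]] = {
    f"file_{i // 6 + 1}": all_rows[i:i + 6] for i in range(0, len(all_rows), 6)
}


def collect_stats(files: Dict[str, List[str]]) -> Dict[str, Tuple[str, str]]:
    """Record the (min, max) city per file, like per-file column statistics."""
    return {name: (min(rows), max(rows)) for name, rows in files.items()}


def files_to_read(stats: Dict[str, Tuple[str, str]], value: str) -> List[str]:
    """Keep only files whose [min, max] range could contain the value."""
    return [name for name, (lo, hi) in stats.items() if lo <= value <= hi]


print(files_to_read(collect_stats(unclustered), "Toronto"))
print(files_to_read(collect_stats(clustered), "Toronto"))
```

A value that straddles a file boundary (London in this data) still requires reading two files, which is why skipping is a statistical benefit rather than a guarantee.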

Best Practices for Z-ORDER

Do:

  • Choose columns that appear frequently in WHERE clauses and JOIN conditions
  • Choose high-cardinality columns (many distinct values), such as city or user_id
  • Limit to 2-4 columns; each extra column reduces the effectiveness of the others
  • Run Z-ORDER on large tables (millions of rows) where file skipping matters

Do NOT:

  • Z-ORDER on columns already used as partition keys, because partitioning already separates data for those columns
  • Z-ORDER on low-cardinality columns (boolean, gender), which have too few distinct values to benefit
  • Z-ORDER on every column, since returns diminish after 3-4 columns

Real-life analogy: Z-ORDER is like organizing a library by subject. Without Z-ORDER, science books are scattered across every shelf. With Z-ORDER, all science books are on shelves 10-15. When someone asks for a science book, the librarian goes directly to shelves 10-15 and skips the other 100 shelves.

Conditional OPTIMIZE — Optimize Only Recent Data

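On very large tables, OPTIMIZE on the entire table takes too long. Conditional OPTIMIZE targets only specific partitions. The WHERE predicate must reference partition columns, so the examples below assume delta_demo.sales is partitioned by order_date: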

-- Optimize only data from last month
OPTIMIZE delta_demo.sales
WHERE order_date >= '2026-04-01';

-- Optimize with Z-ORDER on recent data only
OPTIMIZE delta_demo.sales
WHERE order_date >= '2026-04-01'
ZORDER BY (customer_id, product_id);

Production Scheduling Pattern

Daily (2 AM):    OPTIMIZE table WHERE date >= current_date - 7    (last week)
Weekly (Sunday): OPTIMIZE table WHERE date >= current_date - 30   (last month)
Monthly (1st):   OPTIMIZE table                                   (full table, monthly cleanup)

This focuses compute on recent partitions where small files accumulate from daily loads, while occasionally compacting the full table.
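
One way to drive this schedule from a job is to generate the statement with a rolling cutoff date. The sketch below is a hedged illustration: `build_optimize_sql` is a hypothetical helper, and it assumes the date column is a partition column of the table.

```python
from datetime import date, timedelta
from typing import Optional


def build_optimize_sql(
    table: str,
    date_col: str,
    days_back: Optional[int],
    today: Optional[date] = None,
) -> str:
    """Build an OPTIMIZE statement for the last `days_back` days.

    Passing days_back=None produces a full-table OPTIMIZE.
    Hypothetical helper; date_col is assumed to be a partition column.
    """
    if days_back is None:
        return f"OPTIMIZE {table}"
    today = today or date.today()
    cutoff = today - timedelta(days=days_back)
    return f"OPTIMIZE {table} WHERE {date_col} >= '{cutoff.isoformat()}'"


daily = build_optimize_sql("delta_demo.sales", "order_date", 7, today=date(2026, 4, 8))
monthly = build_optimize_sql("delta_demo.sales", "order_date", None)
print(daily)
print(monthly)
# In a Databricks notebook the string would then be passed to spark.sql(...).
```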

VACUUM — Clean Up Old Files

What It Does

After OPTIMIZE rewrites small files into large ones, the old small files still exist on disk. VACUUM permanently deletes these obsolete files:

-- Remove files older than 7 days that are no longer referenced
VACUUM delta_demo.employees RETAIN 168 HOURS;

What VACUUM Deletes

  • Old Parquet files replaced by OPTIMIZE compaction
  • Old Parquet files from UPDATE/DELETE that created new versions
  • Any file NOT referenced by the current version or versions within retention

What VACUUM Does NOT Delete

  • The _delta_log/ folder (transaction log) — NEVER touched
  • Files still referenced by versions within the retention period

The Right Order

Always run OPTIMIZE first, THEN VACUUM:

-- Step 1: Compact small files into large files
OPTIMIZE delta_demo.employees ZORDER BY (city);

-- Step 2: Remove the old small files (now obsolete)
VACUUM delta_demo.employees RETAIN 168 HOURS;

If you VACUUM without OPTIMIZE, you only remove old version files — you do not compact small files.

Real-life analogy: OPTIMIZE is like packing all your small boxes into a few large moving boxes. VACUUM is like throwing away the empty small boxes. OPTIMIZE reorganizes. VACUUM cleans up.

VACUUM + Time Travel Interaction

This is critical to understand:

Version 0: Created table (5 rows)          → files: A.parquet
Version 1: Inserted 2 rows                 → files: A.parquet, B.parquet
Version 2: Updated salary                  → files: B.parquet, C.parquet (A rewritten as C, A marked removed)
Version 3: OPTIMIZE                        → files: D.parquet (compacted), B+C marked removed
Version 4: VACUUM RETAIN 0 HOURS           → A, B, C physically DELETED from disk
                                             (only runs if spark.databricks.delta.retentionDurationCheck.enabled is set to false)

After VACUUM:
  SELECT * FROM table VERSION AS OF 0  → FAILS (A.parquet is gone)
  SELECT * FROM table VERSION AS OF 1  → FAILS (A.parquet, B.parquet are gone)
  SELECT * FROM table VERSION AS OF 2  → FAILS (B.parquet, C.parquet are gone)
  SELECT * FROM table VERSION AS OF 3  → WORKS (D.parquet still exists)
  SELECT * FROM table (current)         → WORKS (D.parquet is the current version)

Rule: VACUUM retention period determines how far back time travel works. Keep 7 days (168 hours) minimum. Set longer for compliance-heavy environments.
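
The interaction between retention and time travel can be modelled with sets. This is a toy model under stated assumptions, not how Delta tracks files internally: `vacuum` and `readable_versions` are hypothetical helpers, the UPDATE is assumed to have rewritten A as C, and the "hours since removal" values are made up.

```python
from typing import Dict, List, Set

# Files each version needs in order to be read (toy model of the example).
version_files: Dict[int, Set[str]] = {
    0: {"A"},
    1: {"A", "B"},
    2: {"B", "C"},
    3: {"D"},
}
current_files: Set[str] = version_files[3]
on_disk: Set[str] = {"A", "B", "C", "D"}

# How long ago each obsolete file was marked removed (made-up values).
removed_hours_ago: Dict[str, int] = {"A": 48, "B": 24, "C": 24}


def vacuum(files: Set[str], current: Set[str], removed: Dict[str, int], retain_hours: int) -> Set[str]:
    """Return the files that survive: current files plus files removed within retention."""
    return {f for f in files if f in current or removed.get(f, 0) < retain_hours}


def readable_versions(versions: Dict[int, Set[str]], surviving: Set[str]) -> List[int]:
    """A version is readable only if every file it references still exists."""
    return [v for v, needed in sorted(versions.items()) if needed <= surviving]


after_zero = vacuum(on_disk, current_files, removed_hours_ago, retain_hours=0)
after_week = vacuum(on_disk, current_files, removed_hours_ago, retain_hours=168)
print("RETAIN 0 HOURS   ->", readable_versions(version_files, after_zero))
print("RETAIN 168 HOURS ->", readable_versions(version_files, after_week))
```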

Partitioning Strategy

What Partitioning Does

Partitioning physically separates data into folders based on column values:

sales/
  year=2025/
    month=11/  → only November 2025 data
    month=12/  → only December 2025 data
  year=2026/
    month=01/  → only January 2026 data

Query WHERE year = 2026 AND month = 1 reads ONLY year=2026/month=01/ — everything else is skipped.

Partitioning Best Practices

| Do | Do Not |
|---|---|
| Partition by date columns (year, month) | Partition by high-cardinality columns (user_id), which creates millions of tiny folders |
| Partition by columns used in WHERE clauses | Partition by columns never filtered on |
| Target ~1 GB per partition | Create partitions under 100 MB (over-partitioning) |
| Use 1-3 partition columns | Use 5+ partition columns (too many levels) |
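
A quick back-of-the-envelope check helps spot over-partitioning before a table is created. The sketch below assumes data is spread evenly across partitions and that every combination of partition values occurs; `avg_partition_mb` is a hypothetical helper and the table sizes are made up.

```python
from math import prod
from typing import Sequence


def avg_partition_mb(total_gb: float, distinct_values: Sequence[int]) -> float:
    """Estimate the average partition size in MB.

    distinct_values holds the number of distinct values for each partition
    column; the partition count is taken as their product (an upper bound).
    """
    partitions = prod(distinct_values)
    return total_gb * 1024 / partitions


# A made-up 500 GB table partitioned two different ways.
by_year_month = avg_partition_mb(500, [3, 12])    # 3 years x 12 months
by_user_id = avg_partition_mb(500, [1_000_000])   # one folder per user
print(f"year/month: {by_year_month:,.0f} MB per partition")
print(f"user_id:    {by_user_id:.3f} MB per partition")
```

The first layout lands comfortably above the ~1 GB target, while the second falls far below the 100 MB floor from the table above.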

Partitioning vs Z-ORDER

| Feature | Partitioning | Z-ORDER |
|---|---|---|
| How it works | Separate folders per value | Co-locates related rows in the same files |
| Best for | Low-cardinality (year, month, country) | High-cardinality (user_id, city) |
| File structure | Folder per partition value | Files within existing structure |
| Combine? | Yes: partition by date | Yes: Z-ORDER by user_id |

Adaptive Query Execution (AQE)

What It Is

AQE automatically optimizes query execution at runtime based on actual data statistics. It was introduced in Spark 3.0 and is enabled by default in Databricks.

# Enable AQE (usually already enabled in Databricks)
spark.conf.set("spark.sql.adaptive.enabled", "true")

What AQE Automatically Optimizes

1. Converts Sort-Merge Joins to Broadcast Joins: If AQE detects one side of a join is small enough (after filters reduce it), it automatically broadcasts it — even if it looked large before filtering.

2. Coalesces Small Shuffle Partitions: After a shuffle, some partitions might have very little data. AQE merges these small partitions into larger ones to reduce overhead.

3. Handles Skewed Joins: If one join key has disproportionately more rows (data skew), AQE splits that partition into smaller sub-partitions for parallel processing.

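4. Dynamically Adjusts Shuffle Partitions: Instead of always producing the fixed spark.sql.shuffle.partitions count (default 200), AQE treats it as a starting point and reduces the actual number of post-shuffle partitions based on data volume. This is the visible result of the coalescing described in point 2.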

Real-life analogy: AQE is like a GPS with live traffic updates. A static GPS (no AQE) plans the route before you start driving and never changes it. AQE replans as you drive — “there is a traffic jam on the highway (data skew), take the side road instead (split partition).”
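
To make the partition coalescing concrete, here is a simplified pure-Python model. It assumes a greedy merge of adjacent partitions up to an advisory size of 64 MB (the default of spark.sql.adaptive.advisoryPartitionSizeInBytes); Spark's real algorithm has more rules, and `coalesce_shuffle_partitions` is a hypothetical name.

```python
from typing import List

MB = 1024 ** 2
ADVISORY_BYTES = 64 * MB  # default advisory partition size in Spark


def coalesce_shuffle_partitions(sizes: List[int], target: int = ADVISORY_BYTES) -> List[List[int]]:
    """Greedily merge adjacent shuffle partitions while the running total stays <= target.

    Returns groups of original partition indexes; each group becomes one task.
    """
    groups: List[List[int]] = []
    current: List[int] = []
    total = 0
    for idx, size in enumerate(sizes):
        if current and total + size > target:
            groups.append(current)  # close the group before it overflows
            current, total = [], 0
        current.append(idx)
        total += size
    if current:
        groups.append(current)
    return groups


# 200 tiny shuffle partitions of 1 MB each collapse into a handful of tasks.
tiny = coalesce_shuffle_partitions([1 * MB] * 200)
print(len(tiny), "tasks instead of 200")

# A partition that is already large is left on its own.
mixed = coalesce_shuffle_partitions([100 * MB, 1 * MB, 1 * MB])
print(mixed)
```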

Broadcast Joins

The Problem with Regular Joins

When you join two DataFrames, Spark shuffles BOTH across the cluster so matching keys end up on the same node:

Without broadcast (shuffle join):
  Large table (10M rows) → SHUFFLED across 10 nodes
  Small table (1K rows)  → SHUFFLED across 10 nodes
  Network transfer: ~10 GB moved
  Time: 5 minutes

The Broadcast Solution

Copy the small table to EVERY worker node. The large table stays in place — no shuffle:

With broadcast:
  Large table (10M rows) → STAYS in place (no shuffle)
  Small table (1K rows)  → COPIED to all 10 nodes (~1 MB per node)
  Network transfer: ~10 MB total
  Time: 30 seconds

How to Use

from pyspark.sql.functions import broadcast

# Manual broadcast hint
df_result = df_large.join(broadcast(df_small), "key_column")

# Spark can do this automatically for tables below spark.sql.autoBroadcastJoinThreshold
# (10 MB by default), and AQE can switch to it at runtime; a manual hint makes the intent explicit

When to Broadcast

| Scenario | Broadcast? |
|---|---|
| Lookup/dimension table (< 10 MB) | Always |
| Medium table (10 MB – 500 MB) | Maybe; test both ways |
| Large table (> 500 MB) | Never; will cause OOM on workers |
| Both tables are large | Neither; use standard shuffle join |

Real-life analogy: Without broadcast, both teams fly to a central meeting room (expensive). With broadcast, the 3-person team flies to visit the 100-person office (cheap). You always send the smaller group to the bigger group.
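
The mechanics behind a broadcast join can be sketched without Spark: build a lookup from the small side once, then let every partition of the large side probe it locally. This is a toy inner join assuming unique keys on the small side; `broadcast_hash_join` and the sample rows are made up for illustration.

```python
from typing import Dict, Iterator, List, Tuple

OrderRow = Tuple[int, float]   # (product_id, amount)
ProductRow = Tuple[int, str]   # (product_id, name)


def broadcast_hash_join(
    large_partitions: List[List[OrderRow]],
    small_rows: List[ProductRow],
) -> Iterator[List[Tuple[int, float, str]]]:
    """Inner-join each large partition against a lookup built from the small side.

    The large partitions are never moved or regrouped; only the small
    lookup is shared with every partition.
    """
    lookup: Dict[int, str] = dict(small_rows)  # the "broadcast" copy
    for partition in large_partitions:
        yield [
            (key, amount, lookup[key])
            for key, amount in partition
            if key in lookup
        ]


orders = [
    [(1, 100.0), (2, 50.0)],   # partition on worker 1
    [(1, 75.0), (3, 20.0)],    # partition on worker 2
]
products = [(1, "Widget"), (2, "Gadget")]
joined = list(broadcast_hash_join(orders, products))
print(joined)
```

The output keeps the same number of partitions as the large input, which is the point: the large side stays where it is.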

Join Optimization Best Practices

1. Match Data Types on Join Columns

from pyspark.sql.functions import col

# BAD: Integer-to-string join (forces an implicit cast, disables optimizations)
# Here orders.product_id is INT and products.product_id is STRING → slow
df_result = df_orders.join(df_products, df_orders.product_id == df_products.product_id)

# GOOD: Cast first, then join
df_products = df_products.withColumn("product_id", col("product_id").cast("int"))
df_result = df_orders.join(df_products, "product_id")

2. Filter Before Joining

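# BAD: Join 10M rows, then filter
df_result = df_orders.join(df_products, "product_id").filter(col("year") == 2026)

# GOOD: Filter first, join fewer rows
# (Catalyst often pushes simple filters below the join on its own, but filtering first
# makes the intent explicit and does not depend on the optimizer)
df_orders_2026 = df_orders.filter(col("year") == 2026)  # 1M rows instead of 10M
df_result = df_orders_2026.join(df_products, "product_id")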

3. Select Only Needed Columns Before Joining

# BAD: Carry 50 columns through the join (massive shuffle)
df_result = df_orders.join(df_products, "product_id")

# GOOD: Select needed columns first (smaller shuffle)
df_orders_slim = df_orders.select("order_id", "product_id", "amount", "year")
df_products_slim = df_products.select("product_id", "name", "category")
df_result = df_orders_slim.join(df_products_slim, "product_id")

4. Avoid Accidental Cross Joins

# DANGEROUS: Missing or wrong join condition → cartesian product
df_result = df_orders.join(df_products)  # 10M x 1K = 10 BILLION rows!

# SAFE: Always specify a join condition
df_result = df_orders.join(df_products, "product_id")

Caching and Persistence

When to Cache

Cache a DataFrame when it is reused multiple times in the same notebook:

# Without caching: Spark recomputes df_clean EVERY time it is used
# (groupBy must be followed by an aggregation to get a DataFrame back)
df_clean = df_raw.filter(...).withColumn(...).groupBy(...).agg(...)
df_clean.count()                   # Reads raw data, filters, groups, counts
df_clean.show()                    # Reads raw data AGAIN, filters AGAIN, groups AGAIN
df_clean.write.parquet("output/")  # Reads raw data AGAIN

# With caching: Computed once, stored in memory
df_clean = df_raw.filter(...).withColumn(...).groupBy(...).agg(...).cache()
df_clean.count()                   # First call: computes and caches
df_clean.show()                    # Reads from cache (fast)
df_clean.write.parquet("output/")  # Reads from cache (fast)

# Unpersist when done (free memory)
df_clean.unpersist()

When NOT to Cache

  • DataFrame used only once — caching adds overhead with no benefit
  • DataFrame is larger than available cluster memory — causes spill to disk
  • The computation is fast anyway — caching adds complexity for minimal gain
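
The recomputation behaviour comes from lazy evaluation, which a toy class can imitate. The sketch below is not a Spark API: `LazyFrame` is a hypothetical stand-in that reruns its computation on every action unless `cache()` was called first.

```python
from typing import Any, Callable


class LazyFrame:
    """Toy stand-in for a lazily evaluated DataFrame (not a Spark class)."""

    def __init__(self, compute: Callable[[], Any]) -> None:
        self._compute = compute
        self._use_cache = False
        self._has_result = False
        self._result: Any = None
        self.compute_calls = 0  # how many times the full pipeline ran

    def cache(self) -> "LazyFrame":
        self._use_cache = True
        return self

    def collect(self) -> Any:
        """An 'action': returns the result, recomputing unless it is cached."""
        if self._use_cache and self._has_result:
            return self._result
        self.compute_calls += 1
        result = self._compute()
        if self._use_cache:
            self._result, self._has_result = result, True
        return result

    def unpersist(self) -> "LazyFrame":
        self._use_cache = False
        self._has_result = False
        self._result = None
        return self


uncached = LazyFrame(lambda: sum(range(1000)))
cached = LazyFrame(lambda: sum(range(1000))).cache()
for frame in (uncached, cached):
    for _ in range(3):  # three actions on the same frame
        frame.collect()
print("uncached runs:", uncached.compute_calls, "| cached runs:", cached.compute_calls)
```

With a single action there is nothing to save, which is the first case in the list above.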

Coalesce vs Repartition

The Problem: Too Many Output Files

# Spark creates one file per partition — 200 partitions = 200 files
df.write.parquet("output/")   # Creates 200 small files

Coalesce (Reduce Partitions — No Shuffle)

# Reduce to 10 partitions without a full shuffle
df.coalesce(10).write.parquet("output/")   # Creates 10 files

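Coalesce MERGES existing partitions and avoids a full shuffle. It is fast but can create uneven partitions, and because it lowers the partition count it can also reduce the parallelism of the stage that feeds the write.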

Repartition (Redistribute — Full Shuffle)

# Redistribute into 10 even partitions (shuffle across cluster)
df.repartition(10).write.parquet("output/")   # Creates 10 even files

# Repartition by column (all same keys together)
df.repartition("department").write.parquet("output/")

Repartition creates EVEN partitions but costs a full shuffle.

When to Use Which

| Scenario | Use |
|---|---|
| Reducing partitions before writing | coalesce() (faster, no shuffle) |
| Increasing partitions for parallelism | repartition() (only way to increase) |
| Ensuring even data distribution | repartition() (even splits) |
| Pre-partitioning for downstream joins | repartition("join_key") |
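
The difference in output shape can be seen with plain lists standing in for partitions. This is a simplified model, assuming coalesce merges neighbouring partitions as-is and repartition deals rows out round-robin; the two functions are hypothetical and only mimic the behaviour described above.

```python
from typing import List


def coalesce(partitions: List[List[int]], n: int) -> List[List[int]]:
    """Merge neighbouring partitions into n groups without redistributing rows."""
    n = min(n, len(partitions))  # coalesce can only reduce the count
    merged: List[List[int]] = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        merged[i * n // len(partitions)].extend(part)
    return merged


def repartition(partitions: List[List[int]], n: int) -> List[List[int]]:
    """Redistribute every row round-robin across n partitions (a full shuffle)."""
    rows = [row for part in partitions for row in part]
    out: List[List[int]] = [[] for _ in range(n)]
    for i, row in enumerate(rows):
        out[i % n].append(row)
    return out


# One large partition and three tiny ones (made-up row ids).
skewed = [list(range(100)), [100], [101], [102]]
coalesced_sizes = [len(p) for p in coalesce(skewed, 2)]
repartitioned_sizes = [len(p) for p in repartition(skewed, 2)]
print("coalesce(2):   ", coalesced_sizes)
print("repartition(2):", repartitioned_sizes)
```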

Monitoring and Diagnosing Performance

Check File Count and Sizes

DESCRIBE DETAIL delta_demo.employees;
-- Look at: numFiles, sizeInBytes
-- If numFiles > 100 for a small table → run OPTIMIZE
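
The check can be turned into a small rule that a maintenance job applies to the numFiles and sizeInBytes values. This is a heuristic sketch built from the thresholds mentioned in this post (more than 100 files, average under 10 MB); `needs_optimize` is a hypothetical helper and the thresholds are assumptions to tune per table.

```python
MB = 1024 ** 2


def needs_optimize(
    num_files: int,
    size_in_bytes: int,
    small_file_bytes: int = 10 * MB,
    min_files: int = 100,
) -> bool:
    """Flag a table when it has many files and their average size is small."""
    if num_files == 0:
        return False
    avg_file_bytes = size_in_bytes / num_files
    return num_files > min_files and avg_file_bytes < small_file_bytes


# Values as they might appear in DESCRIBE DETAIL output (made-up numbers).
fragmented = needs_optimize(num_files=720, size_in_bytes=500 * MB)
compacted = needs_optimize(num_files=2, size_in_bytes=500 * MB)
print("fragmented table:", fragmented, "| compacted table:", compacted)
# In a notebook the inputs could come from
# spark.sql("DESCRIBE DETAIL delta_demo.employees").first()
```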

Check Query Execution Plan

# See how Spark plans to execute a query
df.explain(True)
# Look for: BroadcastHashJoin (good) vs SortMergeJoin (potential shuffle)
# Look for: FileScan with PartitionFilters (good, pruning active)

Spark UI

In Databricks, click the Spark icon in the toolbar during execution to see:

  • Stages, tasks, and their duration
  • Shuffle read/write volumes
  • Skewed tasks (one task much slower than others)
  • Disk spill (data too large for memory)

The Production Optimization Checklist

For Delta Tables

✅ Run OPTIMIZE weekly (or daily for high-frequency tables)
✅ Use Z-ORDER on 2-3 columns frequently used in WHERE/JOIN
✅ Run VACUUM weekly with 168 hours retention
✅ Use conditional OPTIMIZE on recent partitions for large tables
✅ Monitor file count with DESCRIBE DETAIL
✅ Target 128 MB - 1 GB per file

For Spark Queries

✅ AQE enabled (default in Databricks — verify)
✅ Broadcast joins for small tables (< 10 MB)
✅ Match data types on join columns
✅ Filter BEFORE joins (reduce data volume)
✅ Select ONLY needed columns before joins
✅ Cache DataFrames used more than once
✅ Coalesce before writing to avoid small files
✅ Avoid collect() and toPandas() on large data

For Partitioning

✅ Partition by low-cardinality date columns (year, month)
✅ Target ~1 GB per partition minimum
✅ Z-ORDER by high-cardinality columns within partitions
✅ Do NOT partition by user_id or other high-cardinality columns
✅ Maximum 2-3 partition columns

Common Mistakes

  1. Never running OPTIMIZE — small files accumulate, queries get slower every day. Schedule OPTIMIZE as a regular maintenance job.

  2. VACUUM with 0 hours retention — permanently deletes ALL old files. Time travel breaks completely. Never use 0 in production.

  3. Z-ORDER on partition columns — pointless since partitioning already separates data by those columns. Z-ORDER on non-partition columns that appear in WHERE clauses.

  4. Over-partitioning — partitioning by user_id creates millions of tiny partitions. Use partitioning for low-cardinality columns (date, country) and Z-ORDER for high-cardinality.

  5. Joining without matching data types — string-to-integer joins force expensive casts and disable predicate pushdown. Cast first, join second.

  6. Filtering after join instead of before — joining 10M rows then filtering to 1M wastes 90% of the join computation. Filter to 1M first, then join.

  7. Not caching reused DataFrames — if you use df_clean 5 times, Spark recomputes it 5 times. Cache it after first computation.

  8. Writing without coalesce — default 200 shuffle partitions creates 200 small files on write. Coalesce to 5-10 before writing.

Interview Questions

Q: What is the small file problem and how do you fix it?
A: Frequent writes (INSERT, UPDATE, MERGE) create many small Parquet files over time. Each file adds metadata overhead to queries. Fix it with OPTIMIZE (compacts small files into ~1 GB files) followed by VACUUM (removes old obsolete files). Target 128 MB to 1 GB per file.

Q: What is the difference between OPTIMIZE and VACUUM?
A: OPTIMIZE rewrites small files into fewer large files; it is a compaction operation. VACUUM deletes old files that are no longer referenced. Run OPTIMIZE first (compact), then VACUUM (cleanup). OPTIMIZE improves query performance. VACUUM reduces storage costs.

Q: What is Z-ORDER and when do you use it?
A: Z-ORDER physically co-locates rows with similar column values within files. This enables file skipping: Spark can skip files that do not contain the filtered values based on min/max statistics. Use Z-ORDER on high-cardinality columns (city, user_id) that appear in WHERE and JOIN clauses. Do not Z-ORDER on partition columns.

Q: What is AQE and what does it optimize?
A: Adaptive Query Execution dynamically optimizes queries at runtime. It converts joins to broadcast joins when a table is small enough after filtering, coalesces small shuffle partitions, handles data skew by splitting large partitions, and adjusts shuffle partition counts. Enabled by default in Databricks.

Q: When should you broadcast a table in a join?
A: When one side is small (under 10 MB). Broadcast copies the small table to every worker node, eliminating the expensive shuffle of the large table. Use broadcast(df_small) or rely on AQE to do it automatically. Never broadcast tables over 500 MB.

Q: What is the difference between coalesce and repartition?
A: Coalesce reduces partitions by merging existing ones without a full shuffle, which is fast but can create uneven sizes. Repartition redistributes data with a full shuffle, which is slower but creates even partitions. Use coalesce before writing (reduce files). Use repartition to increase parallelism or pre-partition for joins.

Wrapping Up

Optimization is not about writing clever code — it is about understanding how Spark and Delta Lake work physically. Small files slow reads. Shuffles slow joins. Unpartitioned data forces full scans. Once you understand the physics, the fixes are straightforward:

  • Too many files? → OPTIMIZE + VACUUM
  • Queries scanning too much data? → Z-ORDER + Partitioning
  • Joins too slow? → Broadcast + Filter before join + Match types
  • Writing too many files? → Coalesce before write
  • Everything slow? → Enable AQE + Cache reused DataFrames

Schedule OPTIMIZE and VACUUM as regular maintenance. Design partitioning and Z-ORDER at table creation time. Apply join best practices in every notebook. The result: pipelines that run 5-10x faster on the same hardware.

Related posts: Delta Lake Deep Dive, PySpark Transformations Cookbook, PySpark Joins, Apache Spark and PySpark Architecture, Data File Formats


Naveen Vuppula is a Senior Data Engineering Consultant and app developer based in Ontario, Canada. He writes about Python, SQL, AWS, Azure, and everything data engineering at DriveDataScience.com.
