Delta Lake and PySpark Optimization in Azure Databricks: OPTIMIZE, Z-ORDER, VACUUM, AQE, Broadcast Joins, and the Production Playbook
Your pipeline runs. The data lands in Delta tables. Queries return correct results. Everything works. But it takes 45 minutes instead of 5 minutes. Storage costs climb. Dashboards time out. Analysts complain.
The pipeline is not broken — it is slow. And slow in production means expensive, unreliable, and frustrating.
Most performance problems in Databricks come from the same handful of issues: too many small files, queries scanning too much data, joins shuffling terabytes across the cluster, and old files bloating storage. The good news is that each of these has a specific, well-known fix.
This post covers the core optimization techniques a data engineer needs, from Delta Lake file management (OPTIMIZE, Z-ORDER, VACUUM) to Spark query tuning (AQE, broadcast joins, caching, partitioning). Think of it as the performance playbook that turns 45-minute pipelines into 5-minute pipelines.
Table of Contents
- Why Delta Tables Slow Down Over Time
- The Small File Problem Explained
- OPTIMIZE — Compact Small Files
- Z-ORDER — Physical Data Co-Location
- Conditional OPTIMIZE — Optimize Only Recent Data
- VACUUM — Clean Up Old Files
- VACUUM + Time Travel Interaction
- Partitioning Strategy
- Adaptive Query Execution (AQE)
- Broadcast Joins
- Join Optimization Best Practices
- Caching and Persistence
- Coalesce vs Repartition
- Monitoring and Diagnosing Performance
- The Production Optimization Checklist
- Common Mistakes
- Interview Questions
- Wrapping Up
Why Delta Tables Slow Down Over Time
Every INSERT, UPDATE, DELETE, and MERGE on a Delta table creates new Parquet files without modifying old ones. This is how Delta provides ACID transactions and time travel — by never altering existing files.
But over time, this creates a problem:
Day 1: Pipeline inserts 100K rows → 2 files (50K rows each, ~200 MB each)
Day 30: Pipeline ran 30 times → 60 files (some 50K rows, some 1K rows)
Day 90: Pipeline + updates + deletes → 300 files (many under 1 MB)
Day 180: 1,000+ tiny files, queries take 10x longer
The query engine must open each file, read its metadata, and process it. Opening 1,000 small files is dramatically slower than opening 10 large files, even if the total data size is the same.
Real-life analogy: Imagine reading a 300-page novel. You can read it as one book (1 large file) in 3 hours. Now imagine the same novel split across 300 individual pages scattered across 300 envelopes. Finding, opening, and reading 300 envelopes takes much longer — even though the content is identical. That is the small file problem.
The Small File Problem Explained
What Creates Small Files
| Operation | Why It Creates Small Files |
|---|---|
| Frequent INSERT (hourly/every-15-min) | Each insert creates new files, often small if few rows per batch |
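| UPDATE on individual rows | Rewrites every file that contains a matching row (all of that file's rows, not just the updated one) and marks the old file as removed |
| DELETE | Rewrites the affected files without the deleted rows and marks the originals as removed |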
| MERGE (upsert) | Creates new files for both matched (updated) and unmatched (inserted) rows |
| Streaming writes | Micro-batches create many tiny files |
The Impact
| Files | Query Time (same ~1 GB of data, illustrative) | Why |
|---|---|---|
| 10 files x 100 MB | ~5 seconds | 10 file opens, efficient reads |
| 100 files x 10 MB | ~15 seconds | 100 file opens, more metadata overhead |
| 1,000 files x 1 MB | ~60 seconds | 1,000 file opens, massive metadata overhead |
| 10,000 files x 0.1 MB | ~300 seconds | Unacceptable — system thrashing |
Rule of thumb: Each Delta table partition should have files in the 128 MB to 1 GB range. Anything under 10 MB is a “small file.”
OPTIMIZE — Compact Small Files
What It Does
OPTIMIZE reads all the small files in a Delta table and rewrites them into fewer, larger files (target ~1 GB each):
-- Compact all files in the table
OPTIMIZE delta_demo.employees;
Before and After
Before OPTIMIZE:
employees/ (table directory)
  _delta_log/ (transaction log: JSON commit files)
  part-00000.parquet (2 MB)
  part-00001.parquet (500 KB)
  part-00002.parquet (1.5 MB)
  part-00003.parquet (800 KB)
  part-00004.parquet (3 MB)
  ... (720 small files from a month of hourly inserts)
Total: 720 files, ~500 MB
After OPTIMIZE:
employees/ (table directory)
  _delta_log/ (a new commit records the OPTIMIZE)
  part-00720.parquet (250 MB) ← compacted
  part-00721.parquet (250 MB) ← compacted
Total: 2 live files, ~500 MB (same data, fewer files)
Old 720 files still exist on disk (until VACUUM removes them)
PySpark API
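from delta.tables import DeltaTable
# Same compaction as the SQL OPTIMIZE above, via the Delta Lake Python API
dt = DeltaTable.forName(spark, "delta_demo.employees")
dt.optimize().executeCompaction()
# Z-ORDER variant: dt.optimize().executeZOrderBy("city", "country")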
How It Works Internally
- Delta selects the files in each partition that are smaller than the target size; files that are already large enough are left alone
- Groups those small files into bins of up to ~1 GB (bin-packing)
- Writes each bin as one new, large Parquet file
- Updates the Delta transaction log to point to the new files
- Marks the old small files as “removed” but does NOT delete them (VACUUM does that)
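Delta's documentation describes this compaction as bin-packing. The sketch below is a simplified, pure-Python illustration of the idea, not Delta's implementation: sizes are in MB, files already at or above the target are skipped, and the remaining files are grouped greedily in the order given. The function name `plan_compaction` and the 256 MB target in the usage line are hypothetical choices for the example; Delta's own default target is the ~1 GB mentioned above.

```python
from typing import List


def plan_compaction(file_sizes_mb: List[float], target_mb: float = 1024.0) -> List[List[float]]:
    """Greedy bin-packing of small files into output files of at most target_mb.

    Simplified illustration: files already >= target_mb are not rewritten,
    roughly mirroring how OPTIMIZE leaves large-enough files alone.
    """
    bins: List[List[float]] = []
    current: List[float] = []
    current_total = 0.0
    for size in file_sizes_mb:
        if size >= target_mb:
            continue  # already large enough, left untouched
        if current and current_total + size > target_mb:
            bins.append(current)  # this bin becomes one new output file
            current, current_total = [], 0.0
        current.append(size)
        current_total += size
    if current:
        bins.append(current)
    return bins


# 720 hourly files of ~0.7 MB each (about 500 MB total), hypothetical 256 MB target
plan = plan_compaction([0.7] * 720, target_mb=256.0)
print(len(plan), [len(b) for b in plan])  # 2 [365, 355]
```

With that target, the 720 small files pack into two output files, each just under 256 MB.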
Real-life analogy: OPTIMIZE is like defragmenting a hard drive. Your data is scattered across hundreds of small pieces. OPTIMIZE collects them and rewrites them into a few large, contiguous chunks. Same data, dramatically faster access.
Z-ORDER — Physical Data Co-Location
What It Does
Z-ORDER physically rearranges data within files so that rows with similar values in the Z-ORDER columns are stored near each other. This enables file skipping: when you filter on a Z-ORDERED column, Spark can skip entire files based on min/max statistics.
-- Compact AND co-locate data by city and country
OPTIMIZE delta_demo.employees
ZORDER BY (city, country);
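Z-ORDER takes its name from the Z-order (Morton) curve, which interleaves the bits of several column values into one sort key so that rows close in every column tend to end up close together. A minimal pure-Python sketch, assuming each column has already been mapped to a small integer rank (Delta handles that mapping and other details its own way; the helper name and grid are hypothetical):

```python
def interleave_bits(x: int, y: int, bits: int = 8) -> int:
    """Morton (Z-order) key for two non-negative ints:
    bits of x go to even positions, bits of y to odd positions."""
    key = 0
    for i in range(bits):
        key |= ((x >> i) & 1) << (2 * i)
        key |= ((y >> i) & 1) << (2 * i + 1)
    return key


# Hypothetical (city_rank, country_rank) pairs on a 4 x 4 grid
points = [(x, y) for x in range(4) for y in range(4)]
z_sorted = sorted(points, key=lambda p: interleave_bits(*p))
print(z_sorted[:4])  # [(0, 0), (1, 0), (0, 1), (1, 1)]
```

The first 2 x 2 block of the grid comes out together, so a file cut from this ordering has tight min/max ranges on both columns. Because each column only gets every other bit of the key, adding a third or fourth column dilutes the locality of each one.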
How File Skipping Works
Without Z-ORDER, city values are scattered randomly across files:
File 1: [Toronto, Mumbai, London, Toronto, Delhi, Vancouver]
File 2: [Mumbai, Calgary, Toronto, Ottawa, London, Mumbai]
File 3: [Delhi, Toronto, Vancouver, Mumbai, Calgary, London]
Query: WHERE city = 'Toronto'
Spark must read: ALL 3 files (Toronto is in every file)
With Z-ORDER by city:
File 1: [Calgary, Delhi, Delhi, London, London, London] min=Calgary, max=London
File 2: [Mumbai, Mumbai, Mumbai, Mumbai, Ottawa, Ottawa] min=Mumbai, max=Ottawa
File 3: [Toronto, Toronto, Toronto, Vancouver, Vancouver] min=Toronto, max=Vancouver
Query: WHERE city = 'Toronto'
Spark reads: ONLY File 3 (min/max statistics show Toronto is only in File 3)
Files skipped: File 1 and File 2 (67% of data skipped!)
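The skipping decision itself is a range check against per-file min/max statistics. Delta keeps those statistics in its transaction log; the pure-Python sketch below only mimics the check, using the city values from the example above (function names are hypothetical):

```python
from typing import Dict, List, Tuple


def build_stats(files: Dict[str, List[str]]) -> Dict[str, Tuple[str, str]]:
    """Per-file (min, max) for one column."""
    return {name: (min(vals), max(vals)) for name, vals in files.items()}


def files_to_read(stats: Dict[str, Tuple[str, str]], value: str) -> List[str]:
    """Skip every file whose [min, max] range cannot contain `value`."""
    return [name for name, (lo, hi) in stats.items() if lo <= value <= hi]


unsorted = {
    "file1": ["Toronto", "Mumbai", "London", "Toronto", "Delhi", "Vancouver"],
    "file2": ["Mumbai", "Calgary", "Toronto", "Ottawa", "London", "Mumbai"],
    "file3": ["Delhi", "Toronto", "Vancouver", "Mumbai", "Calgary", "London"],
}
clustered = {
    "file1": ["Calgary", "Delhi", "Delhi", "London", "London", "London"],
    "file2": ["Mumbai", "Mumbai", "Mumbai", "Mumbai", "Ottawa", "Ottawa"],
    "file3": ["Toronto", "Toronto", "Toronto", "Vancouver", "Vancouver"],
}
print(files_to_read(build_stats(unsorted), "Toronto"))   # ['file1', 'file2', 'file3']
print(files_to_read(build_stats(clustered), "Toronto"))  # ['file3']
```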
Best Practices for Z-ORDER
Do:
- Choose columns that appear frequently in WHERE clauses and JOIN conditions
- Choose high-cardinality columns (many distinct values), such as city or user_id
- Make sure the column has min/max statistics (by default Delta collects them for the first 32 columns of the table)
- Limit to 2-4 columns; more columns reduce effectiveness
- Run Z-ORDER on large tables (millions of rows) where file skipping matters
Do NOT:
- Z-ORDER on columns already used as partition keys; partitioning already separates data for those columns (Delta rejects this with an error)
- Z-ORDER on low-cardinality columns (boolean, gender); too few distinct values to benefit
- Z-ORDER on every column; diminishing returns after 3-4 columns
Real-life analogy: Z-ORDER is like organizing a library by subject. Without Z-ORDER, science books are scattered across every shelf. With Z-ORDER, all science books are on shelves 10-15. When someone asks for a science book, the librarian goes directly to shelves 10-15 and skips the other 100 shelves.
Conditional OPTIMIZE — Optimize Only Recent Data
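On very large tables, OPTIMIZE on the entire table takes too long. Conditional OPTIMIZE adds a WHERE clause that limits compaction to specific partitions. The predicate may reference partition columns only, so the examples below assume sales is partitioned by order_date: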
-- Optimize only data from last month
OPTIMIZE delta_demo.sales
WHERE order_date >= '2026-04-01';
-- Optimize with Z-ORDER on recent data only
OPTIMIZE delta_demo.sales
WHERE order_date >= '2026-04-01'
ZORDER BY (customer_id, product_id);
Production Scheduling Pattern
Daily (2 AM): OPTIMIZE table WHERE date >= current_date - 7 (last week)
Weekly (Sunday): OPTIMIZE table WHERE date >= current_date - 30 (last month)
Monthly (1st): OPTIMIZE table (full table, monthly cleanup)
This focuses compute on recent partitions where small files accumulate from daily loads, while occasionally compacting the full table.
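One way to drive this schedule from a job is to build the statement in code and pass the string to spark.sql(). A minimal plain-Python sketch, assuming delta_demo.sales is partitioned by order_date; `build_optimize_sql` is a hypothetical helper name, and a lookback of None stands for the monthly full-table run:

```python
from datetime import date, timedelta
from typing import Optional, Sequence


def build_optimize_sql(table: str, partition_col: str, run_date: date,
                       lookback_days: Optional[int],
                       zorder_cols: Sequence[str] = ()) -> str:
    """Build one OPTIMIZE statement for a maintenance run.

    partition_col must be a partition column: conditional OPTIMIZE
    only accepts predicates on partition columns.
    """
    sql = f"OPTIMIZE {table}"
    if lookback_days is not None:
        cutoff = run_date - timedelta(days=lookback_days)
        sql += f" WHERE {partition_col} >= '{cutoff.isoformat()}'"
    if zorder_cols:
        sql += f" ZORDER BY ({', '.join(zorder_cols)})"
    return sql


run = date(2026, 5, 3)
print(build_optimize_sql("delta_demo.sales", "order_date", run, 7))
# OPTIMIZE delta_demo.sales WHERE order_date >= '2026-04-26'
```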
VACUUM — Clean Up Old Files
What It Does
After OPTIMIZE rewrites small files into large ones, the old small files still exist on disk. VACUUM permanently deletes these obsolete files:
-- Remove files older than 7 days that are no longer referenced
VACUUM delta_demo.employees RETAIN 168 HOURS;
What VACUUM Deletes
- Old Parquet files replaced by OPTIMIZE compaction
- Old Parquet files from UPDATE/DELETE that created new versions
- Any file NOT referenced by the current version or versions within retention
What VACUUM Does NOT Delete
- The _delta_log/ folder (transaction log): VACUUM never touches it
- Files still referenced by versions within the retention period
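The deletion rule can be modeled in a few lines. This is a simplified sketch, not Delta's code: it assumes each removed file carries the time it was marked removed, and it ignores details of real VACUUM such as untracked files in the table directory. `vacuum_candidates` and the sample timestamps are hypothetical.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Set


def vacuum_candidates(active: Set[str], removed_at: Dict[str, datetime],
                      now: datetime, retain_hours: int = 168) -> List[str]:
    """Simplified VACUUM rule: a data file is deleted only if the current
    version no longer references it AND it was marked removed before the
    retention cutoff. _delta_log/ entries are never candidates."""
    cutoff = now - timedelta(hours=retain_hours)
    return sorted(
        name for name, ts in removed_at.items()
        if name not in active and ts < cutoff
    )


now = datetime(2026, 5, 10, 2, 0)
active = {"D.parquet"}
removed = {
    "A.parquet": datetime(2026, 5, 1),  # marked removed 9 days ago
    "B.parquet": datetime(2026, 5, 8),  # marked removed 2 days ago
}
print(vacuum_candidates(active, removed, now))  # ['A.parquet']
```

With the default 168 hours, only A is old enough to delete; B survives until its removal is older than 7 days.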
The Right Order
Always run OPTIMIZE first, THEN VACUUM:
-- Step 1: Compact small files into large files
OPTIMIZE delta_demo.employees ZORDER BY (city);
-- Step 2: Remove the old small files (now obsolete)
VACUUM delta_demo.employees RETAIN 168 HOURS;
If you VACUUM without OPTIMIZE, you only remove old version files — you do not compact small files.
Real-life analogy: OPTIMIZE is like packing all your small boxes into a few large moving boxes. VACUUM is like throwing away the empty small boxes. OPTIMIZE reorganizes. VACUUM cleans up.
VACUUM + Time Travel Interaction
This is critical to understand:
Version 0: Created table (5 rows) → live files: A.parquet
Version 1: Inserted 2 rows → live files: A.parquet, B.parquet
Version 2: Updated salary → live files: B.parquet, C.parquet (A rewritten as C; A marked removed)
Version 3: OPTIMIZE → live files: D.parquet (B and C compacted into D; B and C marked removed)
Then: VACUUM RETAIN 0 HOURS → A, B, C physically DELETED from disk
After VACUUM:
SELECT * FROM table VERSION AS OF 0 → FAILS (A.parquet is gone)
SELECT * FROM table VERSION AS OF 1 → FAILS (A.parquet, B.parquet are gone)
SELECT * FROM table VERSION AS OF 2 → FAILS (old files gone)
SELECT * FROM table VERSION AS OF 3 → WORKS (D.parquet still exists)
SELECT * FROM table (current) → WORKS (D.parquet is the current version)
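Rule: the VACUUM retention period determines how far back time travel works. Keep at least 7 days (168 hours), and longer in compliance-heavy environments. Delta enforces a floor by default: a retention shorter than the table's delta.deletedFileRetentionDuration (7 days unless changed) is rejected unless spark.databricks.delta.retentionDurationCheck.enabled is set to false, which is exactly the safety check a RETAIN 0 HOURS run has to bypass.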
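Which versions survive a VACUUM follows directly from which files each version needs. A small pure-Python model of the example above, assuming version 2's live files are B and C (A was rewritten as C) and version 3's only live file is D (`readable_versions` is a hypothetical name):

```python
from typing import Dict, List, Set

# Live data files per table version, following the example above
versions: Dict[int, Set[str]] = {
    0: {"A.parquet"},
    1: {"A.parquet", "B.parquet"},
    2: {"B.parquet", "C.parquet"},
    3: {"D.parquet"},
}


def readable_versions(versions: Dict[int, Set[str]], deleted: Set[str]) -> List[int]:
    """A version can be time-traveled to only if none of its files were vacuumed."""
    return [v for v, files in sorted(versions.items()) if not files & deleted]


print(readable_versions(versions, {"A.parquet", "B.parquet", "C.parquet"}))  # [3]
print(readable_versions(versions, {"A.parquet"}))                            # [2, 3]
```

Deleting A alone already breaks versions 0 and 1, which is why the retention window, not the version count, decides how far back you can go.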
Partitioning Strategy
What Partitioning Does
Partitioning physically separates data into folders based on column values:
sales/
year=2025/
month=11/ → only November 2025 data
month=12/ → only December 2025 data
year=2026/
month=01/ → only January 2026 data
Query WHERE year = 2026 AND month = 1 reads ONLY year=2026/month=01/ — everything else is skipped.
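Conceptually, partition pruning is a filter on the partition values attached to each file, applied before any data is read (for Delta tables, Spark gets those values from the transaction log rather than by listing folders). A pure-Python sketch that parses Hive-style key=value folder names; the paths and function names are hypothetical:

```python
from typing import Dict, List


def parse_partition(path: str) -> Dict[str, str]:
    """Read Hive-style 'key=value' folder names from a file path."""
    return dict(seg.split("=", 1) for seg in path.split("/") if "=" in seg)


def prune(paths: List[str], **filters: int) -> List[str]:
    """Keep only files whose partition values equal every filter."""
    kept = []
    for path in paths:
        parts = parse_partition(path)
        if all(key in parts and int(parts[key]) == value for key, value in filters.items()):
            kept.append(path)
    return kept


files = [
    "sales/year=2025/month=11/part-0.parquet",
    "sales/year=2025/month=12/part-0.parquet",
    "sales/year=2026/month=01/part-0.parquet",
    "sales/year=2026/month=01/part-1.parquet",
]
print(prune(files, year=2026, month=1))
# ['sales/year=2026/month=01/part-0.parquet', 'sales/year=2026/month=01/part-1.parquet']
```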
Partitioning Best Practices
| Do | Do Not |
|---|---|
| Partition by date columns (year, month) | Partition by high-cardinality columns (user_id) — creates millions of tiny folders |
| Partition by columns used in WHERE clauses | Partition by columns never filtered on |
| Target ~1 GB per partition | Create partitions under 100 MB (over-partitioning) |
| Use 1-3 partition columns | Use 5+ partition columns (too many levels) |
Partitioning vs Z-ORDER
| Feature | Partitioning | Z-ORDER |
|---|---|---|
| How it works | Separate folders per value | Sorts data within files |
| Best for | Low-cardinality (year, month, country) | High-cardinality (user_id, city) |
| File structure | Folder per partition value | Files within existing structure |
| Combine? | Yes: partition by date | ...and Z-ORDER by user_id within each partition |
Adaptive Query Execution (AQE)
What It Is
AQE automatically optimizes query execution at runtime based on actual data statistics. It was introduced in Spark 3.0 and is enabled by default in Databricks.
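# Enable AQE (already on by default in Databricks) and two of its sub-features
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")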
What AQE Automatically Optimizes
1. Converts Sort-Merge Joins to Broadcast Joins: If AQE detects one side of a join is small enough (after filters reduce it), it automatically broadcasts it — even if it looked large before filtering.
2. Coalesces Small Shuffle Partitions: After a shuffle, some partitions might have very little data. AQE merges these small partitions into larger ones to reduce overhead.
3. Handles Skewed Joins: If one join key has disproportionately more rows (data skew), AQE splits that partition into smaller sub-partitions for parallel processing.
4. Dynamically Adjusts Shuffle Partitions: Instead of using the fixed spark.sql.shuffle.partitions (default 200), AQE adjusts the actual number based on data volume.
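Items 2 and 3 can be approximated in plain Python. This sketch merges adjacent shuffle partitions toward an advisory size and flags skewed partitions using the rule in the Spark documentation: a partition is skewed if it is larger than skewedPartitionFactor times the median partition size and also larger than skewedPartitionThresholdInBytes. The constants mirror open-source Spark 3.x defaults for spark.sql.adaptive.advisoryPartitionSizeInBytes (64 MB), spark.sql.adaptive.skewJoin.skewedPartitionFactor (5) and spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (256 MB). Databricks may tune them differently, and real coalescing also depends on settings such as spark.sql.adaptive.coalescePartitions.parallelismFirst. The partition sizes and function names are hypothetical, and the sketch only flags skew rather than splitting it.

```python
from statistics import median
from typing import List

ADVISORY_MB = 64         # advisoryPartitionSizeInBytes default (OSS Spark 3.x)
SKEW_FACTOR = 5          # skewJoin.skewedPartitionFactor default
SKEW_THRESHOLD_MB = 256  # skewJoin.skewedPartitionThresholdInBytes default


def coalesce_partitions(sizes_mb: List[float], target_mb: float = ADVISORY_MB) -> List[List[int]]:
    """Merge adjacent shuffle partitions until each group approaches target_mb.
    Returns groups of original partition indices (one group = one task)."""
    groups: List[List[int]] = []
    current: List[int] = []
    total = 0.0
    for i, size in enumerate(sizes_mb):
        if current and total + size > target_mb:
            groups.append(current)
            current, total = [], 0.0
        current.append(i)
        total += size
    if current:
        groups.append(current)
    return groups


def skewed_partitions(sizes_mb: List[float]) -> List[int]:
    """Indices of partitions the skew-join rule would treat as skewed."""
    med = median(sizes_mb)
    return [i for i, s in enumerate(sizes_mb)
            if s > SKEW_FACTOR * med and s > SKEW_THRESHOLD_MB]


shuffle_sizes = [10, 12, 8, 30, 5, 900, 11, 9]  # MB per shuffle partition
print(coalesce_partitions(shuffle_sizes))  # [[0, 1, 2, 3], [4], [5], [6, 7]]
print(skewed_partitions(shuffle_sizes))    # [5]
```

Eight shuffle partitions collapse into four tasks, and only the 900 MB partition is flagged, because it exceeds both 5 × the 10.5 MB median and the 256 MB floor.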
Real-life analogy: AQE is like a GPS with live traffic updates. A static GPS (no AQE) plans the route before you start driving and never changes it. AQE replans as you drive — “there is a traffic jam on the highway (data skew), take the side road instead (split partition).”
Broadcast Joins
The Problem with Regular Joins
When you join two DataFrames, Spark shuffles BOTH across the cluster so matching keys end up on the same node:
Without broadcast (shuffle join):
Large table (10M rows) → SHUFFLED across 10 nodes
Small table (1K rows) → SHUFFLED across 10 nodes
Network transfer: ~10 GB moved
Time: 5 minutes
The Broadcast Solution
Copy the small table to EVERY worker node. The large table stays in place — no shuffle:
With broadcast:
Large table (10M rows) → STAYS in place (no shuffle)
Small table (1K rows) → COPIED to all 10 nodes (~1 MB per node)
Network transfer: ~10 MB total
Time: 30 seconds
How to Use
from pyspark.sql.functions import broadcast
# Manual broadcast hint
df_result = df_large.join(broadcast(df_small), "key_column")
# Spark (and AQE at runtime) broadcasts automatically when a side's estimated size is
# below spark.sql.autoBroadcastJoinThreshold (default 10 MB); the hint requests it even above that
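A broadcast hash join builds a hash table from the small side once, ships it to every worker, and probes it while each worker scans its own slice of the large side. The pure-Python sketch below models that with lists standing in for partitions; the function name and the (product_id, amount) / (product_id, name) rows are hypothetical:

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def broadcast_hash_join(large_partitions: List[List[Tuple[int, float]]],
                        small_table: Iterable[Tuple[int, str]]) -> List[Tuple[int, float, str]]:
    """Inner join: build a hash map from the small side once, then probe it
    from each partition of the large side. Large-side rows never move."""
    lookup: Dict[int, List[str]] = defaultdict(list)
    for key, name in small_table:           # build side: the "broadcast" copy
        lookup[key].append(name)
    out = []
    for partition in large_partitions:      # each worker scans only its own rows
        for key, amount in partition:
            for name in lookup.get(key, []):
                out.append((key, amount, name))
    return out


orders = [[(1, 9.5), (2, 3.0)], [(1, 4.0), (3, 7.25)]]  # two "worker" partitions
products = [(1, "pen"), (2, "ink")]
print(broadcast_hash_join(orders, products))
# [(1, 9.5, 'pen'), (2, 3.0, 'ink'), (1, 4.0, 'pen')]
```

Order 3 has no matching product, so the inner join drops it; nothing from `orders` had to be regrouped by key.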
When to Broadcast
| Scenario | Broadcast? |
|---|---|
| Lookup/dimension table (< 10 MB) | Always |
| Medium table (10 MB – 500 MB) | Maybe — test both ways |
| Large table (> 500 MB) | Avoid: the table is collected to the driver and copied to every executor, risking out-of-memory errors |
| Both tables are large | Neither — use standard shuffle join |
Real-life analogy: Without broadcast, both teams fly to a central meeting room (expensive). With broadcast, the 3-person team flies to visit the 100-person office (cheap). You always send the smaller group to the bigger group.
Join Optimization Best Practices
1. Match Data Types on Join Columns
from pyspark.sql.functions import col
# BAD: String-to-Integer join (Spark must cast one side on every row, which can block optimizations)
df_result = df_orders.join(df_products, df_orders.product_id == df_products.product_id)
# If orders.product_id is INT and products.product_id is STRING → slow
# GOOD: Cast first, then join
df_products = df_products.withColumn("product_id", col("product_id").cast("int"))
df_result = df_orders.join(df_products, "product_id")
2. Filter Before Joining
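# BAD: Join 10M rows, then filter
df_result = df_orders.join(df_products, "product_id").filter(col("year") == 2026)
# Catalyst can often push a simple filter like this below the join on its own,
# but filtering explicitly first states the intent and helps when pushdown is blocked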
# GOOD: Filter first, join fewer rows
df_orders_2026 = df_orders.filter(col("year") == 2026) # 1M rows instead of 10M
df_result = df_orders_2026.join(df_products, "product_id")
3. Select Only Needed Columns Before Joining
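# BAD: Carry 50 columns through the join (massive shuffle)
df_result = df_orders.join(df_products, "product_id")
# Catalyst drops columns nothing downstream uses, but every column that is kept travels through the shuffle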
# GOOD: Select needed columns first (smaller shuffle)
df_orders_slim = df_orders.select("order_id", "product_id", "amount", "year")
df_products_slim = df_products.select("product_id", "name", "category")
df_result = df_orders_slim.join(df_products_slim, "product_id")
4. Avoid Accidental Cross Joins
# DANGEROUS: Missing or wrong join condition → cartesian product
df_result = df_orders.join(df_products) # 10M x 1K = 10 BILLION rows!
# SAFE: Always specify a join condition
df_result = df_orders.join(df_products, "product_id")
Caching and Persistence
When to Cache
Cache a DataFrame when it is reused multiple times in the same notebook:
# Without caching: Spark recomputes df_clean EVERY time an action uses it
df_clean = df_raw.filter(...).withColumn(...).groupBy(...).agg(...)
df_clean.count() # Reads raw data, filters, groups, counts
df_clean.show() # Reads raw data AGAIN, filters AGAIN, groups AGAIN
df_clean.write.parquet(...) # Reads raw data AGAIN
# With caching: computed once, then reused (spills to disk if memory runs out)
df_clean = df_raw.filter(...).withColumn(...).groupBy(...).agg(...).cache()
df_clean.count() # First action: computes and fills the cache (cache() itself is lazy)
df_clean.show() # Reads from cache, no recomputation
df_clean.write.parquet(...) # Reads from cache, no recomputation
# Unpersist when done (free memory)
df_clean.unpersist()
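Spark's laziness is what makes caching pay off. The class below is a hypothetical toy model in plain Python, not a Spark API: each action re-runs the pipeline unless cache() was requested, in which case the first action stores the rows and later actions reuse them.

```python
from typing import Callable, List, Optional


class ToyFrame:
    """Hypothetical stand-in for a lazily evaluated DataFrame."""

    def __init__(self, pipeline: Callable[[], List[int]]):
        self._pipeline = pipeline
        self._cache_requested = False
        self._rows: Optional[List[int]] = None
        self.runs = 0  # how many times the full pipeline actually executed

    def cache(self) -> "ToyFrame":
        self._cache_requested = True  # lazy: nothing is computed here
        return self

    def unpersist(self) -> None:
        self._cache_requested = False
        self._rows = None

    def _materialize(self) -> List[int]:
        if self._rows is not None:
            return self._rows  # served from the cache
        self.runs += 1
        rows = self._pipeline()
        if self._cache_requested:
            self._rows = rows
        return rows

    def count(self) -> int:
        return len(self._materialize())

    def first(self) -> int:
        return self._materialize()[0]


def pipeline() -> List[int]:
    # Stand-in for an expensive filter/withColumn/groupBy chain
    return [x * 2 for x in range(1_000) if x % 3 == 0]


plain = ToyFrame(pipeline)
cached = ToyFrame(pipeline).cache()
for frame in (plain, cached):
    for action in (frame.count, frame.first, frame.count):  # three actions each
        action()
print(plain.runs, cached.runs)  # 3 1
```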
When NOT to Cache
- DataFrame used only once — caching adds overhead with no benefit
- DataFrame is larger than available cluster memory — causes spill to disk
- The computation is fast anyway — caching adds complexity for minimal gain
Coalesce vs Repartition
The Problem: Too Many Output Files
# After a shuffle, a DataFrame has spark.sql.shuffle.partitions partitions (default 200) and
# Spark writes one file per non-empty partition: up to 200 small files unless AQE coalesces first
df.write.parquet("output/")
Coalesce (Reduce Partitions — No Shuffle)
# Reduce to 10 partitions without a full shuffle
df.coalesce(10).write.parquet("output/") # Creates 10 files
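Coalesce MERGES existing partitions without a full shuffle, so it is fast but can leave partitions uneven. Because it also lowers the task count of the stage it belongs to, coalescing to a very small number (for example coalesce(1)) can force the upstream computation onto only a few tasks.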
Repartition (Redistribute — Full Shuffle)
# Redistribute into 10 even partitions (shuffle across cluster)
df.repartition(10).write.parquet("output/") # Creates 10 even files
# Repartition by column (all same keys together)
df.repartition("department").write.parquet("output/")
Repartition creates EVEN partitions but costs a full shuffle.
When to Use Which
| Scenario | Use |
|---|---|
| Reducing partitions before writing | coalesce() (faster, no shuffle) |
| Increasing partitions for parallelism | repartition() (only way to increase) |
| Ensuring even data distribution | repartition() (even splits) |
| Pre-partitioning for downstream joins | repartition("join_key") |
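The difference in the table comes from how rows move. A simplified pure-Python model, with lists standing in for partitions: coalesce merges neighboring partitions as they are, while repartition deals every row out again (round-robin here). Spark's real implementations differ in detail, for example in how coalesce groups partitions by location; the partition sizes and function names are hypothetical.

```python
from typing import List


def coalesce(partitions: List[List[int]], n: int) -> List[List[int]]:
    """Merge neighboring partitions into n groups without moving rows
    between groups (no shuffle), so sizes can stay uneven."""
    size = len(partitions)
    groups: List[List[int]] = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        groups[i * n // size].extend(part)
    return groups


def repartition(partitions: List[List[int]], n: int) -> List[List[int]]:
    """Deal every row round-robin into n partitions (a full shuffle),
    which evens out partition sizes."""
    groups: List[List[int]] = [[] for _ in range(n)]
    rows = (row for part in partitions for row in part)
    for i, row in enumerate(rows):
        groups[i % n].append(row)
    return groups


# Four input partitions with very uneven sizes
parts = [list(range(90)), list(range(5)), list(range(3)), list(range(2))]
print([len(p) for p in coalesce(parts, 2)])     # [95, 5]
print([len(p) for p in repartition(parts, 2)])  # [50, 50]
```

Writing the coalesced result would produce one large and one tiny file, while the repartitioned result produces two even files at the cost of moving every row.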
Monitoring and Diagnosing Performance
Check File Count and Sizes
DESCRIBE DETAIL delta_demo.employees;
-- Look at: numFiles, sizeInBytes
-- Average file size = sizeInBytes / numFiles; if it is far below the 128 MB to 1 GB target (for example, under 10 MB), run OPTIMIZE
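The same check can be scripted. In a notebook, the two numbers could come from spark.sql("DESCRIBE DETAIL delta_demo.employees").first(), reading its numFiles and sizeInBytes fields; the pure-Python helpers below (hypothetical names) only do the arithmetic and flag tables whose average file falls under the 10 MB small-file line from earlier in this post.

```python
def avg_file_size_mb(num_files: int, size_in_bytes: int) -> float:
    """Average file size in MB from DESCRIBE DETAIL's numFiles and sizeInBytes."""
    if num_files == 0:
        return 0.0
    return size_in_bytes / num_files / (1024 * 1024)


def needs_optimize(num_files: int, size_in_bytes: int, small_mb: float = 10.0) -> bool:
    """Flag a table with more than one file whose average file is 'small'."""
    return num_files > 1 and avg_file_size_mb(num_files, size_in_bytes) < small_mb


# 720 files totalling ~500 MB, like the OPTIMIZE example
total_bytes = 500 * 1024 * 1024
print(round(avg_file_size_mb(720, total_bytes), 2))  # 0.69
print(needs_optimize(720, total_bytes))              # True
```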
Check Query Execution Plan
# See how Spark plans to execute a query
df.explain(True)
# Look for: BroadcastHashJoin (good) vs SortMergeJoin (potential shuffle)
# Look for: FileScan with PartitionFilters (good, pruning active)
Spark UI
In Databricks, open the Spark UI (for example from the cluster's Spark UI tab, or from the Spark Jobs details under a running cell) to see:
- Stages, tasks, and their duration
- Shuffle read/write volumes
- Skewed tasks (one task much slower than others)
- Disk spill (data too large for memory)
The Production Optimization Checklist
For Delta Tables
✅ Run OPTIMIZE weekly (or daily for high-frequency tables)
✅ Use Z-ORDER on 2-3 columns frequently used in WHERE/JOIN
✅ Run VACUUM weekly with 168 hours retention
✅ Use conditional OPTIMIZE on recent partitions for large tables
✅ Monitor file count with DESCRIBE DETAIL
✅ Target 128 MB - 1 GB per file
For Spark Queries
✅ AQE enabled (default in Databricks — verify)
✅ Broadcast joins for small tables (< 10 MB)
✅ Match data types on join columns
✅ Filter BEFORE joins (reduce data volume)
✅ Select ONLY needed columns before joins
✅ Cache DataFrames used more than once
✅ Coalesce before writing to avoid small files
✅ Avoid collect() and toPandas() on large data
For Partitioning
✅ Partition by low-cardinality date columns (year, month)
✅ Target ~1 GB per partition minimum
✅ Z-ORDER by high-cardinality columns within partitions
✅ Do NOT partition by user_id or other high-cardinality columns
✅ Maximum 2-3 partition columns
Common Mistakes
- Never running OPTIMIZE: small files accumulate and queries get slower every day. Schedule OPTIMIZE as a regular maintenance job.
- VACUUM with 0 hours retention: permanently deletes ALL old files, and time travel breaks completely. Never use 0 in production.
- Z-ORDER on partition columns: pointless, since partitioning already separates data by those columns. Z-ORDER on non-partition columns that appear in WHERE clauses instead.
- Over-partitioning: partitioning by user_id creates millions of tiny partitions. Use partitioning for low-cardinality columns (date, country) and Z-ORDER for high-cardinality ones.
- Joining without matching data types: string-to-integer joins force a cast on every row and can block optimizations. Cast first, join second.
- Filtering after the join instead of before: joining 10M rows and then filtering to 1M wastes 90% of the join work. Filter to 1M first, then join.
- Not caching reused DataFrames: if you use df_clean 5 times, Spark recomputes it 5 times. Cache it so the first action materializes it once.
- Writing without coalesce: the default 200 shuffle partitions can produce 200 small files on write. Coalesce to a file count that puts each file in the 128 MB to 1 GB range.
Interview Questions
Q: What is the small file problem and how do you fix it? A: Frequent writes (INSERT, UPDATE, MERGE) create many small Parquet files over time. Each file adds metadata overhead to queries. Fix it with OPTIMIZE (compacts small files into ~1 GB files) followed by VACUUM (removes old obsolete files). Target 128 MB to 1 GB per file.
Q: What is the difference between OPTIMIZE and VACUUM? A: OPTIMIZE rewrites small files into fewer large files — it is a compaction operation. VACUUM deletes old files that are no longer referenced. Run OPTIMIZE first (compact), then VACUUM (cleanup). OPTIMIZE improves query performance. VACUUM reduces storage costs.
Q: What is Z-ORDER and when do you use it? A: Z-ORDER physically co-locates rows with similar column values within files. This enables file skipping — Spark can skip files that do not contain the filtered values based on min/max statistics. Use Z-ORDER on high-cardinality columns (city, user_id) that appear in WHERE and JOIN clauses. Do not Z-ORDER on partition columns.
Q: What is AQE and what does it optimize? A: Adaptive Query Execution dynamically optimizes queries at runtime. It converts joins to broadcast joins when a table is small enough after filtering, coalesces small shuffle partitions, handles data skew by splitting large partitions, and adjusts shuffle partition counts. Enabled by default in Databricks.
Q: When should you broadcast a table in a join?
A: When one side is small (under 10 MB). Broadcast copies the small table to every worker node, eliminating the expensive shuffle of the large table. Use broadcast(df_small) or rely on AQE to do it automatically. Never broadcast tables over 500 MB.
Q: What is the difference between coalesce and repartition? A: Coalesce reduces partitions by merging existing ones without a full shuffle — fast but can create uneven sizes. Repartition redistributes data with a full shuffle — slower but creates even partitions. Use coalesce before writing (reduce files). Use repartition to increase parallelism or pre-partition for joins.
Wrapping Up
Optimization is not about writing clever code — it is about understanding how Spark and Delta Lake work physically. Small files slow reads. Shuffles slow joins. Unpartitioned data forces full scans. Once you understand the physics, the fixes are straightforward:
- Too many files? → OPTIMIZE + VACUUM
- Queries scanning too much data? → Z-ORDER + Partitioning
- Joins too slow? → Broadcast + Filter before join + Match types
- Writing too many files? → Coalesce before write
- Everything slow? → Enable AQE + Cache reused DataFrames
Schedule OPTIMIZE and VACUUM as regular maintenance. Design partitioning at table creation time and choose Z-ORDER columns up front. Apply join best practices in every notebook. The result: pipelines that can run 5-10x faster on the same hardware.
Naveen Vuppula is a Senior Data Engineering Consultant and app developer based in Ontario, Canada. He writes about Python, SQL, AWS, Azure, and everything data engineering at DriveDataScience.com.