Databricks AutoLoader: Incremental File Ingestion with cloudFiles, Schema Inference, Schema Evolution, and Production Checkpointing
You have a folder in ADLS Gen2. Every hour, new CSV files land in it. You need to read ONLY the new files, not reprocess everything. You need to handle schema changes if the source adds a column next month. And you need it to survive restarts — pick up exactly where it left off.
This is the problem AutoLoader solves. It is Databricks’ built-in solution for incremental file ingestion, the bridge between “files landing in a folder” and “rows in a Delta table.” A few lines of code replace what used to take an entire pipeline of Get Metadata → ForEach → Copy → Track Watermark in Azure Data Factory.
Think of AutoLoader like a mailroom clerk at a large office. Every day, new letters (files) arrive at the mailroom (cloud storage). The clerk’s job is to open each letter (read the file), sort the contents (parse the schema), and deliver it to the right department (write to Delta table). Critically, the clerk keeps a log of every letter processed (checkpoint). If the clerk calls in sick Monday and comes back Tuesday, they check the log and process only what arrived since they last worked — never reprocessing old mail, never missing new mail.
AutoLoader does for files what Change Data Capture does for databases — it detects what is new and processes only that.
Table of Contents
- What Is AutoLoader?
- Why Not Just spark.read?
- How AutoLoader Works Under the Hood
- File Discovery Modes: Directory Listing vs File Notification
- Your First AutoLoader Pipeline
- Reading CSV Files with AutoLoader
- Reading JSON Files with AutoLoader
- Reading Parquet Files with AutoLoader
- Schema Inference: Let AutoLoader Figure Out the Schema
- Schema Evolution: Handling New Columns Automatically
- Schema Hints: Overriding Inferred Types
- The Schema Location: Where AutoLoader Stores Schema
- Checkpointing: How AutoLoader Tracks Progress
- Rescued Data Column: Never Lose a Row
- AutoLoader Options Reference
- Production Pattern: Bronze Ingestion with AutoLoader
- Production Pattern: Multi-Source Ingestion
- Monitoring AutoLoader Streams
- AutoLoader vs COPY INTO
- AutoLoader vs ADF Copy Activity
- Common Mistakes
- Interview Questions
- Wrapping Up
What Is AutoLoader?
AutoLoader is a Databricks feature that incrementally and efficiently processes new files as they arrive in cloud storage (ADLS Gen2, S3, GCS). It uses Spark Structured Streaming under the hood with a special source called cloudFiles.
Cloud Storage (ADLS Gen2) Databricks
┌──────────────────────────┐ ┌──────────────────────────┐
│ landing/customers/ │ │ │
│ file_001.csv ✓ done │ │ AutoLoader (cloudFiles) │
│ file_002.csv ✓ done │──────────>│ ↓ │
│ file_003.csv ★ NEW │ │ Delta Table │
│ file_004.csv ★ NEW │ │ (bronze.customers) │
│ │ │ │
└──────────────────────────┘ └──────────────────────────┘
AutoLoader processes ONLY file_003 and file_004.
Files 001 and 002 were already processed in a previous run.
The checkpoint directory tracks which files have been seen.
Key characteristics:
- Incremental — processes only new files, not everything in the folder
- Exactly-once — each file is processed exactly once (no duplicates, no misses)
- Schema-aware — infers schema from data and evolves as source changes
- Fault-tolerant — checkpoint tracks progress, survives restarts
- Scalable — handles millions of files efficiently
Why Not Just spark.read?
The question every beginner asks: “I already read CSV files with spark.read.csv(). Why do I need AutoLoader?”
| Feature | spark.read.csv() (Batch) | AutoLoader (cloudFiles) |
|---|---|---|
| Reads | ALL files in the folder every time | Only NEW files since last run |
| Duplicates | You must handle deduplication yourself | Exactly-once processing guaranteed |
| Progress tracking | None — you must build your own watermark | Built-in checkpoint directory |
| Schema changes | Breaks if source adds a column | Evolves schema automatically |
| Millions of files | Lists ALL files every run (slow) | File Notification mode skips listing |
| Restart behavior | Reprocesses everything from scratch | Resumes from last checkpoint |
| Bad records | Fails or silently drops rows | Rescued data column captures them |
Real-life analogy: spark.read is like rereading your entire email inbox every morning. AutoLoader is like checking only unread emails. When you have 50,000 emails, checking the unread ones takes seconds. Rereading the whole inbox takes forever.
How AutoLoader Works Under the Hood
Step 1: DISCOVER new files
AutoLoader checks cloud storage for files not yet seen
(using directory listing or event-based notifications)
Step 2: QUEUE discovered files
New files are added to an internal processing queue
Each file is assigned a micro-batch ID
Step 3: READ and PARSE
Spark reads the queued files using the configured format (CSV, JSON, Parquet)
Schema is inferred or applied from hints
Bad records go to the _rescued_data column
Step 4: WRITE to Delta
Parsed data is written to the target Delta table
Using .writeStream with a checkpoint location
Step 5: CHECKPOINT
The checkpoint directory records:
- Which files have been processed
- The current committed offset
- Schema information
On restart, AutoLoader reads the checkpoint and picks up where it left off
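The discover-then-checkpoint loop above can be imitated in plain Python to make the incremental behavior concrete. This is only a toy model: the `run_once` function and the single JSON checkpoint file are assumptions made for illustration, not how AutoLoader stores its state internally.

```python
import json
import tempfile
from pathlib import Path


def run_once(landing_dir, checkpoint_file):
    """Process files not yet recorded in the checkpoint and return their names."""
    # Load the names of already-processed files (empty on the first run).
    if checkpoint_file.exists():
        seen = set(json.loads(checkpoint_file.read_text()))
    else:
        seen = set()

    # Step 1: discover files that are not in the checkpoint yet.
    new_files = sorted(
        p.name for p in landing_dir.iterdir() if p.is_file() and p.name not in seen
    )

    # Steps 2-4 (queue, read, write) are omitted in this toy model.

    # Step 5: record progress so the next run skips these files.
    checkpoint_file.write_text(json.dumps(sorted(seen | set(new_files))))
    return new_files


with tempfile.TemporaryDirectory() as tmp:
    landing = Path(tmp) / "landing"
    landing.mkdir()
    checkpoint = Path(tmp) / "checkpoint.json"

    (landing / "file_001.csv").write_text("id\n1\n")
    (landing / "file_002.csv").write_text("id\n2\n")
    first_run = run_once(landing, checkpoint)

    (landing / "file_003.csv").write_text("id\n3\n")
    second_run = run_once(landing, checkpoint)
    third_run = run_once(landing, checkpoint)

print(first_run, second_run, third_run)
```

The second run sees only the file that arrived after the first run, and the third run finds nothing to do, which is the same pattern AutoLoader follows at much larger scale.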
File Discovery Modes: Directory Listing vs File Notification
AutoLoader discovers new files using one of two modes:
| Feature | Directory Listing (Default) | File Notification |
|---|---|---|
| How it works | Lists the folder contents, compares to checkpoint | Sets up cloud events (Azure Event Grid) to push notifications |
| Setup | Zero configuration | Requires cloud resource permissions (Event Grid subscription) |
| Cost | Storage API list calls on every trigger | Low — event-driven, no listing |
| Performance at scale | Slows down with millions of files | Constant performance regardless of file count |
| Best for | Small to medium folders (< 1M files) | Large folders (millions of files), high-frequency arrivals |
| Option | "cloudFiles.useNotifications": "false" | "cloudFiles.useNotifications": "true" |
Real-life analogy: Directory Listing is like walking to your physical mailbox every hour to check for new mail — works fine for a home mailbox, but impractical for a warehouse receiving dock. File Notification is like having the mail carrier ring a doorbell when mail arrives — you only go to the mailbox when something is there.
Start with Directory Listing (the default). Switch to File Notification only when you have millions of files or very high-frequency arrivals. Directory Listing requires zero setup and works for most use cases.
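One way to encode that advice is a small helper that picks the discovery option from an expected file count. The helper name `choose_discovery_options` and the one-million threshold (taken from the table above) are assumptions for this sketch; only the `cloudFiles.useNotifications` option itself comes from AutoLoader.

```python
def choose_discovery_options(expected_file_count, threshold=1_000_000):
    """Return reader options for Directory Listing or File Notification mode."""
    # Below the threshold, stay on the zero-setup default (directory listing).
    use_notifications = expected_file_count >= threshold
    return {"cloudFiles.useNotifications": "true" if use_notifications else "false"}


small_folder = choose_discovery_options(20_000)
large_folder = choose_discovery_options(5_000_000)
print(small_folder, large_folder)

# In a notebook the dict can be passed straight to the reader, for example:
# spark.readStream.format("cloudFiles").options(**large_folder)
```

Keeping the decision in one function means the switch to File Notification mode is a config change rather than a rewrite of each pipeline.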
Your First AutoLoader Pipeline
The simplest possible AutoLoader — read new CSV files and write to a Delta table:
# The simplest AutoLoader pipeline
df = (spark.readStream
    .format("cloudFiles")                                           # AutoLoader source
    .option("cloudFiles.format", "csv")                             # File format
    .option("header", "true")                                       # CSV has headers
    .option("cloudFiles.schemaLocation", "/mnt/schema/customers/")  # Where to store inferred schema
    .load("/mnt/landing/customers/")                                # Source folder
)

# Write to Delta table with checkpoint
(df.writeStream
    .format("delta")
    .option("checkpointLocation", "/mnt/checkpoints/customers/")    # Progress tracking
    .outputMode("append")
    .trigger(availableNow=True)                                     # Process all new files, then stop
    .toTable("bronze.customers")                                    # Target Delta table
)
That is it. Six lines to replace an entire ADF pipeline of Get Metadata → ForEach → Copy → Track Watermark. AutoLoader discovers new files, reads them, infers the schema, writes to Delta, and checkpoints — all automatically.
Understanding Each Part
| Component | What It Does | Analogy |
|---|---|---|
| readStream | Tells Spark this is a streaming read (not batch) | Opens the mailbox door |
| format("cloudFiles") | Activates AutoLoader (the incremental engine) | Hires the mailroom clerk |
| cloudFiles.format | What kind of files to expect | Clerk knows to expect letters, not packages |
| cloudFiles.schemaLocation | Where AutoLoader stores the inferred schema | Clerk’s filing cabinet for the template |
| .load(path) | The folder to monitor for new files | The mailbox address |
| checkpointLocation | Where progress is tracked between runs | Clerk’s logbook of processed mail |
| trigger(availableNow=True) | Process all available files, then stop | Process today’s mail, then go home |
| .toTable() | Target Delta table name | The department mailbox to deliver to |
Reading CSV Files with AutoLoader
# CSV with full options
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")                       # First row is header
    .option("cloudFiles.inferColumnTypes", "true")  # Infer column types (CSV columns are strings otherwise)
    .option("sep", ",")                             # Delimiter (default comma)
    .option("quote", '"')                           # Quote character
    .option("escape", '"')                          # Escape character
    .option("multiLine", "true")                    # Handle multi-line CSV values
    .option("cloudFiles.schemaLocation", "/mnt/schema/sales/")
    .load("/mnt/landing/sales/")
)
# Add metadata columns and write
from pyspark.sql.functions import current_timestamp, input_file_name
(df
.withColumn("_ingested_at", current_timestamp()) # When we processed this row
.withColumn("_source_file", input_file_name()) # Which file this row came from
.writeStream
.format("delta")
.option("checkpointLocation", "/mnt/checkpoints/sales/")
.trigger(availableNow=True)
.toTable("bronze.sales")
)
Reading JSON Files with AutoLoader
# JSON (common for API responses and event data)
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("multiLine", "true") # Set true for pretty-printed JSON
.option("cloudFiles.schemaLocation", "/mnt/schema/events/")
.option("cloudFiles.inferColumnTypes", "true") # Infer types beyond just string
.load("/mnt/landing/events/")
)
(df
.withColumn("_ingested_at", current_timestamp())
.withColumn("_source_file", input_file_name())
.writeStream
.format("delta")
.option("checkpointLocation", "/mnt/checkpoints/events/")
.option("mergeSchema", "true") # Allow schema evolution in Delta
.trigger(availableNow=True)
.toTable("bronze.events")
)
Reading Parquet Files with AutoLoader
# Parquet (schema is embedded in the file — no inference needed)
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "parquet")
.option("cloudFiles.schemaLocation", "/mnt/schema/orders/")
.load("/mnt/landing/orders/")
)
(df
.withColumn("_ingested_at", current_timestamp())
.writeStream
.format("delta")
.option("checkpointLocation", "/mnt/checkpoints/orders/")
.trigger(availableNow=True)
.toTable("bronze.orders")
)
Parquet is the simplest because the schema is already embedded in the file. No header, inferSchema, or delimiter options needed.
Schema Inference: Let AutoLoader Figure Out the Schema
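One of AutoLoader’s most powerful features is automatic schema inference. On the first run, AutoLoader samples the incoming files, infers the column names and data types, and saves the schema to cloudFiles.schemaLocation. For formats that do not carry type information (CSV and JSON), every column is inferred as a string unless you set cloudFiles.inferColumnTypes to true.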
First run:
1. AutoLoader reads file_001.csv
2. Infers (with cloudFiles.inferColumnTypes enabled): customer_id (int), name (string), email (string), salary (double)
3. Saves schema to /mnt/schema/customers/_schemas/0
Subsequent runs:
1. AutoLoader reads schema from /mnt/schema/customers/_schemas/0
2. Applies saved schema to new files
3. Does NOT re-infer (consistent schema across all runs)
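Why is schemaLocation required? It is where AutoLoader persists the inferred schema between runs, and when you let AutoLoader infer the schema the stream will not start without it. Persisting matters because inference from samples is unstable: if file_001.csv has salary as integer (all whole numbers) but file_050.csv has salary as double (with decimals), re-inferring on every run would make the schema flip-flop. The schemaLocation creates one consistent schema across all runs.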
Real-life analogy: Schema inference is like the mailroom clerk creating a filing template on Day 1 after seeing the first batch of letters: “OK, these letters always have: Name (text), Account Number (number), Amount (money), Date (date).” From then on, every letter is sorted using that template — even if the clerk changes shifts.
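The flip-flop risk is easy to reproduce with a toy type-inference function. The sketch below is plain Python, not AutoLoader's real inference logic; `infer_type`, `schema_for`, and the `saved_schema` dict (standing in for the schema location) are hypothetical names used only for illustration.

```python
def infer_type(values):
    """Return the narrowest type name that fits every sampled value."""
    def fits(cast):
        try:
            for value in values:
                cast(value)
            return True
        except ValueError:
            return False

    if fits(int):
        return "int"
    if fits(float):
        return "double"
    return "string"


# Stand-in for the schema location: the first inferred type is stored and reused.
saved_schema = {}


def schema_for(column, values):
    """Infer a column type once, then always return the saved answer."""
    if column not in saved_schema:
        saved_schema[column] = infer_type(values)
    return saved_schema[column]


file_001_salaries = ["95000", "82000"]
file_050_salaries = ["95000.50", "82000"]

# Re-inferring per file gives two different answers for the same column.
print(infer_type(file_001_salaries), infer_type(file_050_salaries))

# With a saved schema, the type chosen on the first run sticks.
print(schema_for("salary", file_001_salaries), schema_for("salary", file_050_salaries))
```

A pinned type can still be the wrong one, which is why later values that do not fit it need somewhere to go and why type overrides exist.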
Schema Evolution: Handling New Columns Automatically
What happens when the source adds a new column six months into production? With spark.read, your pipeline breaks. With AutoLoader, schema evolution handles it automatically.
# Enable schema evolution
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.option("cloudFiles.schemaLocation", "/mnt/schema/customers/")
.option("cloudFiles.schemaEvolutionMode", "addNewColumns") # Automatically add new columns
.load("/mnt/landing/customers/")
)
(df.writeStream
.format("delta")
.option("checkpointLocation", "/mnt/checkpoints/customers/")
.option("mergeSchema", "true") # Allow Delta to accept new columns
.trigger(availableNow=True)
.toTable("bronze.customers")
)
Schema Evolution Modes
| Mode | What Happens When New Column Appears | Best For |
|---|---|---|
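| addNewColumns (default when no schema is provided) | Stream stops with an error, the new column is added to the saved schema, and the restarted stream writes it to the Delta table | Bronze ingestion (accept everything) |
| rescue | Schema is not evolved; new column data goes to the _rescued_data column as JSON | Safe option: nothing breaks, nothing is lost |
| failOnNewColumns | Stream fails and stays failed until the schema is updated or the offending file is removed | Strict pipelines that require schema review |
| none | New columns are ignored and are not rescued unless a rescued data column is configured | When you only care about known columns |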
Example: Source adds a "phone_number" column in month 6
Month 1-5 files: customer_id, name, email, salary
Month 6+ files: customer_id, name, email, salary, phone_number
With addNewColumns:
→ AutoLoader detects the new column and stops the stream with an error
→ Updates the saved schema in schemaLocation before stopping
→ On restart, adds phone_number to the Delta table (mergeSchema)
→ Old rows have NULL for phone_number
→ New rows have the value
With rescue:
→ AutoLoader keeps the original schema
→ phone_number data goes to _rescued_data column as JSON:
{"phone_number": "416-555-1234"}
→ Nothing breaks, no data is lost
→ You decide later whether to promote it to a real column
Real-life analogy: Schema evolution with addNewColumns is like a hotel reception desk that automatically creates a new room key card type when a new guest category shows up (VIP, corporate, loyalty). The rescue mode puts the unknown guest type in a “miscellaneous” folder until management decides what to do.
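One practical detail of addNewColumns: Databricks documents that the stream stops with an error the first time a new column is detected and picks up the updated schema when it is restarted. Scheduled jobs usually get this through job retries, but the idea can be sketched as a small restart wrapper. `run_with_restarts` and `fake_stream` are hypothetical names, and catching every exception is a simplification; real code should only retry on the schema-change error.

```python
def run_with_restarts(start_stream, max_restarts=3):
    """Call start_stream, restarting it a limited number of times on failure."""
    attempts = 0
    while True:
        try:
            return start_stream()
        except Exception as exc:  # simplification: narrow this in real code
            attempts += 1
            if attempts > max_restarts:
                raise
            print(f"Stream stopped ({exc}); restart attempt {attempts}")


# A stand-in for a function that starts the stream and waits for it to finish.
calls = {"count": 0}


def fake_stream():
    calls["count"] += 1
    if calls["count"] == 1:
        # First run: a new column shows up and the stream stops.
        raise RuntimeError("new column detected: phone_number")
    return "completed"


result = run_with_restarts(fake_stream)
print(result, calls["count"])
```

In a notebook, `start_stream` would be a function that builds the writeStream query and calls `awaitTermination()` on it.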
Schema Hints: Overriding Inferred Types
Sometimes AutoLoader infers the wrong type. The first file has salary values like 95000, so it infers integer. But later files have 95000.50. Schema hints let you override specific columns:
# Override specific column types with schema hints
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.option("cloudFiles.schemaLocation", "/mnt/schema/employees/")
.option("cloudFiles.schemaHints",
"salary DOUBLE, hire_date DATE, employee_id LONG, is_active BOOLEAN")
.load("/mnt/landing/employees/")
)
Schema hints override the inferred type for specific columns while letting AutoLoader infer the rest. Think of it as saying: “Figure out the schema yourself, but I know for a fact that salary is a DOUBLE and hire_date is a DATE.”
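When hints live in a config file, it can be handy to keep them as a dictionary and build the hint string at runtime. A minimal sketch, assuming a hypothetical helper called `build_schema_hints`; the output format matches the "column TYPE, column TYPE" string used in the example above.

```python
def build_schema_hints(hints):
    """Turn {"column": "type"} into the comma-separated schemaHints string."""
    return ", ".join(f"{column} {dtype.upper()}" for column, dtype in hints.items())


employee_hints = build_schema_hints({
    "salary": "double",
    "hire_date": "date",
    "employee_id": "long",
    "is_active": "boolean",
})
print(employee_hints)

# The string can then be passed to the reader:
# .option("cloudFiles.schemaHints", employee_hints)
```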
The Schema Location: Where AutoLoader Stores Schema
/mnt/schema/customers/
└── _schemas/
├── 0 ← Initial inferred schema (JSON)
├── 1 ← Schema after first evolution (new column added)
└── 2 ← Schema after second evolution
Each file contains the full schema definition:
{
"type": "struct",
"fields": [
{"name": "customer_id", "type": "integer", "nullable": true},
{"name": "name", "type": "string", "nullable": true},
{"name": "email", "type": "string", "nullable": true},
{"name": "salary", "type": "double", "nullable": true},
{"name": "phone_number", "type": "string", "nullable": true} ← Added in schema version 1
]
}
Important rules:
- The schemaLocation directory should be unique per AutoLoader stream; never share it between different tables
- Do not manually edit files in the schema directory; let AutoLoader manage it
- To force a full schema re-inference, delete the entire schemaLocation directory (the checkpoint still tracks file progress)
- The schemaLocation is separate from the checkpointLocation — they serve different purposes
Checkpointing: How AutoLoader Tracks Progress
The checkpoint directory is how AutoLoader remembers which files it has already processed. Without it, every run would reprocess all files from scratch.
/mnt/checkpoints/customers/
├── commits/ ← Completed micro-batch IDs
│ ├── 0
│ ├── 1
│ └── 2
├── offsets/ ← File lists per micro-batch
│ ├── 0
│ ├── 1
│ └── 2
├── sources/ ← Source-specific state
│ └── 0/
│ └── rocksdb/ ← Tracks discovered files
└── metadata ← Stream metadata
Run 1: Processes file_001.csv, file_002.csv → checkpoint records them
Run 2: Finds file_003.csv is new → processes only file_003.csv
Run 3: No new files → does nothing
Run 4: Finds file_004.csv, file_005.csv → processes both
Critical rules for checkpoints:
- Never share checkpoint directories between different streams
- Never delete a checkpoint directory unless you want to reprocess ALL files
- Keep checkpoints in durable cloud storage that you control (for example ADLS Gen2) rather than the workspace's DBFS root for production
- If you change the source path, you need a new checkpoint directory
Real-life analogy: The checkpoint is like a bookmark in a book. It tells you exactly where you stopped reading. Lose the bookmark, and you start from page 1. Never share a bookmark between two people reading different books.
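The "one checkpoint per stream" rule is easy to enforce with a quick check before any stream starts. The sketch below assumes hypothetical helpers `stream_paths` and `find_shared_checkpoints` and the /mnt/schema and /mnt/checkpoints layout used throughout this post.

```python
from collections import Counter


def stream_paths(table_name, base="/mnt"):
    """Derive per-table schema and checkpoint paths so no two tables collide."""
    base = base.rstrip("/")
    return {
        "schema": f"{base}/schema/{table_name}/",
        "checkpoint": f"{base}/checkpoints/{table_name}/",
    }


def find_shared_checkpoints(checkpoints):
    """Given {stream_name: checkpoint_path}, return paths used by more than one stream."""
    # Ignore trailing slashes so "/a/b" and "/a/b/" count as the same directory.
    counts = Counter(path.rstrip("/") for path in checkpoints.values())
    return sorted(path for path, uses in counts.items() if uses > 1)


good = {name: stream_paths(name)["checkpoint"] for name in ["customers", "orders"]}
bad = {
    "customers": "/mnt/checkpoints/customers/",
    "orders": "/mnt/checkpoints/customers",  # copy-paste mistake
}
print(find_shared_checkpoints(good), find_shared_checkpoints(bad))
```

Deriving both paths from the table name removes the most common source of accidental sharing, which is a copied path that was never edited.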
Rescued Data Column: Never Lose a Row
When AutoLoader encounters data that does not match the expected schema — a string in an integer column, an unexpected new column, a malformed row — it does not drop the data. Instead, it puts the unparseable content into a special _rescued_data column:
# Rescued data is enabled by default
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.option("cloudFiles.schemaLocation", "/mnt/schema/orders/")
.load("/mnt/landing/orders/")
)
# The DataFrame now has all expected columns PLUS _rescued_data
# If a row cannot be parsed, the raw content goes to _rescued_data
# Check what was rescued:
# spark.read.table("bronze.orders").filter("_rescued_data IS NOT NULL").show(truncate=False)
Example: Schema expects order_amount as INTEGER
Row in file: "1001, Naveen, abc, 2026-01-15"
→ customer_id: 1001
→ name: "Naveen"
→ order_amount: NULL (could not parse "abc" as integer)
→ order_date: "2026-01-15"
→ _rescued_data: '{"order_amount": "abc"}'
The row is NOT dropped. The parseable columns are filled. The unparseable
value is captured in _rescued_data as JSON. You can investigate and fix later.
This is fundamentally better than the quarantine pattern we built manually in our Data Quality post — AutoLoader gives you rescue behavior for free at the ingestion layer.
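To see the rescue behavior without a cluster, here is a simplified plain-Python imitation of the example above. `parse_row` is a hypothetical function, and the real _rescued_data column may carry extra details (such as the source file path) that this sketch leaves out.

```python
import json


def parse_row(raw, schema):
    """Cast each raw value to its schema type; collect failures instead of dropping the row."""
    row, rescued = {}, {}
    for column, cast in schema.items():
        value = raw.get(column)
        try:
            row[column] = cast(value) if value is not None else None
        except ValueError:
            # Value does not fit the expected type: null the column, keep the raw value.
            row[column] = None
            rescued[column] = value

    # Columns that are not in the schema at all are rescued too.
    for column, value in raw.items():
        if column not in schema:
            rescued[column] = value

    row["_rescued_data"] = json.dumps(rescued) if rescued else None
    return row


schema = {"customer_id": int, "name": str, "order_amount": int, "order_date": str}
raw = {"customer_id": "1001", "name": "Naveen", "order_amount": "abc", "order_date": "2026-01-15"}

parsed = parse_row(raw, schema)
print(parsed)
```

The row survives with a NULL in order_amount and the original "abc" preserved as JSON, which mirrors the outcome described in the example.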
AutoLoader Options Reference
| Option | What It Does | Example Value |
|---|---|---|
| cloudFiles.format | File format to read | "csv", "json", "parquet", "avro", "text", "binaryFile" |
| cloudFiles.schemaLocation | Where to store inferred schema | "/mnt/schema/customers/" |
| cloudFiles.schemaEvolutionMode | How to handle new columns | "addNewColumns", "rescue", "failOnNewColumns", "none" |
| cloudFiles.schemaHints | Override inferred types | "salary DOUBLE, hire_date DATE" |
| cloudFiles.useNotifications | Enable File Notification mode | "true" or "false" (default) |
| cloudFiles.inferColumnTypes | Infer types (not just strings) for CSV and JSON | "true" |
| cloudFiles.maxFilesPerTrigger | Max files per micro-batch | "1000" |
| cloudFiles.maxBytesPerTrigger | Max bytes per micro-batch | "10g" |
| cloudFiles.includeExistingFiles | Process files already present on first run | "true" (default) or "false" |
| pathGlobFilter | Only process files matching pattern | "*.csv" |
| recursiveFileLookup | Process files in subdirectories | "true" |
Production Pattern: Bronze Ingestion with AutoLoader
This is the production-grade pattern for ingesting files into the Bronze layer of a Medallion Architecture:
from pyspark.sql.functions import current_timestamp, input_file_name, lit

def ingest_to_bronze(source_path, table_name, file_format="csv", csv_options=None):
    '''
    Production AutoLoader ingestion to Bronze layer.
    Handles schema evolution, rescued data, and metadata columns.
    '''
    schema_path = f"/mnt/schema/{table_name}/"
    checkpoint_path = f"/mnt/checkpoints/{table_name}/"

    # Build the reader
    reader = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", file_format)
        .option("cloudFiles.schemaLocation", schema_path)
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    )

    # Add CSV defaults
    if file_format == "csv":
        reader = (reader
            .option("header", "true")
            .option("multiLine", "true")
            .option("escape", '"')
        )

    # Apply any extra reader options passed by the caller (these override the defaults)
    if csv_options:
        for key, value in csv_options.items():
            reader = reader.option(key, value)

    df = reader.load(source_path)

    # Add metadata columns
    df_enriched = (df
        .withColumn("_ingested_at", current_timestamp())
        .withColumn("_source_file", input_file_name())
        .withColumn("_table_name", lit(table_name))
    )

    # Write to Bronze Delta table
    query = (df_enriched.writeStream
        .format("delta")
        .option("checkpointLocation", checkpoint_path)
        .option("mergeSchema", "true")
        .trigger(availableNow=True)
        .toTable(f"bronze.{table_name}")
    )

    # Block until every pending file has been processed
    query.awaitTermination()
    print(f"Ingested new files for {table_name}")
    return query

# Usage:
ingest_to_bronze("/mnt/landing/customers/", "customers", "csv")
ingest_to_bronze("/mnt/landing/orders/", "orders", "csv")
ingest_to_bronze("/mnt/landing/events/", "events", "json")
ingest_to_bronze("/mnt/landing/products/", "products", "parquet")
Production Pattern: Multi-Source Ingestion
For production pipelines with many tables, use a config-driven approach:
# Config-driven multi-table ingestion
ingestion_config = [
    {"source": "/mnt/landing/customers/", "table": "customers", "format": "csv"},
    {"source": "/mnt/landing/orders/", "table": "orders", "format": "csv"},
    {"source": "/mnt/landing/events/", "table": "events", "format": "json"},
    {"source": "/mnt/landing/products/", "table": "products", "format": "parquet"},
    {"source": "/mnt/landing/inventory/", "table": "inventory", "format": "csv",
     "options": {"sep": "|", "cloudFiles.schemaHints": "quantity INT, price DOUBLE"}},
]

# Ingest all sources
for config in ingestion_config:
    try:
        ingest_to_bronze(
            source_path=config["source"],
            table_name=config["table"],
            file_format=config["format"],
            csv_options=config.get("options")
        )
        print(f" ✓ {config['table']} ingested successfully")
    except Exception as e:
        print(f" ✗ {config['table']} FAILED: {str(e)}")
        # In production: log to alert table, send notification, continue with other tables
Real-life analogy: This is like a warehouse receiving dock with a manifest. The manifest (config) says: “Door 1 expects pallets of electronics (CSV). Door 2 expects boxes of clothing (JSON). Door 3 expects crates of produce (Parquet).” Each door processes independently, and if one door jams, the others keep working.
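For production use, the print statements are usually replaced with a result summary that can be logged or alerted on. A minimal sketch, assuming a hypothetical `run_all` helper that accepts any ingestion function with the same four arguments as ingest_to_bronze; the `fake_ingest` stand-in exists only so the example runs without a cluster.

```python
def run_all(configs, ingest):
    """Run ingest for every config entry and return {table: status} without stopping on errors."""
    results = {}
    for config in configs:
        table = config["table"]
        try:
            ingest(config["source"], table, config["format"], config.get("options"))
            results[table] = "ok"
        except Exception as exc:
            # One failing table must not block the others.
            results[table] = f"failed: {exc}"
    return results


def fake_ingest(source_path, table_name, file_format, options):
    """Stand-in for the real ingestion function; fails for one table on purpose."""
    if table_name == "orders":
        raise RuntimeError("source folder not found")


demo_config = [
    {"source": "/mnt/landing/customers/", "table": "customers", "format": "csv"},
    {"source": "/mnt/landing/orders/", "table": "orders", "format": "csv"},
    {"source": "/mnt/landing/events/", "table": "events", "format": "json"},
]

summary = run_all(demo_config, fake_ingest)
failed_tables = [table for table, status in summary.items() if status != "ok"]
print(summary, failed_tables)
```

The returned dictionary can be written to an audit table, and a non-empty `failed_tables` list can fail the job at the end so the scheduler raises an alert.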
Monitoring AutoLoader Streams
# Check stream status
query = (df.writeStream
.format("delta")
.option("checkpointLocation", "/mnt/checkpoints/customers/")
.trigger(availableNow=True)
.toTable("bronze.customers")
)
# After starting the stream, check progress:
query.status # Current status
query.lastProgress # Details of last micro-batch
query.recentProgress # List of recent micro-batches
# Example output from lastProgress:
# {
# "numInputRows": 5000,
# "inputRowsPerSecond": 2500.0,
# "processedRowsPerSecond": 10000.0,
# "sources": [{
# "description": "CloudFilesSource[/mnt/landing/customers/]",
# "numFilesOutstanding": 0,
# "numBytesOutstanding": 0
# }]
# }
# Verify what was written:
df_bronze = spark.read.table("bronze.customers")
print(f"Total rows in Bronze: {df_bronze.count()}")
print(f"Rescued rows: {df_bronze.filter('_rescued_data IS NOT NULL').count()}")
print(f"Source files processed:")
df_bronze.select("_source_file").distinct().show(truncate=False)
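The progress dictionary can be reduced to the handful of numbers worth alerting on. The sketch below assumes the shape shown in the sample output above; because the outstanding-file counters may instead appear nested under a "metrics" key, possibly as strings, the hypothetical `summarize_progress` helper checks both places.

```python
def summarize_progress(progress):
    """Reduce a lastProgress-style dict to row count and outstanding-file backlog."""
    if not progress:
        # lastProgress is empty before the first micro-batch completes.
        return {"rows": 0, "files_outstanding": 0, "caught_up": True}

    outstanding = 0
    for source in progress.get("sources", []):
        # Counters may sit directly on the source or under a nested "metrics" dict.
        metrics = source.get("metrics", source)
        outstanding += int(metrics.get("numFilesOutstanding", 0))

    return {
        "rows": progress.get("numInputRows", 0),
        "files_outstanding": outstanding,
        "caught_up": outstanding == 0,
    }


sample = {
    "numInputRows": 5000,
    "sources": [{
        "description": "CloudFilesSource[/mnt/landing/customers/]",
        "numFilesOutstanding": 0,
        "numBytesOutstanding": 0,
    }],
}
nested = {"numInputRows": 10, "sources": [{"metrics": {"numFilesOutstanding": "3"}}]}

print(summarize_progress(sample))
print(summarize_progress(nested))
```

In a notebook this would be called as `summarize_progress(query.lastProgress)` after the stream has run.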
AutoLoader vs COPY INTO
Databricks offers two methods for incremental file ingestion: AutoLoader and the SQL COPY INTO command. Here is when to use each:
| Feature | AutoLoader (cloudFiles) | COPY INTO (SQL) |
|---|---|---|
| Language | PySpark (readStream) | SQL command |
| Tracking | External checkpoint directory | Internal (tracks files in Delta log) |
| Schema evolution | Full support (addNewColumns, rescue) | Limited |
| Schema inference | Automatic + saved to schemaLocation | Opt-in through FORMAT_OPTIONS ('inferSchema' = 'true') |
| File Notification mode | Supported (Event Grid) | Not supported |
| Rescued data | Built-in _rescued_data column | Not available |
| Performance at scale | Better (streaming architecture) | Good for smaller volumes |
| Best for | Production pipelines, large-scale ingestion | Quick ad-hoc loads, SQL-first teams |
-- COPY INTO example (for comparison)
COPY INTO bronze.customers
FROM '/mnt/landing/customers/'
FILEFORMAT = CSV
FORMAT_OPTIONS ('header' = 'true', 'inferSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
The recommendation: Use AutoLoader for production pipelines. Use COPY INTO for ad-hoc or one-time loads, or when your team prefers SQL. AutoLoader is more feature-rich and scalable.
AutoLoader vs ADF Copy Activity
| Feature | AutoLoader | ADF Copy Activity |
|---|---|---|
| Incremental | Built-in (checkpoint) | Requires watermark logic (Lookup + If Condition) |
| Schema evolution | Automatic | Manual (update dataset schema) |
| Code | 6 lines of PySpark | 5-10 pipeline activities (Get Metadata, ForEach, Copy, etc.) |
| File tracking | Built-in checkpoint | You build tracking logic (watermark table, last modified date) |
| Target | Delta table (directly) | ADLS files (then separate processing needed) |
| Rescued data | Automatic | Not available |
| Best for | File-to-Delta ingestion in Databricks | Database-to-database copies, non-Databricks environments |
If you are already in Databricks and ingesting files from cloud storage, AutoLoader replaces ADF Copy Activity entirely for this use case. It is simpler, more reliable, and handles schema evolution natively.
Trigger Modes Explained
| Trigger | Behavior | Use Case |
|---|---|---|
| trigger(availableNow=True) | Process all available new files, then stop | Scheduled batch jobs (most common) |
| trigger(processingTime="1 minute") | Check for new files every 1 minute, run continuously | Near-real-time ingestion |
| trigger(once=True) | Process one micro-batch, then stop (legacy) | Replaced by availableNow |
| No trigger (default) | Runs micro-batches continuously, starting each one as soon as the previous one finishes | Real-time requirements |
For most production Bronze pipelines, use trigger(availableNow=True). Schedule the notebook to run hourly or daily via a Databricks Workflow. Each run processes all files that arrived since the last run, writes them to Delta, and stops. Cost-effective and simple.
trigger(once=True) is the older version. availableNow=True is superior because it processes ALL pending micro-batches in one run rather than limiting to a single micro-batch. Always prefer availableNow.
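If the same notebook has to run both as a scheduled job and as an always-on stream, the trigger can be chosen from a parameter. A small sketch: the mode labels "scheduled" and "near_real_time" and the helper name `trigger_kwargs` are made up for this example, while `availableNow` and `processingTime` are the trigger arguments shown in the table above.

```python
def trigger_kwargs(mode, interval="1 minute"):
    """Map a run mode to keyword arguments for writeStream.trigger()."""
    if mode == "scheduled":
        # Drain everything that is pending, then stop.
        return {"availableNow": True}
    if mode == "near_real_time":
        # Keep running and poll on a fixed interval.
        return {"processingTime": interval}
    raise ValueError(f"Unknown mode: {mode!r}")


scheduled = trigger_kwargs("scheduled")
streaming = trigger_kwargs("near_real_time", interval="30 seconds")
print(scheduled, streaming)

# In a notebook:
# df.writeStream.trigger(**trigger_kwargs("scheduled"))
```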
Common Mistakes
- Sharing checkpoint directories between streams: each AutoLoader stream must have its own unique checkpoint directory. Sharing causes data corruption and unpredictable behavior.
- Deleting the checkpoint directory to “fix” issues: this causes ALL files to be reprocessed from scratch. If you must reset, also truncate the target Delta table. Otherwise you get duplicates.
- Forgetting cloudFiles.schemaLocation: when AutoLoader infers the schema, it needs this location to persist it, and the stream will not start without it. Do not work around the error by re-inferring with a batch read on every run; that leads to inconsistent types across runs (integer in run 1, string in run 2).
- Not adding mergeSchema on the write side: if you enable schemaEvolutionMode: addNewColumns on the read side but forget mergeSchema: true on the Delta write side, the stream keeps failing after new columns appear instead of recovering on restart.
- Using spark.read instead of AutoLoader for recurring loads: spark.read reprocesses everything. For any recurring file ingestion, use AutoLoader.
- Not monitoring _rescued_data: rescued data means something did not parse correctly. Ignoring it means the values never reach the columns downstream queries read. Build alerts on _rescued_data IS NOT NULL.
- Putting schema and checkpoint directories in the DBFS root: the DBFS root is tied to the workspace. If you migrate or recreate the workspace, you lose checkpoints. Use ADLS Gen2 or Unity Catalog Volumes for production.
- Using trigger(once=True) instead of availableNow: once=True processes only one micro-batch per run. availableNow=True processes all pending batches. Always use availableNow for scheduled jobs.
Interview Questions
Q: What is AutoLoader in Databricks?
A: AutoLoader is an incremental file ingestion feature that uses Spark Structured Streaming with a cloudFiles source. It automatically detects new files in cloud storage, processes only those new files, infers and evolves schema, and tracks progress via checkpoints. It replaces manual batch file processing with a reliable, exactly-once ingestion pipeline.
Q: How does AutoLoader track which files have been processed?
A: Through a checkpoint directory specified by checkpointLocation. The checkpoint stores committed offsets, file lists per micro-batch, and source state. On restart, AutoLoader reads the checkpoint and processes only files not yet committed. This ensures exactly-once processing — no duplicates, no missed files.
Q: What are the two file discovery modes and when do you use each?
A: Directory Listing (default) lists the folder contents and compares to the checkpoint. It is simple, needs zero setup, and is good for up to about a million files. File Notification uses cloud event subscriptions (Azure Event Grid) to push new file notifications. It is better for millions of files and high-frequency arrivals, but requires additional cloud resource permissions.
Q: How does schema evolution work in AutoLoader?
A: AutoLoader infers the schema on the first run and saves it to cloudFiles.schemaLocation. When new columns appear in later files, the behavior depends on schemaEvolutionMode: addNewColumns (the default when no schema is provided) stops the stream, adds the column to the saved schema, and picks it up on restart; rescue puts unknown data in a _rescued_data JSON column without evolving the schema; failOnNewColumns stops the stream until the schema is updated; and none ignores new columns.
Q: What is the _rescued_data column?
A: A special column that captures data that could not be parsed into the expected schema: wrong data types, unexpected columns, malformed rows. Instead of dropping the data, AutoLoader puts the unparseable content as JSON in _rescued_data. This ensures zero data loss at the ingestion layer.
Q: What is the difference between AutoLoader and COPY INTO?
A: AutoLoader uses streaming with external checkpoints, supports full schema evolution and rescue, and scales better for large volumes. COPY INTO is a SQL command that tracks files internally in the Delta log, has limited schema evolution, and is simpler for ad-hoc loads. AutoLoader is recommended for production; COPY INTO for quick, SQL-based loads.
Q: What trigger mode should you use for scheduled AutoLoader jobs?
A: trigger(availableNow=True). It processes all pending files across all micro-batches and then stops. This is ideal for scheduled Databricks Workflow jobs that run hourly or daily. Avoid trigger(once=True) which only processes one micro-batch per run.
Wrapping Up
AutoLoader is the single most important feature for file-based data ingestion in Databricks. Six lines of code replace an entire ADF pipeline. Schema inference eliminates manual schema definitions. Schema evolution handles source changes without pipeline modifications. Checkpoints guarantee exactly-once processing. And the rescued data column ensures no row is ever silently lost.
For Bronze layer ingestion in the Medallion Architecture, AutoLoader is the default choice. Schedule a Databricks Workflow that runs a notebook with trigger(availableNow=True), and you have production-grade incremental file ingestion with zero manual tracking, zero schema maintenance, and zero data loss.
Related posts: – Medallion Architecture – Data Quality Framework – Delta Lake Deep Dive – Reading and Writing Every File Format – Databricks Workflows and Jobs – Connecting Databricks to ADLS Gen2
Naveen Vuppula is a Senior Data Engineering Consultant and app developer based in Ontario, Canada. He writes about Python, SQL, AWS, Azure, and everything data engineering at DriveDataScience.com.