Streaming with Databricks: Structured Streaming, Kafka Integration, Delta Live Tables, Trigger Modes, Watermarks, and Production Streaming Pipelines
Our AutoLoader post covered streaming from FILES — new files land, AutoLoader picks them up. But what about streaming from MESSAGE SYSTEMS like Kafka, Event Hubs, or Kinesis? What about continuous, always-on streams where data arrives every second?
This post goes beyond AutoLoader into the full world of Spark Structured Streaming in Databricks — reading from Kafka, processing with watermarks, writing to Delta in near real-time, and building production streaming pipelines with Delta Live Tables.
Think of batch processing like a postal service — letters accumulate in a mailbox and are delivered once a day. Streaming is like a messaging app — each message arrives and is processed the moment it is sent. Micro-batch streaming (the Spark default) is in between — messages accumulate for a few seconds, then are processed as a small batch. You get near-real-time results without the complexity of true event-at-a-time processing.
Table of Contents
- Batch vs Streaming vs Micro-Batch
- Structured Streaming Fundamentals
- readStream and writeStream
- Trigger Modes
- Output Modes (Append, Complete, Update)
- Checkpointing
- Streaming from Kafka
- Reading from Kafka
- Parsing Kafka Messages
- Writing Processed Data to Delta
- Streaming from Event Hubs
- Watermarks and Late Data
- What Are Watermarks?
- Windowed Aggregations with Watermarks
- Delta Live Tables (DLT)
- What Are Delta Live Tables?
- DLT Pipeline Example
- DLT Expectations (Data Quality)
- Streaming to Delta Lake Best Practices
- Monitoring Streaming Queries
- AutoLoader vs Kafka vs Event Hubs
- Common Mistakes
- Interview Questions
- Wrapping Up
Batch vs Streaming vs Micro-Batch
| Processing Model | Latency | How It Works | Use Case |
|---|---|---|---|
| Batch | Hours | Process all data at once (scheduled) | Daily ETL, historical analysis |
| Micro-batch | Seconds to minutes | Process small batches continuously | Near-real-time dashboards, IoT |
| True streaming | Milliseconds | Process each event individually | Fraud detection, stock trading |
Spark Structured Streaming uses micro-batch by default — it accumulates events for a configurable interval (e.g., 10 seconds), processes them as a batch, then repeats. This gives near-real-time results with the simplicity and fault tolerance of batch processing.
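To make the micro-batch idea concrete, here is a minimal pure-Python sketch (no Spark involved). It assumes events are `(arrival_second, payload)` pairs and that the interval is a fixed number of seconds; the function name `micro_batches` is made up for illustration and is not a Spark API.

```python
from collections import defaultdict


def micro_batches(events, interval_seconds):
    """Group (arrival_second, payload) pairs into fixed-length micro-batches."""
    batches = defaultdict(list)
    for arrival, payload in events:
        # Every event that arrives inside the same interval shares a batch id
        batch_id = int(arrival // interval_seconds)
        batches[batch_id].append(payload)
    return [batches[batch_id] for batch_id in sorted(batches)]


events = [(1, "a"), (4, "b"), (12, "c"), (19, "d"), (25, "e")]
batches = micro_batches(events, interval_seconds=10)
for number, batch in enumerate(batches):
    print(f"batch {number}: {batch}")
```

Spark does the same kind of grouping for you, with the interval controlled by the trigger setting.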
Structured Streaming Fundamentals
readStream and writeStream
from pyspark.sql.functions import col
# Batch: read all data at once
df = spark.read.format("delta").load("/data/orders")
# Streaming: read data continuously as it arrives
stream_df = spark.readStream.format("delta").load("/data/orders")
# The stream_df looks like a regular DataFrame — same API!
# filter, join, groupBy, withColumn — all work the same
filtered = stream_df.filter(col("amount") > 100)
# Write the stream to a sink (Delta table)
query = (filtered.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/checkpoints/orders_filtered")
.trigger(processingTime="10 seconds")
.toTable("silver.filtered_orders")
)
# The query runs continuously until stopped
query.awaitTermination() # Block until query stops
# Or: query.stop() # Stop programmatically
Trigger Modes
| Trigger | Behavior | Use Case |
|---|---|---|
| processingTime="10 seconds" | Process a micro-batch every 10 seconds | Near-real-time dashboards |
| availableNow=True | Process all available data (in one or more micro-batches), then stop | Scheduled incremental loads (replaces trigger once) |
| once=True | Process all available data in a single micro-batch, then stop (deprecated) | Legacy; use availableNow instead |
| No trigger (default) | Start the next micro-batch as soon as the previous one finishes | Maximum throughput, minimum latency |
# Near-real-time: process every 30 seconds
.trigger(processingTime="30 seconds")
# Scheduled batch-style: process everything available, then stop
# Perfect for running in a Databricks Job every hour
.trigger(availableNow=True)
# Maximum speed: process as fast as possible
# (no trigger specified — default behavior)
Real-life analogy: processingTime is like a bus that departs every 10 minutes — passengers accumulate at the stop and the bus takes everyone waiting. availableNow is like a shuttle that runs once, picks up everyone at the stop, drops them off, and parks. Default (no trigger) is like a taxi — leaves the moment a passenger arrives.
Output Modes (Append, Complete, Update)
| Mode | What Gets Written | Use With |
|---|---|---|
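| Append | Only new rows since last trigger | Filters, maps, and watermarked aggregations (each window is emitted once it is final) |
| Complete | Entire result table (overwrite) | Aggregations (groupBy + count/sum) |
| Update | Only changed rows since last trigger | Aggregations that need running results; the Delta sink does not accept this mode directly, so pair it with foreachBatch |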
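The difference between the three modes is easiest to see on a running count. The sketch below is a pure-Python simulation, not Spark code: `simulate_output_modes` is a hypothetical helper, "append" shows the un-aggregated rows of each micro-batch, and "complete" and "update" show what a count-per-key aggregation would emit.

```python
from collections import Counter


def simulate_output_modes(micro_batches):
    """Show what each output mode would emit for a running count per key."""
    totals = Counter()
    emitted = {"append": [], "complete": [], "update": []}
    for batch in micro_batches:
        totals.update(batch)
        # Append: only the rows that arrived in this micro-batch
        emitted["append"].append(list(batch))
        # Complete: the whole result table, rewritten every time
        emitted["complete"].append(dict(totals))
        # Update: only the keys whose count changed in this micro-batch
        emitted["update"].append({key: totals[key] for key in sorted(set(batch))})
    return emitted


emitted = simulate_output_modes([["laptop", "phone"], ["phone"]])
for mode, outputs in emitted.items():
    print(mode, outputs)
```

In the second micro-batch, complete mode rewrites both counts while update mode emits only the changed "phone" row.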
Checkpointing
# Checkpoint stores the stream's progress — which data has been processed
# If the stream crashes and restarts, it resumes from the checkpoint
# NEVER delete or change the checkpoint location for a running stream
.option("checkpointLocation", "/checkpoints/my_stream")
# One checkpoint per stream — do NOT share checkpoints between streams
# Store checkpoints in a reliable location (ADLS, S3, DBFS)
# Checkpoint + replayable source + idempotent sink (such as Delta) = exactly-once processing guarantee
Real-life analogy: The checkpoint is like a bookmark in a book. If you stop reading (stream stops), the bookmark tells you exactly where to resume. Without it, you would have to start from the beginning every time.
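The bookmark idea can be sketched in a few lines of plain Python. This is a toy model only: Spark's real checkpoint is a directory holding offsets, commits, and state, and the JSON file, the `next_offset` key, and the `process_with_checkpoint` function are assumptions made up for this illustration.

```python
import json
import os
import tempfile


def process_with_checkpoint(records, checkpoint_path, max_records=None):
    """Process records starting from the offset saved in checkpoint_path."""
    offset = 0
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as handle:
            offset = json.load(handle)["next_offset"]
    end = len(records) if max_records is None else min(len(records), offset + max_records)
    processed = records[offset:end]
    # Persist the new position so a restart resumes here instead of at zero
    with open(checkpoint_path, "w") as handle:
        json.dump({"next_offset": end}, handle)
    return processed


records = ["o1", "o2", "o3", "o4", "o5"]
with tempfile.TemporaryDirectory() as folder:
    checkpoint = os.path.join(folder, "offsets.json")
    first_run = process_with_checkpoint(records, checkpoint, max_records=3)
    second_run = process_with_checkpoint(records, checkpoint)  # simulated restart
print(first_run, second_run)
```

The second call behaves like a restarted stream: it picks up at the saved offset rather than reprocessing the first three records.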
Streaming from Kafka
Reading from Kafka
# Read from Kafka topic
kafka_df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
.option("subscribe", "orders_topic") # topic name
.option("startingOffsets", "latest") # or "earliest" for full history
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.sasl.jaas.config",
'org.apache.kafka.common.security.plain.PlainLoginModule required '
'username="apikey" password="your-api-key";')
.load()
)
# Kafka DataFrame has a fixed schema:
# key (binary), value (binary), topic, partition, offset, timestamp, timestampType
# Your actual data is in the "value" column as binary (usually JSON)
Parsing Kafka Messages
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
# Define the schema of your JSON messages
order_schema = StructType([
StructField("order_id", StringType()),
StructField("customer_id", StringType()),
StructField("product", StringType()),
StructField("amount", DoubleType()),
StructField("order_time", TimestampType())
])
# Parse the binary value column as JSON
parsed_df = (kafka_df
.selectExpr("CAST(value AS STRING) as json_str")
.select(from_json(col("json_str"), order_schema).alias("data"))
.select("data.*")
)
# Result: a clean DataFrame with order_id, customer_id, product, amount, order_time
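One thing the parsing step hides is what happens to malformed messages. In its default permissive behavior, from_json tends to produce nulls for unparseable input rather than failing the stream. The pure-Python sketch below mimics that per-record behavior so the effect is easy to see. `parse_value` and `EXPECTED_FIELDS` are hypothetical names for this illustration, and real Spark parsing also applies the declared column types, which this sketch does not.

```python
import json

EXPECTED_FIELDS = ("order_id", "customer_id", "product", "amount", "order_time")


def parse_value(raw: bytes):
    """Decode one Kafka value and project it onto the expected fields."""
    try:
        document = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None  # malformed payloads become null instead of raising
    if not isinstance(document, dict):
        return None
    # Fields missing from the message come back as None (null)
    return {field: document.get(field) for field in EXPECTED_FIELDS}


good = parse_value(b'{"order_id": "o1", "amount": 12.5}')
bad = parse_value(b"not json")
print(good)
print(bad)
```

Because bad records show up as nulls rather than errors, it is worth filtering or counting null rows downstream so that silent parsing failures do not go unnoticed.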
Writing Processed Data to Delta
# Write parsed Kafka data to Bronze Delta table
query = (parsed_df.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/checkpoints/kafka_orders_bronze")
.trigger(processingTime="30 seconds")
.toTable("bronze.kafka_orders")
)
# Complete pipeline: Kafka → Bronze (raw) → Silver (cleaned) → Gold (aggregated)
# Each arrow is a separate streaming query with its own checkpoint
Streaming from Event Hubs
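# Azure Event Hubs exposes a Kafka-compatible endpoint (port 9093)
# Connection string with Kafka compatibility (in production, load it from a secret scope instead of hardcoding)
# On Databricks Runtime the Kafka client classes are shaded, so the login module class below may need a "kafkashaded." prefix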
EH_CONN = "Endpoint=sb://myhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxx"
EH_TOPIC = "orders"
kafka_conf = {
"kafka.bootstrap.servers": "myhub.servicebus.windows.net:9093",
"subscribe": EH_TOPIC,
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{EH_CONN}";',
"startingOffsets": "latest"
}
eh_df = spark.readStream.format("kafka").options(**kafka_conf).load()
# Same parsing as Kafka — Event Hubs exposes the Kafka protocol
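Since the bootstrap server is just the namespace host from the connection string plus port 9093, the options can be derived instead of typed twice. The helper below is a sketch under that assumption. `event_hubs_kafka_options` is a hypothetical function name, and it expects a namespace-level connection string in the `Endpoint=sb://...;SharedAccessKeyName=...;SharedAccessKey=...` shape shown above.

```python
def event_hubs_kafka_options(connection_string, topic, starting_offsets="latest"):
    """Build Kafka source options from an Event Hubs connection string."""
    parts = dict(
        segment.split("=", 1)  # split once: the access key itself may contain "="
        for segment in connection_string.strip().split(";")
        if segment
    )
    # Endpoint looks like sb://<namespace>.servicebus.windows.net/
    host = parts["Endpoint"].replace("sb://", "", 1).rstrip("/")
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="$ConnectionString" password="{connection_string}";'
    )
    return {
        "kafka.bootstrap.servers": f"{host}:9093",
        "subscribe": topic,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas,
        "startingOffsets": starting_offsets,
    }


conn = (
    "Endpoint=sb://myhub.servicebus.windows.net/;"
    "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxx"
)
options = event_hubs_kafka_options(conn, "orders")
print(options["kafka.bootstrap.servers"])
```

The returned dictionary can be passed to `spark.readStream.format("kafka").options(**options).load()` in the same way as `kafka_conf`.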
Watermarks and Late Data
What Are Watermarks?
In streaming, data can arrive late — an order placed at 3:00 PM might reach Kafka at 3:15 PM due to network delays. Without watermarks, Spark must keep ALL state forever (waiting for potentially late data), consuming unlimited memory.
A watermark tells Spark: “data arriving more than X minutes late can be dropped.” This bounds the state and prevents memory from growing infinitely.
# Allow data up to 10 minutes late
watermarked_df = parsed_df.withWatermark("order_time", "10 minutes")
# Any order with order_time more than 10 minutes behind the current
# maximum order_time will be dropped (not included in aggregations)
Real-life analogy: A watermark is like a restaurant’s last-order policy with a grace period: “We accept order slips up to 10 minutes older than the newest slip we have seen.” If the newest slip in the kitchen is stamped 10:05 PM, a slip stamped 9:58 PM (7 minutes behind) is still accepted. A slip stamped 9:30 PM (35 minutes behind) is rejected.
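The rule "newest event time minus the allowed delay" can be simulated without Spark. The sketch below is a simplification: it recomputes the watermark at the end of each micro-batch and drops any event older than it, whereas Spark applies the check inside stateful operators such as windowed aggregations. `apply_watermark` is a hypothetical helper written only for this illustration.

```python
from datetime import datetime, timedelta


def apply_watermark(batches, delay):
    """Return (kept, dropped) event times for each micro-batch."""
    watermark = None  # no watermark exists until the first batch completes
    max_event_time = None
    results = []
    for batch in batches:
        kept = [t for t in batch if watermark is None or t >= watermark]
        dropped = [t for t in batch if watermark is not None and t < watermark]
        results.append((kept, dropped))
        if batch:
            newest = max(batch)
            if max_event_time is None or newest > max_event_time:
                max_event_time = newest
            # The watermark trails the newest event time and never moves back
            watermark = max_event_time - delay
    return results


day = datetime(2024, 1, 1)
batch_1 = [day.replace(hour=15, minute=0), day.replace(hour=15, minute=12)]
batch_2 = [day.replace(hour=15, minute=5), day.replace(hour=14, minute=55)]
results = apply_watermark([batch_1, batch_2], delay=timedelta(minutes=10))
for kept, dropped in results:
    print("kept:", kept, "dropped:", dropped)
```

After the first batch the watermark sits at 3:02 PM (3:12 PM minus 10 minutes), so in the second batch the 3:05 PM order is kept and the 2:55 PM order is dropped.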
Windowed Aggregations with Watermarks
from pyspark.sql.functions import window
# Count orders per 5-minute window, allowing 10 minutes late data
windowed_counts = (parsed_df
.withWatermark("order_time", "10 minutes")
.groupBy(
window("order_time", "5 minutes"), # 5-minute tumbling window
"product"
)
.count()
)
# Write aggregated results
query = (windowed_counts.writeStream
.format("delta")
.outputMode("append") # Delta sinks accept append/complete, not update; append emits each window once the watermark passes it
.option("checkpointLocation", "/checkpoints/order_counts")
.trigger(processingTime="1 minute")
.toTable("gold.order_counts_5min")
)
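To see which 5-minute bucket an order lands in, the window arithmetic can be reproduced in plain Python. This sketch assumes tumbling windows aligned to the Unix epoch with no start offset, which is the default alignment. `tumbling_window` is a hypothetical helper, not a Spark function.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumbling_window(event_time, size):
    """Return the [start, end) tumbling window containing event_time."""
    elapsed = event_time - EPOCH
    # Integer-divide by the window size to find how many full windows have passed
    start = EPOCH + (elapsed // size) * size
    return start, start + size


event = datetime(2024, 1, 1, 15, 7, 30, tzinfo=timezone.utc)
start, end = tumbling_window(event, timedelta(minutes=5))
print(start.time(), "to", end.time())
```

The window is closed at the start and open at the end, so an event stamped exactly 15:10:00 belongs to the next window, not this one.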
Delta Live Tables (DLT)
What Are Delta Live Tables?
Delta Live Tables (DLT) is a declarative framework for building streaming and batch ETL pipelines. Instead of writing readStream/writeStream code, you declare WHAT each table should contain and DLT handles the HOW — orchestration, error handling, monitoring, and data quality checks.
DLT Pipeline Example
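import dlt
# Note: importing sum here shadows Python's built-in sum in this notebook
from pyspark.sql.functions import col, count, current_timestamp, from_json, sum, window
# order_schema is the StructType defined in the "Parsing Kafka Messages" section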
# Bronze: ingest raw data (streaming or batch)
@dlt.table(comment="Raw orders from Kafka")
def bronze_orders():
return (spark.readStream.format("kafka")
.option("subscribe", "orders_topic")
.option("kafka.bootstrap.servers", "broker:9092")
.load()
.selectExpr("CAST(value AS STRING) as raw_json", "timestamp as kafka_timestamp")
)
# Silver: clean and parse
@dlt.table(comment="Parsed and validated orders")
@dlt.expect_or_drop("valid_amount", "amount > 0")
@dlt.expect_or_drop("valid_customer", "customer_id IS NOT NULL")
def silver_orders():
return (dlt.read_stream("bronze_orders")
.select(from_json(col("raw_json"), order_schema).alias("data"))
.select("data.*")
.withColumn("processed_at", current_timestamp())
)
# Gold: aggregate
@dlt.table(comment="Hourly order summary")
def gold_hourly_summary():
return (dlt.read_stream("silver_orders")
.withWatermark("order_time", "1 hour")
.groupBy(window("order_time", "1 hour"), "product")
.agg(count("order_id").alias("order_count"),
sum("amount").alias("total_revenue"))
)
DLT Expectations (Data Quality)
# Three levels of data quality enforcement:
@dlt.expect("valid_amount", "amount > 0")
# WARN: log invalid rows but keep them (monitoring only)
@dlt.expect_or_drop("valid_amount", "amount > 0")
# DROP: remove rows that fail the check (the drop count is still recorded in pipeline metrics)
@dlt.expect_or_fail("valid_amount", "amount > 0")
# FAIL: stop the entire pipeline if any row fails (strictest)
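The three enforcement levels differ only in what happens to failing rows. The pure-Python sketch below imitates that behavior on a list of dictionaries. It is not the DLT implementation: `apply_expectation` and its `action` values ("warn", "drop", "fail") are names invented for this illustration.

```python
def apply_expectation(rows, predicate, action="warn"):
    """Imitate expect (warn), expect_or_drop (drop), and expect_or_fail (fail)."""
    if action not in ("warn", "drop", "fail"):
        raise ValueError(f"unknown action: {action}")
    failed = [row for row in rows if not predicate(row)]
    if action == "fail" and failed:
        # Strictest level: a single bad row aborts the run
        raise ValueError(f"{len(failed)} row(s) violated the expectation")
    # warn keeps everything; drop keeps only the rows that pass
    kept = list(rows) if action == "warn" else [row for row in rows if predicate(row)]
    return kept, len(failed)


def valid_amount(row):
    return row["amount"] > 0


rows = [{"order_id": "o1", "amount": 10.0}, {"order_id": "o2", "amount": -5.0}]
warn_kept, warn_failed = apply_expectation(rows, valid_amount, "warn")
drop_kept, drop_failed = apply_expectation(rows, valid_amount, "drop")
print(len(warn_kept), warn_failed, len(drop_kept), drop_failed)
```

A reasonable starting point is "warn" while you learn what the data looks like, then tightening to "drop" or "fail" once the rule is trusted.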
Streaming to Delta Lake Best Practices
- Always set a checkpoint location: without it, the stream has no memory and reprocesses everything on restart
- Use trigger(availableNow=True) for scheduled jobs: processes all available data and stops cleanly, perfect for hourly Databricks Jobs
- Set watermarks for aggregations: without watermarks, state grows unbounded and eventually causes OOM
- Use foreachBatch for complex writes: when you need to write to multiple sinks or perform MERGE operations per micro-batch
- Monitor with query.lastProgress: check input rows per second, processing time, and state size
- Separate Bronze streaming from Silver/Gold: Bronze ingests as fast as possible, Silver/Gold transform on their own schedule
Monitoring Streaming Queries
# Check streaming query status
print(query.status) # isDataAvailable, isTriggerActive
print(query.lastProgress) # detailed metrics for the last micro-batch
# Key metrics to monitor:
# inputRowsPerSecond: how fast data is arriving
# processedRowsPerSecond: how fast Spark processes it
# If input > processed: your stream is falling behind (scale up!)
# List all active streams
for q in spark.streams.active:
print(f"{q.name}: {q.status}")
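The "input vs processed" comparison can be wrapped in a small helper. The sketch below assumes the dictionary shape that `query.lastProgress` returns in PySpark 3.x, including the `batchId`, `inputRowsPerSecond`, and `processedRowsPerSecond` keys. `stream_health` is a hypothetical function, and the sample values are invented for the example.

```python
def stream_health(progress):
    """Summarize one lastProgress dictionary; returns None before the first batch."""
    if not progress:
        return None
    input_rate = float(progress.get("inputRowsPerSecond") or 0.0)
    processed_rate = float(progress.get("processedRowsPerSecond") or 0.0)
    return {
        "batch_id": progress.get("batchId"),
        "input_rate": input_rate,
        "processed_rate": processed_rate,
        # Data is arriving faster than Spark is processing it
        "falling_behind": input_rate > processed_rate,
    }


# Invented sample values in the shape of query.lastProgress
sample_progress = {
    "batchId": 42,
    "numInputRows": 3000,
    "inputRowsPerSecond": 300.0,
    "processedRowsPerSecond": 120.0,
}
report = stream_health(sample_progress)
print(report)
```

A single micro-batch can be noisy, so it is usually safer to alert only when several consecutive batches report falling behind.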
AutoLoader vs Kafka vs Event Hubs
| Source | Data Arrives As | Latency | Best For |
|---|---|---|---|
| AutoLoader | Files in cloud storage (ADLS, S3) | Minutes | File-based ingestion, SFTP drops, batch exports |
| Kafka | Messages on topics | Seconds | Real-time events, microservices, IoT, clickstream |
| Event Hubs | Events via Azure service | Seconds | Azure-native streaming, IoT Hub integration |
Common Mistakes
- No checkpoint location: stream reprocesses everything on restart, causing duplicates
- Sharing checkpoints between streams: each streaming query MUST have its own unique checkpoint. Sharing causes data loss or corruption
- Aggregation without watermark: state grows unbounded, eventually OOM. Always add withWatermark() before groupBy() in streaming
- Using trigger(once=True): deprecated. It forces all available data into one micro-batch and ignores rate limits. Use trigger(availableNow=True), which also processes ALL available data but can split it across several rate-limited micro-batches
- Not monitoring input vs processing rate: if input rate exceeds processing rate, the stream falls behind. Monitor lastProgress and scale the cluster
Interview Questions
Q: What is the difference between batch processing and Structured Streaming?
A: Batch reads a fixed dataset and processes it once. Structured Streaming treats the input as an unbounded table: new data continuously appends and is processed incrementally. The DataFrame API is largely the same, but streaming adds concepts like triggers (when to process), checkpoints (fault tolerance), and watermarks (handling late data).
Q: What is a watermark in Spark Streaming and why is it needed?
A: A watermark defines the maximum allowed lateness for event-time data. Without watermarks, Spark must keep all state forever (for potentially late-arriving data), consuming unlimited memory. With a watermark of “10 minutes,” Spark can drop records whose event time is more than 10 minutes behind the latest event time it has seen, allowing it to clean up old state and bound memory usage.
Q: What is the difference between trigger(availableNow) and trigger(processingTime)?
A: trigger(availableNow=True) processes ALL available data then stops — ideal for scheduled Databricks Jobs (run every hour, process everything, stop). trigger(processingTime="10 seconds") runs continuously, processing a micro-batch every 10 seconds — ideal for always-on near-real-time streams.
Wrapping Up
Streaming in Databricks is not a separate technology — it is the same DataFrame API you already know, with readStream instead of read and writeStream instead of write. Add checkpoints for fault tolerance, watermarks for late data handling, and trigger modes for controlling processing frequency. Delta Live Tables simplify the orchestration for production streaming pipelines with built-in data quality checks.
Naveen Vuppula is a Senior Data Engineering Consultant and app developer based in Ontario, Canada. He writes about Python, SQL, AWS, Azure, and everything data engineering at DriveDataScience.com.