Streaming with Databricks: Structured Streaming, Kafka Integration, Delta Live Tables, Trigger Modes, Watermarks, and Production Streaming Pipelines

Our AutoLoader post covered streaming from FILES — new files land, AutoLoader picks them up. But what about streaming from MESSAGE SYSTEMS like Kafka, Event Hubs, or Kinesis? What about continuous, always-on streams where data arrives every second?

This post goes beyond AutoLoader into the full world of Spark Structured Streaming in Databricks — reading from Kafka, processing with watermarks, writing to Delta in near real-time, and building production streaming pipelines with Delta Live Tables.

Think of batch processing like a postal service — letters accumulate in a mailbox and are delivered once a day. Streaming is like a messaging app — each message arrives and is processed the moment it is sent. Micro-batch streaming (the Spark default) is in between — messages accumulate for a few seconds, then are processed as a small batch. You get near-real-time results without the complexity of true event-at-a-time processing.

Table of Contents

  • Batch vs Streaming vs Micro-Batch
  • Structured Streaming Fundamentals
  • readStream and writeStream
  • Trigger Modes
  • Output Modes (Append, Complete, Update)
  • Checkpointing
  • Streaming from Kafka
  • Reading from Kafka
  • Parsing Kafka Messages
  • Writing Processed Data to Delta
  • Streaming from Event Hubs
  • Watermarks and Late Data
  • What Are Watermarks?
  • Windowed Aggregations with Watermarks
  • Delta Live Tables (DLT)
  • What Are Delta Live Tables?
  • DLT Pipeline Example
  • DLT Expectations (Data Quality)
  • Streaming to Delta Lake Best Practices
  • Monitoring Streaming Queries
  • AutoLoader vs Kafka vs Event Hubs
  • Common Mistakes
  • Interview Questions
  • Wrapping Up

Batch vs Streaming vs Micro-Batch

Processing Model | Latency | How It Works | Use Case
Batch | Hours | Process all data at once (scheduled) | Daily ETL, historical analysis
Micro-batch | Seconds to minutes | Process small batches continuously | Near-real-time dashboards, IoT
True streaming | Milliseconds | Process each event individually | Fraud detection, stock trading

Spark Structured Streaming uses micro-batch by default — it accumulates events for a configurable interval (e.g., 10 seconds), processes them as a batch, then repeats. This gives near-real-time results with the simplicity and fault tolerance of batch processing.
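To make the micro-batch idea concrete, here is a minimal pure-Python sketch (no Spark involved). It assumes each event is an (arrival_seconds, payload) pair and groups events by the interval they arrived in. The function name micro_batches and the sample events are made up for illustration; Spark's real scheduler is more involved than this.

```python
from typing import Dict, List, Tuple


def micro_batches(events: List[Tuple[float, str]], interval: float) -> Dict[int, List[str]]:
    """Group (arrival_seconds, payload) events into fixed-interval batches."""
    batches: Dict[int, List[str]] = {}
    for arrival, payload in events:
        # Every event that arrives inside the same interval lands in the same batch
        batch_id = int(arrival // interval)
        batches.setdefault(batch_id, []).append(payload)
    return batches


# Hypothetical events: arrival time in seconds since the stream started
events = [(0.5, "a"), (3.2, "b"), (9.9, "c"), (10.1, "d"), (25.0, "e")]
batches = micro_batches(events, interval=10.0)

for batch_id, payloads in sorted(batches.items()):
    print(batch_id, payloads)
```

With a 10-second interval, the first three events are processed together, and the later two each land in their own batch.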

Structured Streaming Fundamentals

readStream and writeStream

from pyspark.sql.functions import col

# Batch: read all data at once
df = spark.read.format("delta").load("/data/orders")

# Streaming: read data continuously as it arrives
stream_df = spark.readStream.format("delta").load("/data/orders")

# The stream_df looks like a regular DataFrame — same API!
# filter, join, groupBy, withColumn — all work the same
filtered = stream_df.filter(col("amount") > 100)

# Write the stream to a sink (Delta table)
query = (filtered.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/orders_filtered")
    .trigger(processingTime="10 seconds")
    .toTable("silver.filtered_orders")
)

# The query runs continuously until stopped
query.awaitTermination()  # Block until query stops
# Or: query.stop()        # Stop programmatically

Trigger Modes

Trigger | Behavior | Use Case
processingTime="10 seconds" | Process a micro-batch every 10 seconds | Near-real-time dashboards
availableNow=True | Process all available data in one or more micro-batches, then stop | Scheduled incremental loads (replaces trigger once)
once=True | Process all available data in a single micro-batch, then stop (deprecated) | Legacy; use availableNow instead
No trigger (default) | Start the next micro-batch as soon as the previous one finishes | Maximum throughput, minimum latency

# Near-real-time: process every 30 seconds
.trigger(processingTime="30 seconds")

# Scheduled batch-style: process everything available, then stop
# Perfect for running in a Databricks Job every hour
.trigger(availableNow=True)

# Maximum speed: process as fast as possible
# (no trigger specified — default behavior)

Real-life analogy: processingTime is like a bus that departs every 10 minutes — passengers accumulate at the stop and the bus takes everyone waiting. availableNow is like a shuttle that runs once, picks up everyone at the stop, drops them off, and parks. Default (no trigger) is like a taxi — leaves the moment a passenger arrives.
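One detail the analogy hides: both once=True and availableNow=True drain the backlog and stop, but once=True does it in a single micro-batch, while availableNow=True can split the work into several micro-batches that respect source rate limits such as maxOffsetsPerTrigger. The pure-Python sketch below models only that difference. The function plan_batches and the numbers are hypothetical, not a Spark API.

```python
from typing import List


def plan_batches(backlog: int, max_per_batch: int, available_now: bool) -> List[int]:
    """Return the size of each micro-batch needed to drain `backlog` records."""
    if backlog <= 0:
        return []
    if not available_now:
        # Models once=True: one micro-batch takes everything, rate limit ignored
        return [backlog]
    sizes: List[int] = []
    remaining = backlog
    while remaining > 0:
        # Models availableNow=True: each micro-batch honors the rate limit
        take = min(max_per_batch, remaining)
        sizes.append(take)
        remaining -= take
    return sizes


once_plan = plan_batches(2500, max_per_batch=1000, available_now=False)
available_now_plan = plan_batches(2500, max_per_batch=1000, available_now=True)
print(once_plan)
print(available_now_plan)
```

For a backlog of 2,500 records and a limit of 1,000 per batch, the first plan is one batch of 2,500 and the second is three batches of 1,000, 1,000, and 500. Smaller batches are easier on cluster memory when a job has been paused for a while.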

Output Modes (Append, Complete, Update)

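Mode | What Gets Written | Use With
Append | Only rows that are new and final since the last trigger | Filters, maps, and watermarked aggregations (a window is written once it closes)
Complete | Entire result table (overwrite) | Aggregations (groupBy + count/sum)
Update | Only rows changed since the last trigger | Aggregations, with or without watermarks; the Delta sink needs foreachBatch for this mode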
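The difference between Complete and Update is easiest to see on a running count. This pure-Python sketch (not Spark code) keeps a count per product across micro-batches and returns what each mode would hand to the sink after every batch. The function run_counts and the sample batches are assumptions for illustration.

```python
from collections import Counter
from typing import Dict, List


def run_counts(batches: List[List[str]], mode: str) -> List[Dict[str, int]]:
    """Return what a running count would emit after each micro-batch."""
    totals: Counter = Counter()
    emitted: List[Dict[str, int]] = []
    for batch in batches:
        totals.update(batch)
        if mode == "complete":
            # Complete: the whole result table, every time
            emitted.append(dict(totals))
        elif mode == "update":
            # Update: only the keys whose count changed in this batch
            emitted.append({key: totals[key] for key in set(batch)})
        else:
            raise ValueError(f"unsupported mode: {mode}")
    return emitted


batches = [["tv", "phone"], ["phone"], ["laptop", "tv"]]
complete = run_counts(batches, "complete")
update = run_counts(batches, "update")
print(complete[-1])
print(update[-1])
```

After the third batch, Complete emits all three products with their totals, while Update emits only laptop and tv because phone did not change.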

Checkpointing

# Checkpoint stores the stream's progress — which data has been processed
# If the stream crashes and restarts, it resumes from the checkpoint
# NEVER delete or change the checkpoint location for a running stream

.option("checkpointLocation", "/checkpoints/my_stream")

# One checkpoint per stream — do NOT share checkpoints between streams
# Store checkpoints in a reliable location (ADLS, S3, DBFS)
# Checkpoint + replayable source + idempotent sink (such as Delta) = end-to-end exactly-once processing

Real-life analogy: The checkpoint is like a bookmark in a book. If you stop reading (stream stops), the bookmark tells you exactly where to resume. Without it, you would have to start from the beginning every time.
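The bookmark idea can be sketched in a few lines of plain Python. This is a toy model only: Spark's real checkpoint holds offset and commit logs plus state, not a single JSON file. The function process_from_checkpoint, the file name offsets.json, and the sample records are all hypothetical.

```python
import json
import os
import tempfile
from typing import List


def process_from_checkpoint(records: List[str], checkpoint_path: str, max_records: int) -> List[str]:
    """Process up to max_records starting at the saved offset and return them."""
    offset = 0
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as fh:
            offset = json.load(fh)["next_offset"]  # resume from the bookmark
    chunk = records[offset:offset + max_records]
    # ... the real work on `chunk` would happen here ...
    with open(checkpoint_path, "w") as fh:
        json.dump({"next_offset": offset + len(chunk)}, fh)  # move the bookmark
    return chunk


records = ["o1", "o2", "o3", "o4", "o5"]
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "offsets.json")
    first_run = process_from_checkpoint(records, path, max_records=2)
    second_run = process_from_checkpoint(records, path, max_records=2)  # simulated restart
    third_run = process_from_checkpoint(records, path, max_records=2)

print(first_run, second_run, third_run)
```

Each simulated restart picks up where the previous run stopped. Delete the file between runs and every run would start again from o1, which is exactly what happens to a stream whose checkpoint is lost.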

Streaming from Kafka

Reading from Kafka

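# Read from a Kafka topic
# Note: Databricks Runtime bundles a shaded Kafka client. If the login module class
# below is not found, try the kafkashaded.org.apache.kafka... prefix instead.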
kafka_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "orders_topic")        # topic name
    .option("startingOffsets", "latest")         # or "earliest" for full history
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config",
            'org.apache.kafka.common.security.plain.PlainLoginModule required '
            'username="apikey" password="your-api-key";')
    .load()
)

# The Kafka DataFrame has a fixed schema:
# key (binary), value (binary), topic, partition, offset, timestamp, timestampType
# Your actual data is in the "value" column as binary (usually JSON)

Parsing Kafka Messages

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Define the schema of your JSON messages
order_schema = StructType([
    StructField("order_id", StringType()),
    StructField("customer_id", StringType()),
    StructField("product", StringType()),
    StructField("amount", DoubleType()),
    StructField("order_time", TimestampType())
])

# Parse the binary value column as JSON
parsed_df = (kafka_df
    .selectExpr("CAST(value AS STRING) as json_str")
    .select(from_json(col("json_str"), order_schema).alias("data"))
    .select("data.*")
)

# Result: a clean DataFrame with order_id, customer_id, product, amount, order_time
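One behavior worth knowing: from_json returns null for a string it cannot parse, and fields missing from the message come back as null too. The pure-Python sketch below mimics that for a single message so you can see why a null check belongs right after parsing. The helper parse_value and the sample payloads are hypothetical, and the sketch does not reproduce Spark's type coercion.

```python
import json
from typing import Optional

# Field names taken from order_schema in the parsing example
FIELDS = ("order_id", "customer_id", "product", "amount", "order_time")


def parse_value(value: bytes) -> Optional[dict]:
    """Decode one Kafka value; return None when it is not a JSON object."""
    try:
        obj = json.loads(value.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None  # unparseable message: the whole record becomes null
    if not isinstance(obj, dict):
        return None
    # Missing fields become None, much like nulls in the parsed struct
    return {name: obj.get(name) for name in FIELDS}


good = parse_value(b'{"order_id": "A1", "amount": 12.5}')
bad = parse_value(b"not json at all")
print(good)
print(bad)
```

In the streaming pipeline, the equivalent safeguard is to filter out rows where a required field such as order_id is null, or to route them to a quarantine table.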

Writing Processed Data to Delta

# Write parsed Kafka data to Bronze Delta table
query = (parsed_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/kafka_orders_bronze")
    .trigger(processingTime="30 seconds")
    .toTable("bronze.kafka_orders")
)

# Complete pipeline: Kafka → Bronze (raw) → Silver (cleaned) → Gold (aggregated)
# Each arrow is a separate streaming query with its own checkpoint

Streaming from Event Hubs

# Azure Event Hubs uses the Kafka protocol
# Connection string with Kafka compatibility

EH_CONN = "Endpoint=sb://myhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxx"
EH_TOPIC = "orders"

kafka_conf = {
    "kafka.bootstrap.servers": "myhub.servicebus.windows.net:9093",
    "subscribe": EH_TOPIC,
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{EH_CONN}";',
    "startingOffsets": "latest"
}

eh_df = spark.readStream.format("kafka").options(**kafka_conf).load()
# Same parsing as Kafka — Event Hubs exposes the Kafka protocol
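If you connect to more than one Event Hubs namespace, it can help to derive the Kafka options from the connection string instead of typing the host twice. The helper below is a sketch under that assumption. The name event_hubs_kafka_options is made up, and the connection string is the same placeholder used above. In a real workspace, load the connection string from a secret scope rather than hard-coding it.

```python
from typing import Dict


def event_hubs_kafka_options(conn_str: str, topic: str) -> Dict[str, str]:
    """Build Kafka reader options from an Event Hubs connection string."""
    # Split "Key=Value;Key=Value"; maxsplit=1 keeps "=" padding inside key values
    parts = dict(item.split("=", 1) for item in conn_str.split(";") if item)
    host = parts["Endpoint"].replace("sb://", "").rstrip("/")
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="$ConnectionString" password="{conn_str}";'
    )
    return {
        "kafka.bootstrap.servers": f"{host}:9093",  # Kafka endpoint port for Event Hubs
        "subscribe": topic,
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.jaas.config": jaas,
        "startingOffsets": "latest",
    }


# Placeholder connection string, not a real credential
conn = (
    "Endpoint=sb://myhub.servicebus.windows.net/;"
    "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxx"
)
options = event_hubs_kafka_options(conn, "orders")
print(options["kafka.bootstrap.servers"])
```

The resulting dictionary can be passed to the reader with .options(**options), the same way kafka_conf is used above.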

Watermarks and Late Data

What Are Watermarks?

In streaming, data can arrive late — an order placed at 3:00 PM might reach Kafka at 3:15 PM due to network delays. Without watermarks, Spark must keep ALL state forever (waiting for potentially late data), consuming unlimited memory.

A watermark tells Spark: “data arriving more than X minutes late can be dropped.” This bounds the state and prevents memory from growing infinitely.

# Allow data up to 10 minutes late
watermarked_df = parsed_df.withWatermark("order_time", "10 minutes")

# Orders whose order_time is more than 10 minutes behind the maximum
# order_time seen so far are treated as too late and may be dropped
# from stateful operations such as aggregations

Real-life analogy: A watermark is like a restaurant’s last-order policy, except the clock is the timestamp on the newest order slip the kitchen has seen, not the clock on the wall. With a 10-minute threshold, once a slip stamped 10:05 PM has come in, the cutoff is 9:55 PM. A slip stamped 9:58 PM that turns up afterwards is still accepted. A slip stamped 9:30 PM is over the threshold and is rejected.
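Here is a simplified pure-Python model of that rule. It assumes the watermark is the maximum event time seen in earlier micro-batches minus the allowed delay, and that it only moves between batches. Real Spark applies the cutoff inside stateful operators (for example, against window end times), so treat this as an illustration of the idea rather than the engine's exact logic. The function apply_watermark and the timestamps are hypothetical.

```python
from datetime import datetime, timedelta
from typing import List, Optional, Tuple


def apply_watermark(
    batches: List[List[datetime]], delay: timedelta
) -> Tuple[List[datetime], List[datetime]]:
    """Split event times into (kept, dropped) using a per-batch watermark."""
    max_seen: Optional[datetime] = None
    kept: List[datetime] = []
    dropped: List[datetime] = []
    for batch in batches:
        # The watermark is fixed for the whole batch, based on earlier batches only
        watermark = None if max_seen is None else max_seen - delay
        for event_time in batch:
            if watermark is not None and event_time < watermark:
                dropped.append(event_time)
            else:
                kept.append(event_time)
        # Advance the maximum event time after the batch has been processed
        for event_time in batch:
            if max_seen is None or event_time > max_seen:
                max_seen = event_time
    return kept, dropped


def at(hour: int, minute: int) -> datetime:
    return datetime(2024, 1, 1, hour, minute)


batches = [[at(15, 0), at(15, 20)], [at(15, 12), at(15, 5)]]
kept, dropped = apply_watermark(batches, timedelta(minutes=10))
print([t.strftime("%H:%M") for t in kept])
print([t.strftime("%H:%M") for t in dropped])
```

After the first batch the newest event is 15:20, so the cutoff for the second batch is 15:10. The 15:12 event is kept and the 15:05 event is dropped. Notice that the 15:00 event in the first batch is kept even though it is 20 minutes older than 15:20, because the cutoff had not moved yet.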

Windowed Aggregations with Watermarks

from pyspark.sql.functions import window

# Count orders per 5-minute window, allowing 10 minutes late data
windowed_counts = (parsed_df
    .withWatermark("order_time", "10 minutes")
    .groupBy(
        window("order_time", "5 minutes"),   # 5-minute tumbling window
        "product"
    )
    .count()
)

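# Write aggregated results
# The Delta sink accepts only "append" and "complete" for writeStream.
# With a watermark, append mode writes each window once, after the
# watermark passes the window end. To upsert windows as they change,
# use foreachBatch with MERGE instead.
query = (windowed_counts.writeStream
    .format("delta")
    .outputMode("append")       # Write each window once it is final
    .option("checkpointLocation", "/checkpoints/order_counts")
    .trigger(processingTime="1 minute")
    .toTable("gold.order_counts_5min")
)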
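To see which window an order lands in, this pure-Python sketch assigns each event time to a 5-minute tumbling window and counts per window and product. It assumes windows are aligned to the Unix epoch and are closed at the start and open at the end, which matches how window() behaves with default settings. The helper window_start and the sample orders are made up for illustration.

```python
from collections import Counter
from datetime import datetime, timedelta


def window_start(event_time: datetime, size: timedelta) -> datetime:
    """Return the start of the tumbling window that contains event_time."""
    epoch = datetime(1970, 1, 1)
    elapsed = event_time - epoch
    # Integer division snaps the timestamp down to a window boundary
    return epoch + (elapsed // size) * size


def at(hour: int, minute: int) -> datetime:
    return datetime(2024, 1, 1, hour, minute)


size = timedelta(minutes=5)
orders = [(at(15, 1), "tv"), (at(15, 4), "tv"), (at(15, 5), "tv"), (at(15, 7), "phone")]

counts = Counter((window_start(ts, size), product) for ts, product in orders)
for (start, product), n in sorted(counts.items()):
    print(start.strftime("%H:%M"), product, n)
```

The orders at 15:01 and 15:04 share the 15:00 window, while the order at exactly 15:05 starts a new window because the end of a window is exclusive.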

Delta Live Tables (DLT)

What Are Delta Live Tables?

Delta Live Tables (DLT) is a declarative framework for building streaming and batch ETL pipelines. Instead of writing readStream/writeStream code, you declare WHAT each table should contain and DLT handles the HOW — orchestration, error handling, monitoring, and data quality checks.

DLT Pipeline Example

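import dlt
from pyspark.sql.functions import col, count, current_timestamp, from_json, sum, window

# order_schema is the StructType defined in the "Parsing Kafka Messages" section
# Note: importing sum from pyspark.sql.functions shadows Python's built-in sum here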

# Bronze: ingest raw data (streaming or batch)
@dlt.table(comment="Raw orders from Kafka")
def bronze_orders():
    return (spark.readStream.format("kafka")
        .option("subscribe", "orders_topic")
        .option("kafka.bootstrap.servers", "broker:9092")
        .load()
        .selectExpr("CAST(value AS STRING) as raw_json", "timestamp as kafka_timestamp")
    )

# Silver: clean and parse
@dlt.table(comment="Parsed and validated orders")
@dlt.expect_or_drop("valid_amount", "amount > 0")
@dlt.expect_or_drop("valid_customer", "customer_id IS NOT NULL")
def silver_orders():
    return (dlt.read_stream("bronze_orders")
        .select(from_json(col("raw_json"), order_schema).alias("data"))
        .select("data.*")
        .withColumn("processed_at", current_timestamp())
    )

# Gold: aggregate
@dlt.table(comment="Hourly order summary")
def gold_hourly_summary():
    return (dlt.read_stream("silver_orders")
        .withWatermark("order_time", "1 hour")
        .groupBy(window("order_time", "1 hour"), "product")
        .agg(count("order_id").alias("order_count"),
             sum("amount").alias("total_revenue"))
    )

DLT Expectations (Data Quality)

# Three levels of data quality enforcement:

@dlt.expect("valid_amount", "amount > 0")
# WARN: log invalid rows but keep them (monitoring only)

@dlt.expect_or_drop("valid_amount", "amount > 0")
# DROP: remove rows that fail the check (the drop count is recorded in pipeline metrics)

@dlt.expect_or_fail("valid_amount", "amount > 0")
# FAIL: fail the pipeline update as soon as any row fails (strictest)
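The three levels are easy to compare side by side in plain Python. This sketch is not the DLT implementation. It only models the outcome for a list of rows: keep everything and count failures (warn), remove failures (drop), or raise an error (fail). The function apply_expectation and the sample rows are hypothetical.

```python
from typing import Callable, Dict, List, Tuple

Row = Dict[str, float]


def apply_expectation(
    rows: List[Row], predicate: Callable[[Row], bool], action: str
) -> Tuple[List[Row], int]:
    """Return (rows that continue downstream, number of failing rows)."""
    failed = [row for row in rows if not predicate(row)]
    if action == "warn":
        return list(rows), len(failed)  # keep everything, just report
    if action == "drop":
        return [row for row in rows if predicate(row)], len(failed)
    if action == "fail":
        if failed:
            raise ValueError(f"{len(failed)} row(s) violated the expectation")
        return list(rows), 0
    raise ValueError(f"unknown action: {action}")


def valid_amount(row: Row) -> bool:
    return row["amount"] > 0


rows = [{"amount": 10.0}, {"amount": -5.0}, {"amount": 3.0}]
warned, warn_failures = apply_expectation(rows, valid_amount, "warn")
kept, drop_failures = apply_expectation(rows, valid_amount, "drop")

try:
    apply_expectation(rows, valid_amount, "fail")
    pipeline_failed = False
except ValueError:
    pipeline_failed = True

print(len(warned), len(kept), pipeline_failed)
```

A common pattern is to start with the warn level to learn how dirty the data is, then tighten to drop or fail once the failure rate is understood.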

Streaming to Delta Lake Best Practices

  1. Always set a checkpoint location: it is the stream's only record of progress, and a missing or replaced checkpoint makes the query start over from its starting offsets
  2. Use trigger(availableNow=True) for scheduled jobs: it processes all available data and stops cleanly, perfect for hourly Databricks Jobs
  3. Set watermarks for aggregations: without watermarks, state grows unbounded and eventually causes OOM
  4. Use foreachBatch for complex writes: when you need to write to multiple sinks or perform MERGE operations per micro-batch
  5. Monitor with query.lastProgress: check input rows per second, processing time, and state size
  6. Separate Bronze streaming from Silver/Gold: Bronze ingests as fast as possible, Silver/Gold transform on their own schedule
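On point 4, the reason MERGE pairs well with foreachBatch is idempotence: a micro-batch can be replayed after a failure, and an upsert keyed on a business key gives the same result the second time. The pure-Python sketch below models the target table as a dictionary keyed by order_id. The function upsert_batch mirrors the (batch, batch_id) shape of a foreachBatch callback, but everything here is hypothetical and no Delta table is involved.

```python
from typing import Dict, List

Row = Dict[str, object]


def upsert_batch(target: Dict[str, Row], batch_rows: List[Row], batch_id: int) -> None:
    """Upsert one micro-batch into the target, keyed by order_id."""
    # batch_id is unused here; it is kept to mirror the foreachBatch callback shape
    for row in batch_rows:
        # Insert new keys, overwrite existing ones: the essence of a MERGE
        target[str(row["order_id"])] = row


silver: Dict[str, Row] = {}
batch_0 = [{"order_id": "A1", "amount": 10.0}, {"order_id": "A2", "amount": 20.0}]
batch_1 = [{"order_id": "A2", "amount": 25.0}, {"order_id": "A3", "amount": 5.0}]

upsert_batch(silver, batch_0, batch_id=0)
upsert_batch(silver, batch_0, batch_id=0)  # replay after a simulated failure
upsert_batch(silver, batch_1, batch_id=1)

print(sorted(silver))
print(silver["A2"]["amount"])
```

Replaying batch 0 leaves the target unchanged, and batch 1 updates A2 in place instead of creating a duplicate. A plain append would have written A1 and A2 twice.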

Monitoring Streaming Queries

# Check streaming query status
print(query.status)        # isDataAvailable, isTriggerActive
print(query.lastProgress)  # detailed metrics for the last micro-batch

# Key metrics to monitor:
# inputRowsPerSecond:  how fast data is arriving
# processedRowsPerSecond: how fast Spark processes it
# If input > processed: your stream is falling behind (scale up!)

# List all active streams
for q in spark.streams.active:
    print(f"{q.name}: {q.status}")
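The "input versus processed" check is simple enough to wrap in a helper. This sketch assumes the progress object behaves like a dictionary with inputRowsPerSecond and processedRowsPerSecond keys, and that it can be None before the first micro-batch completes. The function is_falling_behind and the sample numbers are made up. In a notebook you would pass query.lastProgress instead of the hard-coded dictionaries.

```python
from typing import Optional


def is_falling_behind(progress: Optional[dict], tolerance: float = 1.0) -> bool:
    """Return True when rows arrive faster than they are processed."""
    if not progress:
        return False  # no micro-batch has completed yet
    incoming = progress.get("inputRowsPerSecond") or 0.0
    processed = progress.get("processedRowsPerSecond") or 0.0
    return incoming > processed * tolerance


# Hypothetical snapshots of the metrics for two different streams
healthy = {"inputRowsPerSecond": 800.0, "processedRowsPerSecond": 2500.0}
lagging = {"inputRowsPerSecond": 5000.0, "processedRowsPerSecond": 1200.0}

print(is_falling_behind(healthy))
print(is_falling_behind(lagging))
```

A single snapshot can be noisy, so it is usually better to alert only when the check stays true across several consecutive micro-batches.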

AutoLoader vs Kafka vs Event Hubs

Source | Data Arrives As | Latency | Best For
AutoLoader | Files in cloud storage (ADLS, S3) | Minutes | File-based ingestion, SFTP drops, batch exports
Kafka | Messages on topics | Seconds | Real-time events, microservices, IoT, clickstream
Event Hubs | Events via Azure service | Seconds | Azure-native streaming, IoT Hub integration

Common Mistakes

  1. No stable checkpoint location: if the checkpoint is missing, deleted, or changed between runs, the stream reprocesses from its starting offsets, causing duplicates
  2. Sharing checkpoints between streams: each streaming query MUST have its own unique checkpoint. Sharing causes data loss or corruption
  3. Aggregation without watermark: state grows unbounded, eventually OOM. Always add withWatermark() before groupBy() in streaming
  4. Using trigger(once=True): deprecated. It forces all available data into a single micro-batch and ignores source rate limits. Use trigger(availableNow=True), which processes the same data in one or more rate-limited micro-batches
  5. Not monitoring input vs processing rate: if input rate exceeds processing rate, the stream falls behind. Monitor lastProgress and scale the cluster

Interview Questions

Q: What is the difference between batch processing and Structured Streaming? A: Batch reads a fixed dataset and processes it once. Structured Streaming treats the input as an unbounded table — new data continuously appends and is processed incrementally. The API is identical (same DataFrame operations), but streaming adds concepts like triggers (when to process), checkpoints (fault tolerance), and watermarks (handling late data).

Q: What is a watermark in Spark Structured Streaming and why is it needed? A: A watermark defines the maximum allowed lateness for event-time data. Without watermarks, Spark must keep all state forever (for potentially late-arriving data), consuming unlimited memory. With a watermark of “10 minutes,” Spark can drop data whose event time is more than 10 minutes behind the latest event time it has seen, allowing it to clean up old state and bound memory usage.

Q: What is the difference between trigger(availableNow) and trigger(processingTime)? A: trigger(availableNow=True) processes ALL available data then stops — ideal for scheduled Databricks Jobs (run every hour, process everything, stop). trigger(processingTime="10 seconds") runs continuously, processing a micro-batch every 10 seconds — ideal for always-on near-real-time streams.

Wrapping Up

Streaming in Databricks is not a separate technology — it is the same DataFrame API you already know, with readStream instead of read and writeStream instead of write. Add checkpoints for fault tolerance, watermarks for late data handling, and trigger modes for controlling processing frequency. Delta Live Tables simplify the orchestration for production streaming pipelines with built-in data quality checks.

Related posts: AutoLoader (cloudFiles), Spark Structured Streaming in Fabric, Delta Lake Deep Dive


Naveen Vuppula is a Senior Data Engineering Consultant and app developer based in Ontario, Canada. He writes about Python, SQL, AWS, Azure, and everything data engineering at DriveDataScience.com.
