Spark Structured Streaming in Fabric: Stateless vs Stateful Transformations, Checkpoints, Output Modes, Windowing, and Processing Real-Time Data with Delta Lake

Batch processing handles data that has already landed — files in a folder, rows in a table. But what about data that NEVER stops arriving? IoT sensors sending readings every second, clickstream events flowing continuously, transaction logs growing in real-time. You cannot wait for the data to “finish” because it never finishes.

Spark Structured Streaming processes this continuous data using the same DataFrame API you already know from batch PySpark. Write your transformation once, and Spark runs it continuously — processing new data as it arrives, maintaining state across batches, and writing results to Delta tables.

Think of batch processing like doing laundry once a week — collect everything, wash it all at once. Structured Streaming is like a washing machine that starts automatically whenever you drop clothes in the basket. Same machine (Spark), same operations (wash/transform), different trigger (continuous vs scheduled).

Table of Contents

  • What Is Structured Streaming?
  • Streaming vs Batch: Same API, Different Trigger
  • Setting Up a Streaming Source
  • Reading from Event Hubs / Kafka
  • Reading from Delta Lake (Auto Loader Pattern)
  • Reading from Files (File Streaming)
  • Output Modes: Append, Complete, Update
  • Checkpoint Location: Crash Recovery
  • Stateless vs Stateful Transformations
  • Stateless: Filter, Select, Map
  • Stateful: Aggregation, Deduplication, Joins
  • Windowing Functions for Streaming
  • Tumbling Windows
  • Sliding (Hopping) Windows
  • Session Windows
  • Watermarks: Handling Late-Arriving Data
  • Writing Stream Output to Delta Tables
  • Streaming to Lakehouse Tables
  • Merge Stream into Delta (foreachBatch)
  • Trigger Modes
  • Processing Time Trigger
  • Once Trigger (Incremental Batch)
  • Available Now Trigger
  • Monitoring Streaming Queries
  • Real-World Scenario 1: IoT Sensor to Delta Lake
  • Real-World Scenario 2: Clickstream Deduplication
  • Real-World Scenario 3: Real-Time Aggregation Dashboard
  • Common Mistakes
  • Interview Questions
  • Wrapping Up

What Is Structured Streaming?

Structured Streaming treats a stream as an unbounded table that grows as new data arrives. Every micro-batch of new data is processed as a small DataFrame — using the exact same API as batch.

Batch:
  df = spark.read.format("delta").load("path")              # Read ALL data at once
  df.filter(...).groupBy(...).count().write.save("output")  # Process once, done

Streaming:
  df = spark.readStream.format("delta").load("path")        # Read NEW data continuously
  df.filter(...).groupBy(...).count().writeStream.start()   # Process forever, incrementally

Streaming vs Batch: Same API, Different Trigger

from pyspark.sql.functions import col

# BATCH: Process all data once
df_batch = spark.read.format("delta").load("Tables/raw_events")
result = df_batch.filter(col("event_type") == "purchase").groupBy("product").count()
result.write.format("delta").mode("overwrite").save("Tables/purchase_counts")

# STREAMING: Process new data continuously (almost identical!)
df_stream = spark.readStream.format("delta").load("Tables/raw_events")
result = df_stream.filter(col("event_type") == "purchase").groupBy("product").count()
query = (
    result.writeStream
    .format("delta")
    .outputMode("complete")
    .option("checkpointLocation", "Files/checkpoints/purchase_counts")
    .start("Tables/purchase_counts")
)

The only changes: read → readStream, write → writeStream, plus an outputMode and a checkpointLocation.

Setting Up a Streaming Source

Reading from Event Hubs / Kafka

# Read from Azure Event Hubs (Kafka-compatible)
connection_string = "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=...;EntityPath=events"
eh_conf = {
    "eventhubs.connectionString": sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connection_string),
    "eventhubs.consumerGroup": "$Default",
    "eventhubs.startingPosition": '{"offset":"-1","seqNo":-1,"enqueuedTime":null,"isInclusive":true}'
}

df_stream = (
    spark.readStream
    .format("eventhubs")
    .options(**eh_conf)
    .load()
)

# Event Hubs returns binary body — parse it
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType

schema = (
    StructType()
    .add("device_id", StringType())
    .add("temperature", DoubleType())
    .add("timestamp", TimestampType())
)

df_parsed = (
    df_stream
    .select(from_json(col("body").cast("string"), schema).alias("data"))
    .select("data.*")
)

Reading from Delta Lake (Auto Loader Pattern)

# Stream NEW rows added to a Delta table (incremental processing)
df_stream = (
    spark.readStream
    .format("delta")
    .load("Tables/bronze_orders")
)
# The first run reads the rows already in the table; after that, only rows
# added SINCE the last checkpoint are processed, not the entire table

Reading from Files (File Streaming)

# Stream new CSV/JSON/Parquet files as they land in a folder
# (file sources need an explicit schema; my_schema is a StructType defined elsewhere)
df_stream = (
    spark.readStream
    .format("csv")
    .option("header", "true")
    .schema(my_schema)
    .load("Files/incoming_csv/")
)
# Processes each new file as it appears in the folder

Output Modes: Append, Complete, Update

Mode     | What It Writes                                             | Use When
Append   | Only NEW rows (never updates previous output)              | Simple filters, maps; windowed aggregations only with a watermark
Complete | Entire result table (rewrites everything each micro-batch) | Aggregations (GROUP BY) where the full result is needed
Update   | Only rows CHANGED since the last micro-batch               | Aggregations where you only want the delta (with a Delta sink, via foreachBatch)

# APPEND mode: each micro-batch adds new rows
query = (
    df_filtered.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "Files/checkpoints/filtered")
    .start("Tables/filtered_events")
)

# COMPLETE mode: rewrites entire aggregation each time
query = (
    df_aggregated.writeStream
    .format("delta")
    .outputMode("complete")
    .option("checkpointLocation", "Files/checkpoints/agg")
    .start("Tables/event_counts")
)

# UPDATE mode: only emits changed aggregation rows.
# The Delta sink accepts append and complete only, so pair update mode with
# foreachBatch and MERGE the changed rows (see the foreachBatch section).
query = (
    df_aggregated.writeStream
    .outputMode("update")
    .foreachBatch(merge_changed_rows)  # a MERGE function you define, like upsert_to_delta
    .option("checkpointLocation", "Files/checkpoints/agg_update")
    .start()
)

Real-life analogy: Append is like a diary — you only add new entries, never rewrite old ones. Complete is like a whiteboard scoreboard — erase and rewrite the entire board every time scores change. Update is like a spreadsheet — only change the cells that actually changed.
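
To see the difference between the two aggregation modes in numbers, here is a plain-Python sketch (no Spark involved) that mimics what a running count per key would emit after each micro-batch. The function name `emit` and the sample batches are made up for illustration; real sinks differ in many details.

```python
from collections import Counter

def emit(batches, mode):
    """Return what a running count-per-key query would emit after each micro-batch."""
    state = Counter()                      # running counts, playing the role of the state store
    outputs = []
    for batch in batches:
        changed = Counter(batch)           # keys touched by this micro-batch
        state.update(changed)
        if mode == "complete":
            outputs.append(dict(state))    # whole result table, every time
        elif mode == "update":
            outputs.append({k: state[k] for k in changed})  # only the changed keys
        else:
            raise ValueError(f"unsupported mode: {mode}")
    return outputs

batches = [["a", "b", "a"], ["b", "c"]]
print(emit(batches, "complete"))  # [{'a': 2, 'b': 1}, {'a': 2, 'b': 2, 'c': 1}]
print(emit(batches, "update"))    # [{'a': 2, 'b': 1}, {'b': 2, 'c': 1}]
```

Key "a" did not change in the second micro-batch, so Update leaves it out while Complete writes it again.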

Checkpoint Location: Crash Recovery

A checkpoint stores the streaming query’s progress. If the notebook crashes and restarts, streaming resumes from the last checkpoint — no data loss, no duplicates.

.option("checkpointLocation", "Files/checkpoints/my_stream")
Checkpoint contains:
  - Which data has been processed (offsets)
  - State of aggregations (intermediate results)
  - Configuration of the query

Crash scenario:
  1. Stream processed events 1-1000 → checkpoint saved
  2. Notebook crashes at event 1001
  3. Notebook restarts → reads checkpoint → resumes from event 1001
  4. No events lost, no duplicates

CRITICAL: Every streaming query MUST have a unique checkpoint location. Two queries sharing a checkpoint corrupt each other.
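
The crash scenario above can be imitated in a few lines of plain Python. This is only a toy model under stated assumptions: the JSON file, the `run_stream` function, and the `crash_after` parameter are hypothetical, and Spark's real checkpoint also stores state and query metadata and commits progress per micro-batch rather than per event.

```python
import json
import os
import tempfile

def run_stream(events, checkpoint_path, sink, crash_after=None):
    """Process events in order, resuming from the offset stored in checkpoint_path."""
    offset = 0
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            offset = json.load(f)["offset"]      # resume point from the previous run
    for i in range(offset, len(events)):
        if crash_after is not None and i >= crash_after:
            raise RuntimeError("simulated crash")
        sink.append(events[i])                   # "write" the event
        with open(checkpoint_path, "w") as f:
            json.dump({"offset": i + 1}, f)      # record progress after the write

events = list(range(1, 11))
sink = []
checkpoint = os.path.join(tempfile.mkdtemp(), "offsets.json")

try:
    run_stream(events, checkpoint, sink, crash_after=4)   # dies before event 5
except RuntimeError:
    pass
run_stream(events, checkpoint, sink)                       # restart resumes at event 5
print(sink)  # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```

Deleting the checkpoint file before the second call would replay events 1 to 4 into the sink, which is the reprocessing a missing checkpoint causes.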

Stateless vs Stateful Transformations

Stateless: Filter, Select, Map

Stateless transformations process each row independently — no memory of previous rows needed:

# Stateless: each row processed independently
df_filtered = df_stream.filter(col("temperature") > 100)
df_projected = df_stream.select("device_id", "temperature", "timestamp")
df_mapped = df_stream.withColumn("temp_fahrenheit", col("temperature") * 9/5 + 32)

# Output mode: APPEND (just add new processed rows)

Stateful: Aggregation, Deduplication, Joins

Stateful transformations maintain state across micro-batches — they need to “remember” previous data:

# Stateful: needs to remember running counts
df_counts = df_stream.groupBy("device_id").count()
# Spark maintains a state store with the count per device_id
# Each new micro-batch updates the counts

# Stateful: deduplication (remember which IDs we have seen)
df_deduped = df_stream.dropDuplicates(["event_id"])
# Spark remembers all seen event_ids to avoid duplicates

# Stateful: stream-stream join
df_joined = df_orders.join(df_payments, "order_id")
# Spark buffers unmatched rows until their match arrives

# Output mode: aggregations use COMPLETE or UPDATE (APPEND only with a watermark);
# deduplication and stream-stream joins use APPEND

Real-life analogy: Stateless is like a cashier scanning items — each item is independent, no memory needed. Stateful is like a running tab at a bar — the bartender must REMEMBER your previous drinks to give you the correct total. State = memory.
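
The contrast can be shown without Spark at all. In this minimal sketch, `to_fahrenheit` and `Deduplicator` are hypothetical names: the first touches each row in isolation, the second carries a set of seen IDs from one micro-batch to the next, which is the role Spark's state store plays for dropDuplicates.

```python
def to_fahrenheit(rows):
    """Stateless: each row is converted on its own, nothing is remembered."""
    return [{**r, "temp_f": r["temp_c"] * 9 / 5 + 32} for r in rows]

class Deduplicator:
    """Stateful: remembers every event_id seen in earlier micro-batches."""

    def __init__(self):
        self.seen = set()

    def process(self, rows):
        fresh = []
        for r in rows:
            if r["event_id"] not in self.seen:
                self.seen.add(r["event_id"])
                fresh.append(r)
        return fresh

batch_1 = [{"event_id": 1, "temp_c": 100.0}, {"event_id": 2, "temp_c": 0.0}]
batch_2 = [{"event_id": 2, "temp_c": 0.0}, {"event_id": 3, "temp_c": 37.0}]

dedup = Deduplicator()
out_1 = dedup.process(batch_1)
out_2 = dedup.process(batch_2)   # event 2 was seen in batch_1, so it is dropped
print([r["event_id"] for r in out_1], [r["event_id"] for r in out_2])  # [1, 2] [3]
print(to_fahrenheit(batch_1)[0]["temp_f"])  # 212.0
```

The `seen` set only ever grows here, which is the unbounded-state problem that watermarks address.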

Windowing Functions for Streaming

Tumbling Windows

Non-overlapping, fixed-size windows. Each event belongs to exactly ONE window:

from pyspark.sql.functions import window, col, avg

# Tumbling window: 5-minute buckets, no overlap
df_windowed = (
    df_stream
    .groupBy(
        window(col("timestamp"), "5 minutes"),  # Tumbling window
        col("device_id")
    )
    .agg(avg("temperature").alias("avg_temp"))
)

# Result:
# Window: 10:00-10:05 | device_001 | avg_temp: 72.5
# Window: 10:05-10:10 | device_001 | avg_temp: 73.1
# Window: 10:10-10:15 | device_001 | avg_temp: 71.8
Timeline:
|--10:00--10:05--|--10:05--10:10--|--10:10--10:15--|
|   Window 1     |   Window 2     |   Window 3     |
| events here    | events here    | events here    |
  No overlap — each event in exactly one window
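
How an event lands in exactly one bucket comes down to simple arithmetic. The sketch below is plain Python, not Spark's implementation; `tumbling_window` is a hypothetical helper, and it assumes windows are aligned to the Unix epoch, which is Spark's default when no start offset is given.

```python
from datetime import datetime, timedelta

def tumbling_window(ts, size):
    """Return the (start, end) of the fixed-size window that contains ts."""
    epoch = datetime(1970, 1, 1)
    offset = (ts - epoch) % size      # how far ts sits past its window start
    start = ts - offset
    return start, start + size

start, end = tumbling_window(datetime(2024, 1, 1, 10, 7, 30), timedelta(minutes=5))
print(start.time(), end.time())  # 10:05:00 10:10:00
```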

Sliding (Hopping) Windows

Overlapping windows. An event can belong to MULTIPLE windows:

# Sliding window: 10-minute window, sliding every 5 minutes
df_windowed = (
    df_stream
    .groupBy(
        window(col("timestamp"), "10 minutes", "5 minutes"),  # window size, slide interval
        col("device_id")
    )
    .agg(avg("temperature").alias("avg_temp"))
)

# Result:
# Window: 10:00-10:10 | device_001 | avg_temp: 72.8
# Window: 10:05-10:15 | device_001 | avg_temp: 73.2  ← overlaps with above!
# Window: 10:10-10:20 | device_001 | avg_temp: 71.9
Timeline:
|------10:00------10:10------|
       |------10:05------10:15------|
              |------10:10------10:20------|
  Windows OVERLAP — an event at 10:07 appears in windows 1 AND 2
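
To check which windows a given event falls into, the overlap can be computed by hand. This is a plain-Python sketch rather than Spark code; `sliding_windows` is a hypothetical helper that assumes epoch-aligned, half-open windows (start included, end excluded).

```python
from datetime import datetime, timedelta

def sliding_windows(ts, size, slide):
    """Return every (start, end) window of length size, stepping by slide, that contains ts."""
    epoch = datetime(1970, 1, 1)
    # Latest window start at or before ts, aligned to the slide interval
    start = ts - (ts - epoch) % slide
    windows = []
    while start + size > ts:          # this window still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)

event = datetime(2024, 1, 1, 10, 7)
for start, end in sliding_windows(event, timedelta(minutes=10), timedelta(minutes=5)):
    print(start.time(), "-", end.time())
# 10:00:00 - 10:10:00
# 10:05:00 - 10:15:00
```

With a 10-minute window and a 5-minute slide, every event belongs to size / slide = 2 windows.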

Session Windows

Dynamic windows based on activity gaps. Window closes when no events arrive for the gap duration:

from pyspark.sql.functions import session_window, col, count

# Session window: closes after 10 minutes of inactivity
df_sessions = (
    df_stream
    .groupBy(
        session_window(col("timestamp"), "10 minutes"),  # gap duration
        col("user_id")
    )
    .agg(count("*").alias("events_in_session"))
)

# Result (activity span per session):
# user_001: Session 10:00-10:23 (active for 23 min) | 45 events
# user_001: Session 10:40-10:52 (active for 12 min) | 20 events
# (no events for more than 10 minutes after 10:23, so the first session closed;
#  Spark reports the window end as last event time + gap, here 10:33)
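
The gap rule is easy to trace by hand. Below is a minimal plain-Python sketch of sessionization; `sessionize` and the sample event times are hypothetical, and it reports the first and last event of each session rather than Spark's window bounds.

```python
from datetime import datetime, timedelta

def sessionize(timestamps, gap):
    """Group event times into sessions; a new session starts after `gap` of silence."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] < gap:
            sessions[-1].append(ts)       # still within the gap: same session
        else:
            sessions.append([ts])         # gap reached: open a new session
    return [(s[0], s[-1], len(s)) for s in sessions]

day = datetime(2024, 1, 1)
events = [day.replace(hour=10, minute=m) for m in (0, 8, 15, 23, 40, 47, 52)]
for first, last, n in sessionize(events, timedelta(minutes=10)):
    print(first.time(), "-", last.time(), n, "events")
# 10:00:00 - 10:23:00 4 events
# 10:40:00 - 10:52:00 3 events
```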

Watermarks: Handling Late-Arriving Data

Events can arrive OUT OF ORDER (event happened at 10:00, but arrives at 10:07). Watermarks tell Spark how long to WAIT for late data before closing a window:

# Allow events up to 10 minutes late
df_watermarked = (
    df_stream
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window(col("timestamp"), "5 minutes"),
        col("device_id")
    )
    .agg(avg("temperature").alias("avg_temp"))
)

# Once the latest event time Spark has seen reaches 10:15, the 10:00-10:05 window is "closed"
# (watermark = 10:15 - 10 minutes = 10:05, so the 10:00-10:05 window is finalized)
# Events for 10:00-10:05 that arrive after that point are DROPPED (too late)
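
The watermark arithmetic can be replayed in plain Python. This sketch is a simplification under stated assumptions: `split_late` is a hypothetical helper that checks every event against the watermark as it arrives, whereas Spark advances the watermark between micro-batches and compares it with window end times.

```python
from datetime import datetime, timedelta

def split_late(event_times, delay):
    """Walk event times in arrival order; return (accepted, dropped_as_late)."""
    max_seen = None
    accepted, late = [], []
    for ts in event_times:
        if max_seen is not None and ts < max_seen - delay:
            late.append(ts)                   # behind the watermark: too late
        else:
            accepted.append(ts)
            max_seen = ts if max_seen is None else max(max_seen, ts)
    return accepted, late

day = datetime(2024, 1, 1)
arrivals = [day.replace(hour=10, minute=m) for m in (0, 15, 7, 3)]
accepted, late = split_late(arrivals, timedelta(minutes=10))
print([t.strftime("%H:%M") for t in accepted])   # ['10:00', '10:15', '10:07']
print([t.strftime("%H:%M") for t in late])       # ['10:03']
```

After the 10:15 event, the watermark sits at 10:05. The 10:07 event is out of order but still accepted; the 10:03 event is behind the watermark and is dropped.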

Writing Stream Output to Delta Tables

Streaming to Lakehouse Tables

# Write stream directly to a Lakehouse Delta table
query = (
    df_processed.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "Files/checkpoints/processed_events")
    .toTable("silver.processed_events")  # Creates/appends to managed table
)

Merge Stream into Delta (foreachBatch)

For upsert (MERGE) with streaming, use foreachBatch:

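from delta.tables import DeltaTable
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

def upsert_to_delta(micro_batch_df, batch_id):
    '''Merge each micro-batch into the target Delta table.'''
    if micro_batch_df.isEmpty():
        return

    # MERGE fails when several source rows match the same target row,
    # so keep only the latest reading per device in this micro-batch
    latest_first = Window.partitionBy("device_id").orderBy(col("timestamp").desc())
    source = (
        micro_batch_df
        .withColumn("rn", row_number().over(latest_first))
        .filter(col("rn") == 1)
        .drop("rn")
    )

    target = DeltaTable.forName(spark, "silver.device_status")

    (
        target.alias("t")
        .merge(source.alias("s"), "t.device_id = s.device_id")
        .whenMatchedUpdate(set={
            "temperature": "s.temperature",
            "last_reading": "s.timestamp",
            "updated_at": "current_timestamp()"
        })
        .whenNotMatchedInsertAll()
        .execute()
    )

# Use foreachBatch to apply the MERGE function to each micro-batch
query = (
    df_stream.writeStream
    .foreachBatch(upsert_to_delta)
    .option("checkpointLocation", "Files/checkpoints/device_upsert")
    .start()
)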
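
One reason MERGE suits foreachBatch is that a micro-batch can be replayed after a failure, and an upsert keyed on device_id gives the same result when applied twice. The plain-Python sketch below illustrates that property with a dict standing in for the target table; `upsert`, the integer timestamps, and the sample rows are hypothetical.

```python
def upsert(target, micro_batch):
    """Apply MERGE-like semantics: update matching device rows, insert new ones."""
    # Keep only the latest reading per device within the micro-batch
    latest = {}
    for row in micro_batch:
        current = latest.get(row["device_id"])
        if current is None or row["timestamp"] > current["timestamp"]:
            latest[row["device_id"]] = row
    for device_id, row in latest.items():
        target[device_id] = {
            "temperature": row["temperature"],
            "last_reading": row["timestamp"],
        }
    return target

target = {"d1": {"temperature": 70.0, "last_reading": 1}}
batch = [
    {"device_id": "d1", "temperature": 71.0, "timestamp": 2},
    {"device_id": "d1", "temperature": 72.5, "timestamp": 3},
    {"device_id": "d2", "temperature": 65.0, "timestamp": 2},
]
upsert(target, batch)
upsert(target, batch)   # replaying the same micro-batch changes nothing
print(target)
```

An append-only write would have produced duplicate rows on the replay; the keyed upsert does not.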

Trigger Modes

Trigger                       | Behavior                                                             | Use Case
Default (micro-batch)         | Runs continuously; a new batch starts as soon as the previous finishes | Lowest-latency micro-batch processing
processingTime("30 seconds")  | Starts a micro-batch every 30 seconds                                | Near real-time with controlled frequency
once=True / availableNow=True | Processes all available data, then STOPS (prefer availableNow; once is deprecated in recent Spark versions) | Scheduled incremental batch (run from pipeline)

# Default: micro-batches run back to back
# (checkpointLocation and target path are omitted in these snippets for brevity)
query = df.writeStream.format("delta").start()

# Every 30 seconds
query = (
    df.writeStream.format("delta")
    .trigger(processingTime="30 seconds")
    .start()
)

# Process once and stop (great for pipelines!)
query = (
    df.writeStream.format("delta")
    .trigger(availableNow=True)
    .start()
)
query.awaitTermination()  # Waits until done, then notebook continues

availableNow is the most useful for data engineers — it processes all NEW data since the last checkpoint, then stops. Schedule it in a pipeline like a batch job, but with streaming’s exactly-once guarantees.

Real-World Scenario 1: IoT Sensor to Delta Lake

# Read from Event Hubs → clean → write to silver Delta table
df_raw = spark.readStream.format("eventhubs").options(**eh_conf).load()

df_parsed = df_raw.select(
    from_json(col("body").cast("string"), sensor_schema).alias("data")
).select("data.*")

df_clean = (
    df_parsed
    .filter(col("temperature").isNotNull())
    .withWatermark("timestamp", "5 minutes")
    .dropDuplicates(["device_id", "timestamp"])
)

query = (
    df_clean.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "Files/checkpoints/iot_silver")
    .toTable("silver.sensor_readings")
)

Real-World Scenario 2: Clickstream Deduplication

# Deduplicate clickstream events (same user, same page, identical event_time)
df_clicks = spark.readStream.format("delta").load("Tables/bronze_clicks")

df_deduped = (
    df_clicks
    .withWatermark("event_time", "10 minutes")
    .dropDuplicates(["user_id", "page_url", "event_time"])
)

query = (
    df_deduped.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "Files/checkpoints/clicks_dedup")
    .toTable("silver.unique_clicks")
)

Real-World Scenario 3: Real-Time Aggregation

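# 5-minute tumbling window aggregation of sales
# Spark's sum is imported under an alias so it does not shadow Python's built-in sum
from pyspark.sql.functions import window, col, count, avg, sum as spark_sum

df_sales = spark.readStream.format("delta").load("Tables/bronze_transactions")

df_windowed = (
    df_sales
    .withWatermark("transaction_time", "10 minutes")
    .groupBy(
        window(col("transaction_time"), "5 minutes"),
        col("store_id")
    )
    .agg(
        spark_sum("amount").alias("total_revenue"),
        count("*").alias("transaction_count"),
        avg("amount").alias("avg_transaction")
    )
)

# The Delta sink does not accept update mode; with a watermark, append mode
# writes each window once, after the watermark has passed its end
query = (
    df_windowed.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "Files/checkpoints/sales_agg")
    .toTable("gold.realtime_sales_summary")
)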

Common Mistakes

  1. Forgetting the checkpoint location — without it, a restart reprocesses ALL data from the beginning. Always set a unique checkpoint per query.

  2. Using Append mode with aggregations that have no watermark: group results keep changing, so Append mode fails. Use Complete or Update for GROUP BY queries, or add a watermark so Append can emit each window once it is final.

  3. Not setting watermarks for stateful operations — without watermarks, Spark keeps ALL state forever, eventually running out of memory. Always set watermarks for aggregations, deduplication, and joins.

  4. Sharing checkpoint locations between queries — each streaming query MUST have its own checkpoint directory. Sharing corrupts state.

  5. Not using availableNow for pipeline-scheduled streaming — running continuous streaming in a pipeline wastes resources. Use trigger(availableNow=True) to process all new data and stop.

Interview Questions

Q: What is the difference between Append, Complete, and Update output modes? A: Append adds only new rows (for stateless operations). Complete rewrites the entire result each micro-batch (for aggregations needing the full table). Update writes only changed rows (for aggregations where only deltas matter). Append cannot be used with aggregations unless a watermark lets Spark finalize each group, because group results otherwise change over time.

Q: What is a watermark in Spark Structured Streaming? A: A watermark defines how long Spark waits for late-arriving events before finalizing a window. withWatermark("timestamp", "10 minutes") means events more than 10 minutes older than the latest event time Spark has seen can be dropped. Without watermarks, Spark keeps all state indefinitely, causing memory issues.

Q: What is the difference between tumbling and sliding windows? A: Tumbling windows are non-overlapping and fixed-size — each event belongs to exactly one window (e.g., 5-minute buckets). Sliding windows overlap — defined by window size AND slide interval (e.g., 10-minute windows sliding every 5 minutes), so an event can appear in multiple windows.

Q: What is foreachBatch and when do you use it? A: foreachBatch applies a custom function to each micro-batch DataFrame. Use it when writeStream’s built-in modes are insufficient — for example, MERGE/upsert into a Delta table, writing to multiple destinations, or calling external APIs per batch.

Wrapping Up

Spark Structured Streaming brings the power of batch PySpark to continuous data. Same API, same transformations, same Delta Lake — just readStream instead of read. Combined with checkpoints for crash recovery, watermarks for late data, and availableNow for pipeline-scheduled processing, it handles everything from real-time IoT to incremental batch loading.

Naveen Vuppula is a Senior Data Engineering Consultant and app developer based in Ontario, Canada. He writes about Python, SQL, AWS, Azure, and everything data engineering at DriveDataScience.com.
