SCD Type 1 and Type 2 Using PySpark and Delta Lake MERGE in Azure Databricks

In our earlier posts, we built SCD pipelines using Synapse Data Flows — visual, drag-and-drop, no code. They work. But in Databricks, you do the same thing with PySpark code and Delta Lake’s MERGE command.

The concepts are identical. The hash-based change detection is identical. The result is identical. But the code-first approach gives you more control, easier testing, reusable functions, and Git-friendly notebooks.

This post implements both SCD Type 1 (overwrite changes) and SCD Type 2 (preserve history) using Delta Lake MERGE — step by step, with the same employee data, so you can see exactly how the MERGE syntax differs between the two.

Think of it like cooking the same recipe in two different kitchens. Synapse Data Flows is the kitchen with pre-built appliances — select a transformation, configure it, connect it. PySpark in Databricks is the kitchen where you write the recipe from scratch — more effort upfront, but you control every ingredient and can reuse the recipe anywhere.

Table of Contents

  • SCD Type 1 vs Type 2: Quick Recap
  • The Setup: Same Data for Both
  • Part 1: SCD Type 1 (Overwrite) with Delta MERGE
  • Step 1: Create Initial Data with Hash
  • Step 2: Save as Delta Table
  • Step 3: Create Updated Source Data (v2)
  • Step 4: The MERGE — One Command Does Everything
  • Step 5: Verify with Delta History and Time Travel
  • Step 6: Idempotent Test
  • Step 7: Reusable SCD Type 1 Function
  • Part 2: SCD Type 2 (Preserve History) with Delta MERGE
  • Why SCD Type 2 Needs TWO Operations
  • Step 1: Create Initial Data with Hash and SCD2 Columns
  • Step 2: Save as Delta Table
  • Step 3: Create Updated Source Data (v2)
  • Step 4: Classify Records (New, Changed, Unchanged)
  • Step 5: MERGE to Expire Old Rows
  • Step 6: Append New Versions and New Records
  • Step 7: Verify History
  • Step 8: Third Batch (v3) — Version Numbering
  • Step 9: Reusable SCD Type 2 Function
  • Step 10: Idempotent Test
  • SCD Type 1 vs Type 2: Code Comparison
  • Synapse Data Flow vs PySpark MERGE Mapping
  • Common Mistakes
  • Interview Questions
  • Wrapping Up

SCD Type 1 vs Type 2: Quick Recap

| Aspect | Type 1 (Overwrite) | Type 2 (History) |
| --- | --- | --- |
| When data changes | Overwrite the old value | Expire old row, insert new version |
| History | Lost | Preserved |
| Row count | Fixed (1 row per entity) | Grows (N rows per entity) |
| MERGE approach | whenMatchedUpdate | MERGE to expire + separate INSERT for new version |
| Use case | Email corrections, name fixes | Customer moves, price changes, status changes |

The Setup: Same Data for Both

We use the same 5 employees for both implementations:

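# Import only what the examples use. A wildcard import of pyspark.sql.functions
# would shadow Python built-ins such as max, min, sum and round.
from pyspark.sql.functions import (
    col, concat_ws, current_date, current_timestamp, lit, sha2, to_date,
)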
from delta.tables import DeltaTable

# Initial employee data
data_v1 = [
    (1001, "Naveen", "Vuppula", "naveen@email.com", "Mississauga", "Canada"),
    (1002, "Shrey", "Patil", "shrey@email.com", "Toronto", "Canada"),
    (1003, "Vrushab", "Sharma", "vrushab@email.com", "Vancouver", "Canada"),
    (1004, "Vishnu", "Kumar", "vishnu@email.com", "Ottawa", "Canada"),
    (1005, "Ravi", "Desai", "ravi@email.com", "Calgary", "Canada"),
]

columns = ["customer_id", "first_name", "last_name", "email", "city", "country"]

Part 1: SCD Type 1 (Overwrite) with Delta MERGE

Step 1: Create Initial Data with Hash

# Create schema for SCD Type 1
spark.sql("CREATE SCHEMA IF NOT EXISTS scd_demo")

# Create DataFrame
df_v1 = spark.createDataFrame(data_v1, columns)

# Add hash column (fingerprint of all business columns)
hash_columns = ["first_name", "last_name", "email", "city", "country"]
df_v1 = df_v1.withColumn(
    "hash_value",
    sha2(concat_ws("|", *[col(c) for c in hash_columns]), 256)
).withColumn("last_updated", current_timestamp())

df_v1.show(truncate=False)
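
To make the fingerprint concrete, here is a minimal plain-Python sketch of what sha2(concat_ws("|", ...), 256) computes for a single row, assuming UTF-8 strings. The helper names row_hash and row_hash_null_safe are hypothetical and not part of PySpark. The sketch also illustrates a caveat: concat_ws skips NULL values, so two rows that differ only in where a NULL sits can end up with the same hash. Substituting a placeholder for NULL before hashing is one common way around this.

```python
import hashlib

def row_hash(values, sep="|"):
    """Mimic sha2(concat_ws(sep, ...), 256): NULLs (None) are skipped."""
    joined = sep.join(str(v) for v in values if v is not None)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()

def row_hash_null_safe(values, sep="|", null_token="<NULL>"):
    """Variant that keeps a placeholder for NULLs so positions stay fixed."""
    joined = sep.join(null_token if v is None else str(v) for v in values)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()

naveen_v1 = ("Naveen", "Vuppula", "naveen@email.com", "Mississauga", "Canada")
naveen_v2 = ("Naveen", "Vuppula", "naveen@email.com", "Toronto", "Canada")

# Same input gives the same hash; a changed city gives a different hash
print(row_hash(naveen_v1) == row_hash(naveen_v1))
print(row_hash(naveen_v1) == row_hash(naveen_v2))

# A NULL in different positions: skipping NULLs makes these two rows collide
a = ("Naveen", None, "Toronto")
b = ("Naveen", "Toronto", None)
print(row_hash(a) == row_hash(b))                      # collision
print(row_hash_null_safe(a) == row_hash_null_safe(b))  # no collision
```

If your business columns can contain NULLs, it is worth deciding on a NULL placeholder before the first load, because changing the hash recipe later makes every existing row look changed.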

Step 2: Save as Delta Table

# Write as Delta — this is Version 0
df_v1.write.format("delta").mode("overwrite").saveAsTable("scd_demo.dim_customer")
print("SCD Type 1 table created: scd_demo.dim_customer (Version 0)")

-- SQL cell: check the table contents
SELECT * FROM scd_demo.dim_customer ORDER BY customer_id;

5 rows. All original data. Version 0.

Step 3: Create Updated Source Data (v2)

# v2 data — 2 changed, 3 unchanged, 2 new
data_v2 = [
    (1001, "Naveen", "Vuppula", "naveen@email.com", "Toronto", "Canada"),       # CHANGED — city
    (1002, "Shrey", "Patil", "shrey@email.com", "Toronto", "Canada"),           # UNCHANGED
    (1003, "Vrushab", "Sharma", "vrushab.new@email.com", "Vancouver", "Canada"),# CHANGED — email
    (1004, "Vishnu", "Kumar", "vishnu@email.com", "Ottawa", "Canada"),          # UNCHANGED
    (1005, "Ravi", "Desai", "ravi@email.com", "Calgary", "Canada"),             # UNCHANGED
    (1006, "Priya", "Mehta", "priya@email.com", "Mumbai", "India"),             # NEW
    (1007, "Kavya", "Reddy", "kavya@email.com", "Hyderabad", "India"),          # NEW
]

df_v2 = spark.createDataFrame(data_v2, columns)
df_v2 = df_v2.withColumn(
    "hash_value",
    sha2(concat_ws("|", *[col(c) for c in hash_columns]), 256)
).withColumn("last_updated", current_timestamp())

Step 4: The MERGE — One Command Does Everything

This is the beauty of Delta MERGE for SCD Type 1:

# Load the target Delta table
target_table = DeltaTable.forName(spark, "scd_demo.dim_customer")

# MERGE: update changed rows + insert new rows (skip unchanged)
target_table.alias("target").merge(
    df_v2.alias("source"),
    "target.customer_id = source.customer_id"
).whenMatchedUpdate(
    condition="target.hash_value != source.hash_value",  # ONLY update if hash differs
    set={
        "first_name": "source.first_name",
        "last_name": "source.last_name",
        "email": "source.email",
        "city": "source.city",
        "country": "source.country",
        "hash_value": "source.hash_value",
        "last_updated": "source.last_updated"
    }
).whenNotMatchedInsertAll(
).execute()

print("SCD Type 1 MERGE completed!")

What Each Part Does

| MERGE Part | What It Does | Equivalent in Synapse Data Flow |
| --- | --- | --- |
| merge(source, condition) | Join source with target on business key | Lookup transformation |
| whenMatchedUpdate(condition) | Update ONLY if hash differs (changed rows) | Conditional Split → ChangedRecords → AlterRow(Update) |
| whenNotMatchedInsertAll() | Insert rows not in target (new records) | Conditional Split → NewRecords → AlterRow(Insert) |
| (Unchanged rows) | Automatically skipped: hash matches, condition is false | Conditional Split → UnchangedRecords → dropped |

One command replaces 6 Synapse Data Flow transformations. Same logic, much less code.
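
The three outcomes in the table above can be traced without a cluster. The following is a plain-Python sketch of the Type 1 decision logic only, not of how Delta executes a MERGE. The function type1_merge, the helper make, and the simplified "hash" (email and city joined by a pipe) are hypothetical stand-ins for illustration.

```python
def type1_merge(target, source):
    """target/source: dict of key -> row dict with a 'hash' field. Mutates target."""
    stats = {"updated": 0, "inserted": 0, "skipped": 0}
    for key, src_row in source.items():
        tgt_row = target.get(key)
        if tgt_row is None:
            target[key] = dict(src_row)      # like whenNotMatchedInsertAll
            stats["inserted"] += 1
        elif tgt_row["hash"] != src_row["hash"]:
            target[key] = dict(src_row)      # like whenMatchedUpdate with hash condition
            stats["updated"] += 1
        else:
            stats["skipped"] += 1            # matched, hash equal: no action
    return stats

def make(rows):
    # Simplified stand-in for the SHA-256 hash: the tracked values joined by "|"
    return {cid: {"email": e, "city": c, "hash": f"{e}|{c}"} for cid, e, c in rows}

target = make([
    (1001, "naveen@email.com", "Mississauga"),
    (1002, "shrey@email.com", "Toronto"),
    (1003, "vrushab@email.com", "Vancouver"),
    (1004, "vishnu@email.com", "Ottawa"),
    (1005, "ravi@email.com", "Calgary"),
])
v2 = make([
    (1001, "naveen@email.com", "Toronto"),
    (1002, "shrey@email.com", "Toronto"),
    (1003, "vrushab.new@email.com", "Vancouver"),
    (1004, "vishnu@email.com", "Ottawa"),
    (1005, "ravi@email.com", "Calgary"),
    (1006, "priya@email.com", "Mumbai"),
    (1007, "kavya@email.com", "Hyderabad"),
])

first = type1_merge(target, v2)
second = type1_merge(target, v2)   # same batch again
print(first)    # {'updated': 2, 'inserted': 2, 'skipped': 3}
print(second)   # {'updated': 0, 'inserted': 0, 'skipped': 7}
```

The second call touches nothing, which is the same idempotent behavior the hash condition gives the real MERGE.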

Step 5: Verify with Delta History and Time Travel

-- Current state (Version 1)
SELECT * FROM scd_demo.dim_customer ORDER BY customer_id;
-- 7 rows: Naveen=Toronto, Vrushab=new email, Priya+Kavya added

-- Compare with Version 0 (before MERGE)
SELECT * FROM scd_demo.dim_customer VERSION AS OF 0 ORDER BY customer_id;
-- 5 rows: Naveen=Mississauga, Vrushab=old email

-- Delta history
DESCRIBE HISTORY scd_demo.dim_customer;
-- Version 0: CREATE TABLE, Version 1: MERGE

Key SCD Type 1 behavior: Naveen’s city changed from Mississauga to Toronto. The old value (Mississauga) now exists ONLY in Delta time travel (Version 0), and only until VACUUM removes the old data files. The current table shows Toronto. History is overwritten: that is Type 1.

Step 6: Idempotent Test

Run the same MERGE again with the same v2 data:

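target_table.alias("target").merge(
    df_v2.alias("source"),
    "target.customer_id = source.customer_id"
).whenMatchedUpdate(
    condition="target.hash_value != source.hash_value",
    set={  # same column mapping as in Step 4
        "first_name": "source.first_name",
        "last_name": "source.last_name",
        "email": "source.email",
        "city": "source.city",
        "country": "source.country",
        "hash_value": "source.hash_value",
        "last_updated": "source.last_updated"
    }
).whenNotMatchedInsertAll().execute()

print("Idempotent test completed!")

-- SQL cell: inspect the history after the second run
DESCRIBE HISTORY scd_demo.dim_customer;
-- New MERGE version created, but 0 rows modified: all hashes match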

Idempotent: Running the same data twice produces no changes. The hash comparison ensures unchanged rows are skipped.

Step 7: Reusable SCD Type 1 Function

def scd_type1_merge(source_df, target_table_name, key_column, value_columns):
    '''
    Reusable SCD Type 1 MERGE function.
    - source_df: DataFrame with new/updated data
    - target_table_name: fully qualified Delta table name
    - key_column: business key (e.g., 'customer_id')
    - value_columns: list of columns to track changes
    '''
    # Add hash and timestamp
    df_with_hash = source_df.withColumn(
        "hash_value",
        sha2(concat_ws("|", *[col(c) for c in value_columns]), 256)
    ).withColumn("last_updated", current_timestamp())

    # Build the SET clause dynamically
    update_set = {c: f"source.{c}" for c in value_columns}
    update_set["hash_value"] = "source.hash_value"
    update_set["last_updated"] = "source.last_updated"

    # Execute MERGE
    target = DeltaTable.forName(spark, target_table_name)
    target.alias("target").merge(
        df_with_hash.alias("source"),
        f"target.{key_column} = source.{key_column}"
    ).whenMatchedUpdate(
        condition="target.hash_value != source.hash_value",
        set=update_set
    ).whenNotMatchedInsertAll().execute()

    print(f"SCD Type 1 MERGE completed on {target_table_name}")

# Usage (df_v3 stands for any new source batch with the same columns as data_v1):
scd_type1_merge(df_v3, "scd_demo.dim_customer", "customer_id",
    ["first_name", "last_name", "email", "city", "country"])

Part 2: SCD Type 2 (Preserve History) with Delta MERGE

Why SCD Type 2 Needs TWO Operations

In SCD Type 1, one MERGE handles everything — update changed rows, insert new rows.

In SCD Type 2, a changed record requires TWO actions:

  1. Expire the old row (UPDATE: set is_active=false, end_date=yesterday)
  2. Insert the new version (INSERT: new row with is_active=true)

Delta MERGE cannot INSERT and UPDATE the same matched row in one operation. So we split it:

  • Operation 1: MERGE to expire matched+changed rows
  • Operation 2: APPEND to insert new versions and new records

Real-life analogy: In Type 1, a misspelled name on your passport gets corrected with white-out (overwrite). In Type 2, your old passport gets a “CANCELLED” stamp (expire) and you get a brand new passport (insert new version). Two actions for one change.

Step 1: Create Initial Data with Hash and SCD2 Columns

spark.sql("CREATE SCHEMA IF NOT EXISTS scd2_demo")

df_scd2_v1 = spark.createDataFrame(data_v1, columns)

# Add SCD2 columns
hash_cols = ["first_name", "last_name", "email", "city", "country"]
df_scd2_v1 = (
    df_scd2_v1
    .withColumn("hash_value", sha2(concat_ws("|", *[col(c) for c in hash_cols]), 256))
    .withColumn("start_date", current_date())
    .withColumn("end_date", to_date(lit("9999-12-31")))
    .withColumn("is_active", lit(True))
    .withColumn("version", lit(1))
)

df_scd2_v1.show(truncate=False)

Step 2: Save as Delta Table

df_scd2_v1.write.format("delta").mode("overwrite").saveAsTable("scd2_demo.dim_customer")
print("SCD Type 2 table created (Version 0 — 5 customers, all version 1)")

Step 3: Create Updated Source Data (v2)

data_v2_scd2 = [
    (1001, "Naveen", "Vuppula", "naveen@email.com", "Toronto", "Canada"),        # CHANGED — moved
    (1002, "Shrey", "Patil", "shrey@email.com", "Toronto", "Canada"),            # UNCHANGED
    (1003, "Vrushab", "Sharma", "vrushab.new@email.com", "Vancouver", "Canada"), # CHANGED — email
    (1004, "Vishnu", "Kumar", "vishnu@email.com", "Ottawa", "Canada"),           # UNCHANGED
    (1005, "Ravi", "Desai", "ravi@email.com", "Calgary", "Canada"),              # UNCHANGED
    (1006, "Priya", "Mehta", "priya@email.com", "Mumbai", "India"),              # NEW
]

df_scd2_v2 = spark.createDataFrame(data_v2_scd2, columns)
df_scd2_v2 = df_scd2_v2.withColumn(
    "hash_value", sha2(concat_ws("|", *[col(c) for c in hash_cols]), 256)
)

Step 4: Classify Records (New, Changed, Unchanged)

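# Read current active records from target.
# localCheckpoint() materializes this snapshot now. DataFrames are lazy, so without it
# df_changed and df_new could be recomputed against the table AFTER the Step 5 MERGE,
# when the changed customers no longer have an active row.
# (If localCheckpoint() is not available in your environment, saving the snapshot
# to a temporary table is an alternative.)
df_target = spark.sql(
    "SELECT customer_id, hash_value as target_hash FROM scd2_demo.dim_customer WHERE is_active = true"
).localCheckpoint()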

# Join source with target to classify
df_classified = df_scd2_v2.alias("s").join(
    df_target.alias("t"),
    col("s.customer_id") == col("t.customer_id"),
    "left"
)

# Classify
df_new = df_classified.filter(col("target_hash").isNull())
df_changed = df_classified.filter(
    col("target_hash").isNotNull() & (col("s.hash_value") != col("t.target_hash"))
)
df_unchanged = df_classified.filter(
    col("target_hash").isNotNull() & (col("s.hash_value") == col("t.target_hash"))
)

print(f"New: {df_new.count()}, Changed: {df_changed.count()}, Unchanged: {df_unchanged.count()}")
# New: 1 (Priya), Changed: 2 (Naveen, Vrushab), Unchanged: 3

Step 5: MERGE to Expire Old Rows

# Expire active rows that have changed
target_table = DeltaTable.forName(spark, "scd2_demo.dim_customer")

target_table.alias("target").merge(
    df_changed.select("s.customer_id", "s.hash_value").alias("source"),
    "target.customer_id = source.customer_id AND target.is_active = true"
).whenMatchedUpdate(
    set={
        "is_active": "false",
        "end_date": "current_date() - INTERVAL 1 DAY"
    }
).execute()

print("Old rows expired!")

This sets is_active=false and end_date=yesterday on Naveen’s Mississauga row and Vrushab’s old email row.

Step 6: Append New Versions and New Records

# Get the max version for changed records
df_max_versions = spark.sql(
    "SELECT customer_id, MAX(version) as max_ver FROM scd2_demo.dim_customer GROUP BY customer_id"
)

# Prepare new versions for changed records
df_new_versions = (
    df_changed.select(
        col("s.customer_id"), col("s.first_name"), col("s.last_name"),
        col("s.email"), col("s.city"), col("s.country"), col("s.hash_value")
    )
    .join(df_max_versions, "customer_id", "left")
    .withColumn("start_date", current_date())
    .withColumn("end_date", to_date(lit("9999-12-31")))
    .withColumn("is_active", lit(True))
    .withColumn("version", col("max_ver") + 1)
    .drop("max_ver")
)

# Prepare new records (first-time inserts)
df_new_inserts = (
    df_new.select(
        col("s.customer_id"), col("s.first_name"), col("s.last_name"),
        col("s.email"), col("s.city"), col("s.country"), col("s.hash_value")
    )
    .withColumn("start_date", current_date())
    .withColumn("end_date", to_date(lit("9999-12-31")))
    .withColumn("is_active", lit(True))
    .withColumn("version", lit(1))
)

# Combine and append
df_to_insert = df_new_versions.unionByName(df_new_inserts)
df_to_insert.write.format("delta").mode("append").saveAsTable("scd2_demo.dim_customer")

print(f"Inserted {df_to_insert.count()} rows (new versions + new records)")
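
Steps 4 to 6 can be condensed into a small plain-Python model, which makes the ordering easier to reason about: classify against the state before anything is expired, then expire, then append. This is a sketch of the logic only. The function scd2_apply, the list-of-dicts table, and the short string "hashes" are hypothetical, and new keys get version max + 1 (which is 1 when the key has no history).

```python
from datetime import date, timedelta

OPEN_END = date(9999, 12, 31)

def scd2_apply(table, source, today):
    """table: list of row dicts; source: dict of key -> hash. Mutates table."""
    active = {r["key"]: r for r in table if r["is_active"]}
    max_ver = {}
    for r in table:
        max_ver[r["key"]] = max(max_ver.get(r["key"], 0), r["version"])

    # Classify against the state BEFORE any row is expired
    changed = [k for k, h in source.items() if k in active and active[k]["hash"] != h]
    new = [k for k in source if k not in active]

    # Operation 1: expire the current version of every changed key
    for k in changed:
        active[k]["is_active"] = False
        active[k]["end_date"] = today - timedelta(days=1)

    # Operation 2: append new versions and first-time records
    for k in changed + new:
        table.append({
            "key": k, "hash": source[k], "version": max_ver.get(k, 0) + 1,
            "start_date": today, "end_date": OPEN_END, "is_active": True,
        })
    return len(changed), len(new)

v1 = {1001: "Mississauga", 1002: "Toronto", 1003: "old-email"}
table = [
    {"key": k, "hash": h, "version": 1, "start_date": date(2026, 5, 10),
     "end_date": OPEN_END, "is_active": True}
    for k, h in v1.items()
]
v2 = {1001: "Toronto", 1002: "Toronto", 1003: "new-email", 1006: "Mumbai"}

first = scd2_apply(table, v2, date(2026, 5, 17))
second = scd2_apply(table, v2, date(2026, 5, 17))   # same batch again
print(first, len(table))   # (2, 1) 6
print(second, len(table))  # (0, 0) 6
```

Because the changed and new lists are computed before the expire step, the append step still knows which keys need a new version. In the PySpark version the same property depends on the classified DataFrames not being re-evaluated after the MERGE.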

Step 7: Verify History

-- Full table: 8 rows (5 original rows, 2 of them now expired, + 2 new versions + 1 new record for Priya)
SELECT customer_id, first_name, city, is_active, version, start_date, end_date
FROM scd2_demo.dim_customer
ORDER BY customer_id, version;

| customer_id | first_name | city | is_active | version | start_date | end_date |
| --- | --- | --- | --- | --- | --- | --- |
| 1001 | Naveen | Mississauga | false | 1 | 2026-05-17 | 2026-05-16 |
| 1001 | Naveen | Toronto | true | 2 | 2026-05-17 | 9999-12-31 |
| 1002 | Shrey | Toronto | true | 1 | 2026-05-17 | 9999-12-31 |
| 1003 | Vrushab | Vancouver | false | 1 | 2026-05-17 | 2026-05-16 |
| 1003 | Vrushab | Vancouver | true | 2 | 2026-05-17 | 9999-12-31 |
| 1004 | Vishnu | Ottawa | true | 1 | 2026-05-17 | 9999-12-31 |
| 1005 | Ravi | Calgary | true | 1 | 2026-05-17 | 9999-12-31 |
| 1006 | Priya | Mumbai | true | 1 | 2026-05-17 | 9999-12-31 |

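Naveen has 2 rows: Mississauga (expired, version 1) and Toronto (active, version 2). Full history preserved. Note that because this demo loads both batches on the same day, the expired rows have an end_date (2026-05-16) earlier than their start_date (2026-05-17); when batches run on different days, the date ranges are well-formed.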

-- Current state only
SELECT * FROM scd2_demo.dim_customer WHERE is_active = true ORDER BY customer_id;

-- All versions of Naveen
SELECT * FROM scd2_demo.dim_customer WHERE customer_id = 1001 ORDER BY version;

-- Point-in-time: who was active yesterday?
SELECT * FROM scd2_demo.dim_customer
WHERE start_date <= '2026-05-16' AND end_date >= '2026-05-16'
ORDER BY customer_id;
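
The point-in-time filter is a range check that is inclusive on both ends. Here is a minimal plain-Python sketch of the same lookup, assuming hypothetical dates in which the first version was loaded well before the change (unlike the same-day demo above). The function as_of and the history list are illustrative only.

```python
from datetime import date

OPEN_END = date(9999, 12, 31)

# Hypothetical history for one customer, with batches loaded on different days
history = [
    {"customer_id": 1001, "city": "Mississauga", "version": 1,
     "start_date": date(2026, 1, 10), "end_date": date(2026, 5, 16)},
    {"customer_id": 1001, "city": "Toronto", "version": 2,
     "start_date": date(2026, 5, 17), "end_date": OPEN_END},
]

def as_of(rows, customer_id, day):
    """Return the version valid on `day` (start_date <= day <= end_date), or None."""
    for row in rows:
        if row["customer_id"] == customer_id and row["start_date"] <= day <= row["end_date"]:
            return row
    return None

print(as_of(history, 1001, date(2026, 3, 1))["city"])    # Mississauga
print(as_of(history, 1001, date(2026, 5, 17))["city"])   # Toronto
print(as_of(history, 1001, date(2025, 12, 31)))          # None
```

Setting end_date to the day before the new start_date is what keeps the two ranges from overlapping, so any date maps to at most one version.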

Step 8: Third Batch (v3) — Version Numbering

data_v3_scd2 = [
    (1001, "Naveen", "Vuppula", "naveen.new@email.com", "Toronto", "Canada"),  # CHANGED again — email
    (1003, "Vrushab", "Sharma", "vrushab.new@email.com", "Delhi", "India"),    # CHANGED — moved
    (1007, "Kavya", "Reddy", "kavya@email.com", "Hyderabad", "India"),         # NEW
]

df_v3 = spark.createDataFrame(data_v3_scd2, columns)

# Run the same classify → expire → insert pattern on df_v3
# Naveen: version 2 expired → version 3 inserted
# Vrushab: version 2 expired → version 3 inserted
# Kavya: version 1 inserted

After v3, Naveen has 3 rows (versions 1, 2, 3). Vrushab has 3 rows. Complete change history.

Step 9: Reusable SCD Type 2 Function

def scd_type2_merge(source_df, target_table_name, key_column, value_columns):
    '''
    Reusable SCD Type 2 function.
    Classifies records, expires old versions, inserts new versions.
    '''
    # Step A: Add hash to source
    df_source = source_df.withColumn(
        "hash_value", sha2(concat_ws("|", *[col(c) for c in value_columns]), 256)
    )

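    # Step B: Read current active records.
    # localCheckpoint() materializes the snapshot so the classification below
    # is not recomputed against the table after the expire MERGE in Step D.
    df_target = spark.sql(
        f"SELECT {key_column}, hash_value as target_hash "
        f"FROM {target_table_name} WHERE is_active = true"
    ).localCheckpoint()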

    # Step C: Classify
    df_joined = df_source.alias("s").join(
        df_target.alias("t"),
        col(f"s.{key_column}") == col(f"t.{key_column}"),
        "left"
    )
    df_new = df_joined.filter(col("target_hash").isNull())
    df_changed = df_joined.filter(
        col("target_hash").isNotNull() & (col("s.hash_value") != col("t.target_hash"))
    )

    changed_count = df_changed.count()
    new_count = df_new.count()

    if changed_count == 0 and new_count == 0:
        print(f"No changes detected. Table unchanged.")
        return

    # Step D: Expire old rows
    if changed_count > 0:
        target = DeltaTable.forName(spark, target_table_name)
        target.alias("target").merge(
            df_changed.select(f"s.{key_column}", "s.hash_value").alias("source"),
            f"target.{key_column} = source.{key_column} AND target.is_active = true"
        ).whenMatchedUpdate(
            set={"is_active": "false", "end_date": "current_date() - INTERVAL 1 DAY"}
        ).execute()

    # Step E: Get max versions
    df_max_ver = spark.sql(
        f"SELECT {key_column}, MAX(version) as max_ver FROM {target_table_name} GROUP BY {key_column}"
    )

    # Step F: Prepare new versions
    source_cols = [f"s.{c}" for c in source_df.columns] + ["s.hash_value"]
    df_new_versions = (
        df_changed.select(*[col(c) for c in source_cols])
        .join(df_max_ver, key_column, "left")
        .withColumn("start_date", current_date())
        .withColumn("end_date", to_date(lit("9999-12-31")))
        .withColumn("is_active", lit(True))
        .withColumn("version", col("max_ver") + 1)
        .drop("max_ver")
    )

    df_new_inserts = (
        df_new.select(*[col(c) for c in source_cols])
        .withColumn("start_date", current_date())
        .withColumn("end_date", to_date(lit("9999-12-31")))
        .withColumn("is_active", lit(True))
        .withColumn("version", lit(1))
    )

    # Step G: Insert
    df_to_insert = df_new_versions.unionByName(df_new_inserts)
    df_to_insert.write.format("delta").mode("append").saveAsTable(target_table_name)

    print(f"SCD Type 2 completed: {changed_count} expired+new versions, {new_count} new records")

# Usage:
scd_type2_merge(df_v3, "scd2_demo.dim_customer", "customer_id",
    ["first_name", "last_name", "email", "city", "country"])

Step 10: Idempotent Test

# Run same data again — should produce 0 changes
scd_type2_merge(df_v3, "scd2_demo.dim_customer", "customer_id",
    ["first_name", "last_name", "email", "city", "country"])
# Output: "No changes detected. Table unchanged."

Hash comparison ensures idempotency — running the same data twice does nothing.

SCD Type 1 vs Type 2: Code Comparison

| Aspect | Type 1 MERGE | Type 2 MERGE |
| --- | --- | --- |
| Operations | 1 MERGE | 1 MERGE (expire) + 1 APPEND (insert) |
| MERGE action | whenMatchedUpdate (overwrite) | whenMatchedUpdate (set is_active=false) |
| Insert | whenNotMatchedInsertAll | Separate df.write.mode("append") |
| Table columns | hash_value, last_updated | hash_value, start_date, end_date, is_active, version |
| Table growth | Fixed (1 row per customer) | Grows (N rows per customer) |
| After 3 batches | Still 7 rows | Could be 10+ rows |

Synapse Data Flow vs PySpark MERGE Mapping

| Synapse Data Flow | PySpark Delta MERGE |
| --- | --- |
| Source → Derived Column (hash) | df.withColumn("hash_value", sha2(...)) |
| Lookup (left outer join) | df.join(target, key, "left") |
| Conditional Split (New/Changed/Unchanged) | df.filter(isNull(...)) / df.filter(hash != hash) |
| Alter Row (Insert) + Sink | whenNotMatchedInsertAll() or df.write.mode("append") |
| Alter Row (Update) + Sink | whenMatchedUpdate(condition, set) |
| Unchanged → dropped | Automatic: hash matches, condition is false |
| 3 Sinks (Insert, Expire, Type1Update) | 1 MERGE + 1 APPEND |

Same logic. Different syntax. Same result.

Common Mistakes

  1. Forgetting AND target.is_active = true in SCD Type 2 MERGE condition — without it, the MERGE matches ALL rows for a customer (including already-expired ones), potentially re-expiring them.

  2. Using whenNotMatchedInsertAll() for SCD Type 2 new versions — this only inserts rows NOT in the target. Changed records ARE in the target (they matched). New versions must be inserted via a separate APPEND.

  3. Leaving the hash check out of the whenMatchedUpdate condition (Type 1): without target.hash_value != source.hash_value, every matched row gets updated, even unchanged ones, and last_updated changes on every run. The hash condition is what makes the MERGE idempotent.

  4. Forgetting to add SCD2 columns (start_date, end_date, is_active, version) to new version inserts — the APPEND fails or inserts incomplete rows.

  5. Running Type 2 expire and insert in the wrong order — always EXPIRE first (MERGE), then INSERT new versions. If you insert first, the new version gets immediately expired by the MERGE.

  6. Not handling version numbering — without MAX(version) + 1, all versions are numbered 1, making it impossible to order history correctly.
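
Several of these mistakes (1, 5, and 6) leave the same kind of trace: a key with zero or several active rows, or version numbers that repeat or skip. Below is a plain-Python sketch of a sanity check for that, assuming every key is expected to have exactly one active row (this post does not handle deletes). The function scd2_violations and the sample rows are hypothetical.

```python
from collections import Counter

def scd2_violations(rows):
    """Return {key: [problems]} for keys that break basic SCD Type 2 invariants."""
    active_counts = Counter()
    versions = {}
    for r in rows:
        active_counts[r["customer_id"]] += 1 if r["is_active"] else 0
        versions.setdefault(r["customer_id"], []).append(r["version"])

    problems = {}
    for key, vers in versions.items():
        issues = []
        if active_counts[key] != 1:
            issues.append(f"{active_counts[key]} active rows")
        if sorted(vers) != list(range(1, len(vers) + 1)):
            issues.append(f"versions {sorted(vers)} are not 1..{len(vers)}")
        if issues:
            problems[key] = issues
    return problems

rows = [
    {"customer_id": 1001, "version": 1, "is_active": False},
    {"customer_id": 1001, "version": 2, "is_active": True},
    # Broken key: old row never expired and new row numbered 1 again
    {"customer_id": 1003, "version": 1, "is_active": True},
    {"customer_id": 1003, "version": 1, "is_active": True},
]

print(scd2_violations(rows))
# {1003: ['2 active rows', 'versions [1, 1] are not 1..2']}
```

The same two checks translate to GROUP BY queries on the Delta table and can run as a post-load test after each batch.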

Interview Questions

Q: How do you implement SCD Type 1 using Delta Lake MERGE?
A: One MERGE statement with two clauses: whenMatchedUpdate(condition="hash differs") updates only changed rows (overwriting old values), and whenNotMatchedInsertAll() inserts new records. Unchanged rows are automatically skipped because the hash condition is false. The hash comparison makes it idempotent.

Q: Why does SCD Type 2 need two operations instead of one MERGE?
A: Because a changed record requires both an UPDATE (expire the old row) and an INSERT (new version). Delta MERGE cannot insert AND update for the same matched row. So we use MERGE to expire old rows, then a separate DataFrame append to insert new versions and new records.

Q: How do you ensure SCD merges are idempotent?
A: By using hash-based change detection. The source hash is compared against the target hash. If they match, the row is skipped. Running the same data twice produces zero changes because all hashes match on the second run.

Q: How does the PySpark MERGE map to Synapse Data Flow transformations?
A: The Derived Column (hash) maps to withColumn(sha2(...)). The Lookup maps to a DataFrame join. The Conditional Split maps to filter conditions. Alter Row + Sink maps to whenMatchedUpdate and whenNotMatchedInsertAll or append. Same logic, code vs visual.

Q: How do you handle version numbering in SCD Type 2?
A: Query MAX(version) from the target table grouped by the business key. The new version number is max_version + 1. For brand new records (not in target), version starts at 1.

Wrapping Up

SCD Type 1 and Type 2 in PySpark use the same concepts we built in Synapse Data Flows — hash-based change detection, conditional routing, and targeted insert/update operations. The difference is syntax: Data Flows use visual transformations, PySpark uses code.

The code-first approach gives you reusable functions, easier testing, Git-friendly notebooks, and the full power of Delta Lake (time travel, OPTIMIZE, VACUUM). Once you have the scd_type1_merge() and scd_type2_merge() functions, implementing SCD on any table is a one-liner.

Related posts:

  • SCD Types Explained (0,1,2,3,6)
  • SCD Type 1 Hash-Based (Synapse Data Flows)
  • SCD Type 2 Pipeline (Synapse Data Flows)
  • Delta Lake Deep Dive
  • PySpark Transformations Cookbook


Naveen Vuppula is a Senior Data Engineering Consultant and app developer based in Ontario, Canada. He writes about Python, SQL, AWS, Azure, and everything data engineering at DriveDataScience.com.
