Databricks: DLT SCD2 & SCD1 table | Apply Changes | CDC | Back-loading SCD2 | Delete/Truncate SCD


Introduction

Goal: Build a CDC-ready dimension pipeline in Delta Live Tables (DLT) that supports:

  • SCD Type 1 (overwrite in place)
  • SCD Type 2 (history with start/end validity)
  • Deleting/truncating target tables from source signals
  • Back-loading “out-of-order” historical changes into SCD2

Core ideas you’ll use

  • CDC (Change Data Capture): Detecting and applying inserts/updates/deletes from a source stream to one or more targets.
  • SCD Type 1: Latest values overwrite previous values (no history).
  • SCD Type 2: Track full history by closing old versions and inserting new “current” versions with validity windows.
  • DLT apply_changes: A declarative API in DLT that handles SCD1/SCD2 mechanics for you, including ordering, column exclusions, and optional delete/truncate semantics.

What we’ll model

  • A Customer dimension fed by a “raw” Delta table/stream.
  • The source includes two CDC helper columns:
    • SRC_ACTION — one of 'I' (insert/upsert), 'D' (delete), 'T' (truncate).
    • SRC_INSERT_DATE — event timestamp used for change ordering.
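To make this concrete, here is a minimal seed sketch of what such raw rows could look like. The table name dev.etl.customer_raw matches the pipeline below; the attribute columns C_NAME and C_ADDRESS are illustrative assumptions. Run it in a regular notebook, not inside the DLT pipeline.

# Illustrative seed for the raw CDC table (run outside the DLT pipeline).
# Only C_CUSTKEY, SRC_ACTION and SRC_INSERT_DATE are required by the pipeline;
# C_NAME and C_ADDRESS are placeholder attributes.
from pyspark.sql import functions as F

demo_rows = [
    (1, "Alice", "12 Main St", "I", "2024-01-01 08:00:00"),
    (2, "Bob",   "9 Oak Ave",  "I", "2024-01-01 08:05:00"),
    (1, "Alice", "77 Lake Rd", "I", "2024-02-15 10:30:00"),  # later change for key 1
]

(spark.createDataFrame(
        demo_rows,
        "C_CUSTKEY INT, C_NAME STRING, C_ADDRESS STRING, SRC_ACTION STRING, SRC_INSERT_DATE STRING")
    .withColumn("SRC_INSERT_DATE", F.to_timestamp("SRC_INSERT_DATE"))
    .write.format("delta").mode("append").saveAsTable("dev.etl.customer_raw"))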

How to build SCD1 or SCD2 tables in DLT Pipelines?

You’ll create:

  1. A streaming view of your source (DLT needs streaming input for apply_changes).
  2. An SCD1 table with dlt.apply_changes(..., stored_as_scd_type=1).
  3. An SCD2 table with dlt.apply_changes(..., stored_as_scd_type=2).

✅ DLT Edition: set your pipeline Product edition to Pro (or Advanced) to use apply_changes.


Slowly Changing Dimension Type 1 table (SCD1)

  • Definition: Keep only the current values. Every change on a key overwrites existing values.
  • When to use: Attributes that aren’t analyzed historically (e.g., phone number, email, latest address, operational flags).
  • Pros/Cons: Simple & space-efficient, but no history.

Example DLT Notebook (Python)

Put this in a DLT notebook file (e.g., 01_dlt_scd_cdc.py). Configure the DLT pipeline to run this notebook.

import dlt
from pyspark.sql import functions as F
from pyspark.sql.functions import expr

# --------------------------
# CONFIG (optional helpers)
# --------------------------
CATALOG = "dev"         # your catalog
SCHEMA  = "etl"         # your schema
SRC_TBL = f"{CATALOG}.{SCHEMA}.customer_raw"  # the raw Delta table (upstream ingestion)

# The raw table must include:
# - business key: C_CUSTKEY
# - SRC_ACTION: 'I' (upsert), 'D' (delete), 'T' (truncate)
# - SRC_INSERT_DATE: event timestamp used for sequencing


# --------------------------
# STREAMING SOURCE VIEW
# --------------------------
@dlt.view(
    comment="Streaming view over the raw customer table."
)
def customer_raw_view():
    # DLT apply_changes requires a streaming source (readStream.table).
    # If your raw table is a Delta table that is appended over time, Spark can stream from it.
    return (spark.readStream.table(SRC_TBL))


# --------------------------
# SCD1 (UPSERT) TARGET
# --------------------------
# Create an empty streaming target table first (DLT will manage storage).
dlt.create_streaming_table(
    name="customer_scd1_bronze",
    comment="SCD Type 1 (no history) Customer dimension built by apply_changes."
)

# Declarative CDC: SCD Type 1
dlt.apply_changes(
    target = "customer_scd1_bronze",
    source = "customer_raw_view",         # the streaming view
    keys   = ["C_CUSTKEY"],               # business key(s)
    stored_as = "SCD Type 1",             # explicit; default is also SCD Type 1
    sequence_by = expr("SRC_INSERT_DATE"),# ordering column (timestamp)
    apply_as_deletes   = expr("SRC_ACTION = 'D'"),  # delete rows that match key(s)
    apply_as_truncates = expr("SRC_ACTION = 'T'")   # truncate entire table
)

What this does

  • New keys are inserted.
  • Existing keys are overwritten (latest wins).
  • If any input row arrives with SRC_ACTION = 'D' → delete that key from target.
  • If any input row arrives with SRC_ACTION = 'T' → truncate target.

How to delete and truncate data from target table in DLT?

  • Delete a single key: emit a record with that C_CUSTKEY, set SRC_ACTION = 'D' (other attributes can be null), and SRC_INSERT_DATE = current_timestamp().
  • Truncate all: emit a record with SRC_ACTION = 'T' (other fields don’t matter). Use carefully.

This works for SCD1; for SCD2 you often keep history and skip deletes/truncates unless you have a specific downstream contract.
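A minimal sketch of emitting those control rows, assuming the same illustrative raw schema as the seed example above (run outside the pipeline):

from pyspark.sql import functions as F

# Delete customer 2: only the key, the action, and a sequence timestamp matter.
delete_row = spark.createDataFrame(
    [(2, None, None, "D")],
    "C_CUSTKEY INT, C_NAME STRING, C_ADDRESS STRING, SRC_ACTION STRING"
).withColumn("SRC_INSERT_DATE", F.current_timestamp())
delete_row.write.format("delta").mode("append").saveAsTable("dev.etl.customer_raw")

# Truncate the whole SCD1 target: a single 'T' row is enough. Use with care.
truncate_row = spark.createDataFrame(
    [(-1, None, None, "T")],
    "C_CUSTKEY INT, C_NAME STRING, C_ADDRESS STRING, SRC_ACTION STRING"
).withColumn("SRC_INSERT_DATE", F.current_timestamp())
truncate_row.write.format("delta").mode("append").saveAsTable("dev.etl.customer_raw")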


Slowly Changing Dimension Type 2 table (SCD2)

  • Definition: Preserve history. Each change creates a new version; the previous version gets an end timestamp.
  • When to use: Analytical attributes where historical truth matters (e.g., customer segment, region, status, marital status, VIP tier).
  • Pros/Cons: Full audit/history; larger storage & more complex joins.

Example DLT (add SCD2 target in the same notebook)

# --------------------------
# SCD2 (HISTORY) TARGET
# --------------------------
dlt.create_streaming_table(
    name="customer_scd2_bronze",
    comment="SCD Type 2 (historical) Customer dimension built by apply_changes."
)

# For SCD2, you typically DO NOT delete or truncate.
# You can also exclude operational columns (e.g., SRC_ACTION, SRC_INSERT_DATE)
# so they don't participate in change comparison.
dlt.apply_changes(
    target = "customer_scd2_bronze",
    source = "customer_raw_view",
    keys   = ["C_CUSTKEY"],
    stored_as_scd_type = 2,
    sequence_by  = expr("SRC_INSERT_DATE"),
    except_column_list = ["SRC_ACTION", "SRC_INSERT_DATE"]
    # Note: Omitting apply_as_deletes/apply_as_truncates retains full history
)

# --------------------------
# DOWNSTREAM: USING ONLY CURRENT ROWS FROM SCD2
# --------------------------
@dlt.view(
    comment="Active (current) SCD2 rows only, for downstream joins."
)
def customer_scd2_current():
    # DLT SCD2 adds validity window columns: __START_AT / __END_AT.
    # Active rows have __END_AT IS NULL.
    return dlt.read("customer_scd2_bronze").where(F.col("__END_AT").isNull())

What DLT manages for you

  • Adds/maintains __START_AT and __END_AT columns.
  • On change, it “closes” the previous row by filling __END_AT and inserts a new “current” row with __END_AT = NULL.
  • Respects ordering by sequence_by (the most important part for history correctness).
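To inspect the resulting history for one key, an ad-hoc query sketch (assuming the pipeline publishes to dev.etl; the C_ADDRESS column and the output values are illustrative):

# Ad-hoc inspection of the SCD2 history for one customer (regular notebook).
spark.sql("""
    SELECT C_CUSTKEY, C_ADDRESS, __START_AT, __END_AT
    FROM dev.etl.customer_scd2_bronze
    WHERE C_CUSTKEY = 1
    ORDER BY __START_AT
""").show(truncate=False)

# Hypothetical output:
# 1 | 12 Main St | 2024-01-01 08:00:00 | 2024-02-15 10:30:00   <- closed version
# 1 | 77 Lake Rd | 2024-02-15 10:30:00 | null                  <- current version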

Typical downstream pattern

  • Always join fact tables to current dimension rows (__END_AT IS NULL).
  • For point-in-time analytics, join on “as of” a date within [__START_AT, __END_AT); see the sketch below.
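A minimal point-in-time join sketch; the orders_fact table and its ORDER_TS column are assumptions for illustration:

from pyspark.sql import functions as F

dim   = spark.table("dev.etl.customer_scd2_bronze")
facts = spark.table("dev.etl.orders_fact")  # hypothetical fact table with C_CUSTKEY, ORDER_TS, ...

# Pick the dimension version that was valid when each fact occurred.
as_of = facts.alias("f").join(
    dim.alias("d"),
    (F.col("f.C_CUSTKEY") == F.col("d.C_CUSTKEY"))
    & (F.col("f.ORDER_TS") >= F.col("d.__START_AT"))
    & (F.col("d.__END_AT").isNull() | (F.col("f.ORDER_TS") < F.col("d.__END_AT"))),
    "left"
)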

Back-loading/filling data in SCD2 tables in DLT

Problem: You ingest a late (older) change that should slot between two existing SCD2 versions (out-of-order arrival).

How DLT solves it

  • Because you declare sequence_by = SRC_INSERT_DATE, DLT reshapes the SCD2 timeline correctly:
    • Inserts the “late” version with its proper __START_AT.
    • Adjusts __END_AT of the prior version.
    • Ensures the latest version remains __END_AT IS NULL.

What you do

  • Emit that late change with the actual historical timestamp in SRC_INSERT_DATE.
  • DLT will re-assemble the timeline for that key. No manual MERGE gymnastics (see the sketch below).
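A minimal back-loading sketch, again using the illustrative raw schema. The key point is that SRC_INSERT_DATE carries the historical effective timestamp, not the load time:

from pyspark.sql import functions as F

# Customer 1 actually moved on 2024-01-20, but the change only arrives today.
# Writing the historical timestamp into SRC_INSERT_DATE lets DLT slot this
# version between the existing 2024-01-01 and 2024-02-15 versions.
late_change = spark.createDataFrame(
    [(1, "Alice", "3 Hill St", "I", "2024-01-20 09:00:00")],
    "C_CUSTKEY INT, C_NAME STRING, C_ADDRESS STRING, SRC_ACTION STRING, SRC_INSERT_DATE STRING"
).withColumn("SRC_INSERT_DATE", F.to_timestamp("SRC_INSERT_DATE"))

late_change.write.format("delta").mode("append").saveAsTable("dev.etl.customer_raw")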

Use-case: Late arriving CRM updates, backfills from upstream MDM, or replays from log systems.


How to delete data in SCD table in DLT?

  • SCD1: We already enabled apply_as_deletes = expr("SRC_ACTION = 'D'"). Send the key + action 'D' → it deletes the row.
  • SCD2 (common practice): do not hard delete; preserve history. If you must represent deletion:
    • Option A (preferred): Add a business flag column (e.g., IS_ACTIVE=false) and let that change create a final SCD2 row. Downstream uses that flag or __END_AT. (A sketch follows the snippet below.)
    • Option B (if you must react to source deletes): Add apply_as_deletes to the SCD2 definition; the current row for the deleted key is closed out, so “current” views no longer return it. Test the semantics carefully before relying on this.
# (Not recommended unless required)
# dlt.apply_changes(
#    ...,
#    stored_as_scd_type = 2,
#    apply_as_deletes = expr("SRC_ACTION = 'D'")
# )
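For Option A, the deletion is just another attribute change. A minimal sketch, where IS_ACTIVE is a hypothetical flag you would add to the raw schema (and keep out of except_column_list so the change creates a new version):

from pyspark.sql import functions as F

# Soft delete: emit a normal 'I' row that flips the hypothetical IS_ACTIVE flag.
# SCD2 records it as the final version for that key; earlier history stays intact.
soft_delete = spark.createDataFrame(
    [(2, "Bob", "9 Oak Ave", False, "I")],
    "C_CUSTKEY INT, C_NAME STRING, C_ADDRESS STRING, IS_ACTIVE BOOLEAN, SRC_ACTION STRING"
).withColumn("SRC_INSERT_DATE", F.current_timestamp())

(soft_delete.write.format("delta")
    .option("mergeSchema", "true")   # in case IS_ACTIVE is not yet in customer_raw
    .mode("append").saveAsTable("dev.etl.customer_raw"))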

How to Truncate SCD table in DLT?

  • SCD1: apply_as_truncates = expr("SRC_ACTION = 'T'") already wired. Emit 'T' once → target fully truncated.
  • SCD2: apply_as_truncates is not supported for SCD Type 2, and truncating a history table defeats its purpose. If your contract demands a rebuild (e.g., a nightly reload of a small dimension), trigger a full refresh of the target instead, accepting that history is lost.

Full Notebook (Put together)

This notebook assumes:

  • Unity Catalog: dev.etl exists
  • A Delta source table dev.etl.customer_raw is being appended with CDC style rows: C_CUSTKEY, attributes, SRC_ACTION, SRC_INSERT_DATE.
  • Your DLT pipeline Product Edition = Pro (or Advanced)

# Databricks Delta Live Tables (DLT) — SCD1 & SCD2 with CDC
import dlt
from pyspark.sql import functions as F
from pyspark.sql.functions import expr

CATALOG = "dev"
SCHEMA  = "etl"
SRC_TBL = f"{CATALOG}.{SCHEMA}.customer_raw"

# 1) STREAMING SOURCE VIEW
@dlt.view(
    comment="Streaming view over raw customer data with CDC signals."
)
def customer_raw_view():
    return spark.readStream.table(SRC_TBL)

# 2) SCD1 TABLE (UPSERTS + optional deletes/truncate)
dlt.create_streaming_table(
    name="customer_scd1_bronze",
    comment="SCD Type 1 (overwrite) Customer dimension."
)
dlt.apply_changes(
    target   = "customer_scd1_bronze",
    source   = "customer_raw_view",
    keys     = ["C_CUSTKEY"],
    stored_as   = "SCD Type 1",
    sequence_by = expr("SRC_INSERT_DATE"),
    apply_as_deletes   = expr("SRC_ACTION = 'D'"),
    apply_as_truncates = expr("SRC_ACTION = 'T'")
)

# 3) SCD2 TABLE (HISTORY), exclude operational columns from compare
dlt.create_streaming_table(
    name="customer_scd2_bronze",
    comment="SCD Type 2 (history) Customer dimension."
)
dlt.apply_changes(
    target   = "customer_scd2_bronze",
    source   = "customer_raw_view",
    keys     = ["C_CUSTKEY"],
    stored_as   = "SCD Type 2",
    sequence_by = expr("SRC_INSERT_DATE"),
    except_column_list = ["SRC_ACTION", "SRC_INSERT_DATE"]
    # Intentionally no deletes/truncates to preserve history
)

# 4) CURRENT SNAPSHOT OF SCD2 (handy for joins)
@dlt.view(
    comment="Current (active) SCD2 snapshot; END_AT is null."
)
def customer_scd2_current():
    return dlt.read("customer_scd2_bronze").where(F.col("__END_AT").isNull())

Practical Use-Cases (cheat sheet)

  • SCD1 – Operational attributes
    • Email, phone, marketing consent, preferred store—only latest matters.
    • Deletion: remove one key (GDPR delete), truncation: hard reset (sandbox/POC).
  • SCD2 – Analytical attributes
    • Customer segment, membership tier, region, status, relationship manager, plan type.
    • Back-loading: legal name change backdated to its legal effective date; late MDM corrections.
    • Current snapshot view for day-to-day joins; full table for time-travel analytics.
  • CDC patterns
    • Real-time (streaming ingest) or micro-batch: both work since DLT abstracts the mechanics.
    • Deletes/truncates controlled at source via SRC_ACTION so you can audit intent.

Tips & Gotchas

  • Always set a trustworthy sequence_by column (event time). If you use ingestion time instead, late events won’t line up historically.
  • Backfills: just write the true historical timestamp into SRC_INSERT_DATE; DLT reflows SCD2 for you.
  • Column exclusions: use except_column_list to keep “operational/control” columns (like SRC_ACTION, SRC_INSERT_DATE, file names, etc.) out of change detection.
  • Debugging: run in Development mode to iterate quickly; switch to Production to auto-terminate clusters after updates.
  • Deletes in SCD2: prefer a soft-delete attribute that creates a final historical row rather than hard delete (history loss).
