Introduction
Goal: Build a CDC-ready dimension pipeline in Delta Live Tables (DLT) that supports:
- SCD Type 1 (overwrite in place)
- SCD Type 2 (history with start/end validity)
- Deleting/truncating target tables from source signals
- Back-loading “out-of-order” historical changes into SCD2
Core ideas you’ll use
- CDC (Change Data Capture): Detecting and applying inserts/updates/deletes from a source stream to one or more targets.
- SCD Type 1: Latest values overwrite previous values (no history).
- SCD Type 2: Track full history by closing old versions and inserting new “current” versions with validity windows.
- DLT `apply_changes`: A declarative API in DLT that handles SCD1/SCD2 mechanics for you, including ordering, column exclusions, and optional delete/truncate semantics.
What we’ll model
- A Customer dimension fed by a “raw” Delta table/stream.
- The source includes two CDC helper columns:
  - `SRC_ACTION`: one of `'I'` (insert/upsert), `'D'` (delete), `'T'` (truncate).
  - `SRC_INSERT_DATE`: event timestamp used for change ordering.
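For orientation, here is a minimal, hypothetical sketch of seeding that raw table. Only `C_CUSTKEY`, `SRC_ACTION`, and `SRC_INSERT_DATE` are required by the pipeline below; the attribute columns (`C_NAME`, `C_MKTSEGMENT`) are placeholders for your real customer attributes.
# Hypothetical seed for the raw CDC source table (adjust columns to your schema)
spark.sql("""
    CREATE TABLE IF NOT EXISTS dev.etl.customer_raw (
        C_CUSTKEY        BIGINT,
        C_NAME           STRING,
        C_MKTSEGMENT     STRING,
        SRC_ACTION       STRING,      -- 'I' = insert/upsert, 'D' = delete, 'T' = truncate
        SRC_INSERT_DATE  TIMESTAMP    -- event time used for sequencing
    ) USING DELTA
""")
spark.sql("""
    INSERT INTO dev.etl.customer_raw VALUES
        (42, 'Acme Corp', 'AUTOMOBILE', 'I', TIMESTAMP '2024-01-01 00:00:00'),
        (42, 'Acme Corp', 'MACHINERY',  'I', TIMESTAMP '2024-02-01 00:00:00')
""")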
How to build SCD1 or SCD2 tables in DLT Pipelines?
You’ll create:
- A streaming view of your source (DLT needs a streaming input for `apply_changes`).
- An SCD1 table with `dlt.apply_changes(..., stored_as_scd_type=1)`.
- An SCD2 table with `dlt.apply_changes(..., stored_as_scd_type=2)`.
✅ DLT Edition: set your pipeline Product edition to Pro (or Advanced) to use `apply_changes`.
Slowly Changing Dimension Type 1 table (SCD1)
- Definition: Keep only the current values. Every change on a key overwrites existing values.
- When to use: Attributes that aren’t analyzed historically (e.g., phone number, email, latest address, operational flags).
- Pros/Cons: Simple & space-efficient, but no history.
Example DLT Notebook (Python)
Put this in a DLT notebook file (e.g., `01_dlt_scd_cdc.py`) and configure the DLT pipeline to run that notebook.
import dlt
from pyspark.sql import functions as F
from pyspark.sql.functions import expr
# --------------------------
# CONFIG (optional helpers)
# --------------------------
CATALOG = "dev" # your catalog
SCHEMA = "etl" # your schema
SRC_TBL = f"{CATALOG}.{SCHEMA}.customer_raw" # the raw Delta table (upstream ingestion)
# The raw table must include:
# - business key: C_CUSTKEY
# - SRC_ACTION: 'I' (upsert), 'D' (delete), 'T' (truncate)
# - SRC_INSERT_DATE: event timestamp used for sequencing
# --------------------------
# STREAMING SOURCE VIEW
# --------------------------
@dlt.view(
    comment="Streaming view over the raw customer table."
)
def customer_raw_view():
    # DLT apply_changes requires a streaming source (readStream.table).
    # If your raw table is a Delta table that is appended over time, Spark can stream from it.
    return spark.readStream.table(SRC_TBL)
# --------------------------
# SCD1 (UPSERT) TARGET
# --------------------------
# Create an empty streaming target table first (DLT will manage storage).
dlt.create_streaming_table(
    name="customer_scd1_bronze",
    comment="SCD Type 1 (no history) Customer dimension built by apply_changes."
)
# Declarative CDC: SCD Type 1
dlt.apply_changes(
    target = "customer_scd1_bronze",
    source = "customer_raw_view",               # the streaming view
    keys = ["C_CUSTKEY"],                       # business key(s)
    stored_as_scd_type = 1,                     # explicit; SCD type 1 is also the default
    sequence_by = expr("SRC_INSERT_DATE"),      # ordering column (timestamp)
    apply_as_deletes = expr("SRC_ACTION = 'D'"),    # delete rows that match key(s)
    apply_as_truncates = expr("SRC_ACTION = 'T'")   # truncate entire table
)
What this does
- New keys are inserted.
- Existing keys are overwritten (latest wins).
- If any input row arrives with `SRC_ACTION = 'D'` → delete that key from the target.
- If any input row arrives with `SRC_ACTION = 'T'` → truncate the target.
How to delete and truncate data from target table in DLT?
- Delete a single key: emit a record with that `C_CUSTKEY`, set `SRC_ACTION = 'D'` (other attributes can be null), and `SRC_INSERT_DATE = current_timestamp()`.
- Truncate all: emit a record with `SRC_ACTION = 'T'` (other fields don't matter). Use carefully.
This works for SCD1; for SCD2 you often keep history and skip deletes/truncates unless you have a specific downstream contract.
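As a concrete sketch (key value 42 is a placeholder), you can emit these control rows straight into the raw table from any notebook or ingestion job; columns you don't list stay NULL:
# Delete a single key downstream (picked up on the next pipeline update)
spark.sql("""
    INSERT INTO dev.etl.customer_raw (C_CUSTKEY, SRC_ACTION, SRC_INSERT_DATE)
    VALUES (42, 'D', current_timestamp())
""")
# Truncate the SCD1 target; the key value is irrelevant here
spark.sql("""
    INSERT INTO dev.etl.customer_raw (C_CUSTKEY, SRC_ACTION, SRC_INSERT_DATE)
    VALUES (-1, 'T', current_timestamp())
""")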
Slowly Changing Dimension Type 2 table (SCD2)
- Definition: Preserve history. Each change creates a new version; the previous version gets an end timestamp.
- When to use: Analytical attributes where historical truth matters (e.g., customer segment, region, status, marital status, VIP tier).
- Pros/Cons: Full audit/history; larger storage & more complex joins.
Example DLT (add SCD2 target in the same notebook)
# --------------------------
# SCD2 (HISTORY) TARGET
# --------------------------
dlt.create_streaming_table(
    name="customer_scd2_bronze",
    comment="SCD Type 2 (historical) Customer dimension built by apply_changes."
)
# For SCD2, you typically DO NOT delete or truncate.
# You can also exclude operational columns (e.g., SRC_ACTION, SRC_INSERT_DATE)
# so they are not carried into the history table.
dlt.apply_changes(
    target = "customer_scd2_bronze",
    source = "customer_raw_view",
    keys = ["C_CUSTKEY"],
    stored_as_scd_type = 2,
    sequence_by = expr("SRC_INSERT_DATE"),
    except_column_list = ["SRC_ACTION", "SRC_INSERT_DATE"]
    # Note: Omitting apply_as_deletes/apply_as_truncates retains full history
)
# --------------------------
# DOWNSTREAM: USING ONLY CURRENT ROWS FROM SCD2
# --------------------------
@dlt.view(
    comment="Active (current) SCD2 rows only, for downstream joins."
)
def customer_scd2_current():
    # DLT SCD2 creates validity window columns: __START_AT / __END_AT.
    # Active rows have __END_AT IS NULL.
    return dlt.read("customer_scd2_bronze").where(F.col("__END_AT").isNull())
What DLT manages for you
- Adds and maintains the `__START_AT` and `__END_AT` columns.
- On change, it "closes" the previous row by filling `__END_AT` and inserts a new "current" row with `__END_AT = NULL`.
- Respects ordering by `sequence_by` (the most important part for history correctness).
Typical downstream pattern
- Always join fact tables to current dimension rows (`__END_AT IS NULL`).
- For point-in-time analytics, join on an "as of" date within `[__START_AT, __END_AT)`, as sketched below.
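Here is a minimal point-in-time join sketch. The fact table `dev.etl.orders_fact` and its `ORDER_TS` event-time column are hypothetical; substitute your own fact table, and adjust the catalog/schema to wherever your pipeline publishes.
from pyspark.sql import functions as F

dim = spark.read.table("dev.etl.customer_scd2_bronze")
facts = spark.read.table("dev.etl.orders_fact")  # hypothetical fact table

# Match each fact to the dimension version that was valid at ORDER_TS
as_of_join = (
    facts.alias("f")
    .join(
        dim.alias("d"),
        (F.col("f.C_CUSTKEY") == F.col("d.C_CUSTKEY"))
        & (F.col("f.ORDER_TS") >= F.col("d.__START_AT"))
        & (F.col("d.__END_AT").isNull() | (F.col("f.ORDER_TS") < F.col("d.__END_AT"))),
        "left",
    )
)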
Back-loading/filling data in SCD2 tables in DLT
Problem: You ingest a late (older) change that should slot between two existing SCD2 versions (out-of-order arrival).
How DLT solves it
- Because you declare `sequence_by = SRC_INSERT_DATE`, DLT reshapes the SCD2 timeline correctly:
  - Inserts the "late" version with its proper `__START_AT`.
  - Adjusts `__END_AT` of the prior version.
  - Ensures the latest version remains `__END_AT IS NULL`.
What you do
- Emit that late change with the actual historical timestamp in `SRC_INSERT_DATE`.
- DLT will re-assemble the timeline for that key. No manual MERGE gymnastics.
Use-case: Late arriving CRM updates, backfills from upstream MDM, or replays from log systems.
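For example (key 42 and the `C_MKTSEGMENT` attribute are hypothetical), a backdated segment change is just a normal upsert row carrying its true effective timestamp:
# The change actually happened on 2023-06-01 but only arrives today
spark.sql("""
    INSERT INTO dev.etl.customer_raw (C_CUSTKEY, C_MKTSEGMENT, SRC_ACTION, SRC_INSERT_DATE)
    VALUES (42, 'AUTOMOBILE', 'I', TIMESTAMP '2023-06-01 00:00:00')
""")
# On the next update, apply_changes slots this version into the timeline for key 42
# and adjusts the neighbouring __START_AT/__END_AT values accordingly.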
How to delete data in SCD table in DLT?
- SCD1: We already enabled `apply_as_deletes = expr("SRC_ACTION = 'D'")`. Send the key with action `'D'` and the row is deleted from the target.
- SCD2 (common practice): do not hard delete; preserve history. If you must represent deletion:
  - Option A (preferred): Add a business flag column (e.g., `IS_ACTIVE = false`) and let that change create a final SCD2 row. Downstream uses that flag or `__END_AT`. A sketch follows after the snippet below.
  - Option B (if you truly need hard deletes): Add `apply_as_deletes` to the SCD2 definition, but be aware you're throwing away history for those keys.
# (Not recommended unless required)
# dlt.apply_changes(
#     ...,
#     stored_as_scd_type = 2,
#     apply_as_deletes = expr("SRC_ACTION = 'D'")
# )
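For Option A, a minimal soft-delete sketch (the `IS_ACTIVE` flag column and key 42 are hypothetical) is just one more upsert row:
spark.sql("""
    INSERT INTO dev.etl.customer_raw (C_CUSTKEY, IS_ACTIVE, SRC_ACTION, SRC_INSERT_DATE)
    VALUES (42, false, 'I', current_timestamp())
""")
# The SCD2 target keeps every prior version for key 42; the newest row simply carries
# IS_ACTIVE = false, so downstream can filter on the flag without losing history.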
How to Truncate SCD table in DLT?
- SCD1: `apply_as_truncates = expr("SRC_ACTION = 'T'")` is already wired. Emit `'T'` once and the target is fully truncated.
- SCD2: `apply_as_truncates` is only supported for SCD Type 1, so you cannot truncate the history table this way. If your contract demands a rebuild (e.g., nightly rebuild of a small dimension), run a full refresh of the pipeline instead, accepting that history is lost.
Full Notebook (putting it together)
This notebook assumes:
- Unity Catalog: the `dev.etl` schema exists.
- A Delta source table `dev.etl.customer_raw` is being appended with CDC-style rows: `C_CUSTKEY`, attributes, `SRC_ACTION`, `SRC_INSERT_DATE`.
- Your DLT pipeline Product Edition is Pro (or Advanced).
# Databricks Delta Live Tables (DLT) — SCD1 & SCD2 with CDC
import dlt
from pyspark.sql import functions as F
from pyspark.sql.functions import expr
CATALOG = "dev"
SCHEMA = "etl"
SRC_TBL = f"{CATALOG}.{SCHEMA}.customer_raw"
# 1) STREAMING SOURCE VIEW
@dlt.view(
    comment="Streaming view over raw customer data with CDC signals."
)
def customer_raw_view():
    return spark.readStream.table(SRC_TBL)
# 2) SCD1 TABLE (UPSERTS + optional deletes/truncate)
dlt.create_streaming_table(
    name="customer_scd1_bronze",
    comment="SCD Type 1 (overwrite) Customer dimension."
)
dlt.apply_changes(
    target = "customer_scd1_bronze",
    source = "customer_raw_view",
    keys = ["C_CUSTKEY"],
    stored_as_scd_type = 1,
    sequence_by = expr("SRC_INSERT_DATE"),
    apply_as_deletes = expr("SRC_ACTION = 'D'"),
    apply_as_truncates = expr("SRC_ACTION = 'T'")
)
# 3) SCD2 TABLE (HISTORY), excluding operational columns from the target
dlt.create_streaming_table(
    name="customer_scd2_bronze",
    comment="SCD Type 2 (history) Customer dimension."
)
dlt.apply_changes(
    target = "customer_scd2_bronze",
    source = "customer_raw_view",
    keys = ["C_CUSTKEY"],
    stored_as_scd_type = 2,
    sequence_by = expr("SRC_INSERT_DATE"),
    except_column_list = ["SRC_ACTION", "SRC_INSERT_DATE"]
    # Intentionally no deletes/truncates to preserve history
)
# 4) CURRENT SNAPSHOT OF SCD2 (handy for joins)
@dlt.view(
    comment="Current (active) SCD2 snapshot; __END_AT is null."
)
def customer_scd2_current():
    return dlt.read("customer_scd2_bronze").where(F.col("__END_AT").isNull())
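To sanity-check the results, you can query the two targets from an interactive notebook (assuming the pipeline publishes to `dev.etl`; key 42 is a placeholder):
# SCD1: exactly one row per key, holding the latest values
spark.table("dev.etl.customer_scd1_bronze").where("C_CUSTKEY = 42").show()

# SCD2: one row per historical version, ordered by its validity window
(spark.table("dev.etl.customer_scd2_bronze")
    .where("C_CUSTKEY = 42")
    .orderBy("__START_AT")
    .show())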
Practical Use-Cases (cheat sheet)
- SCD1 – Operational attributes
- Email, phone, marketing consent, preferred store—only latest matters.
- Deletion: remove one key (GDPR delete), truncation: hard reset (sandbox/POC).
- SCD2 – Analytical attributes
- Customer segment, membership tier, region, status, relationship manager, plan type.
- Back-loading: legal name change backdated to its legal effective date; late MDM corrections.
- Current snapshot view for day-to-day joins; full table for time-travel analytics.
- CDC patterns
- Real-time (streaming ingest) or micro-batch: both work since DLT abstracts the mechanics.
- Deletes/truncates controlled at source via `SRC_ACTION` so you can audit intent.
Tips & Gotchas
- Always set a trustworthy `sequence_by` column (event time). If you use ingestion time instead, late events won't line up historically.
- Backfills: just write the true historical timestamp into `SRC_INSERT_DATE`; DLT reflows SCD2 for you.
- Column exclusions: use `except_column_list` to keep "operational/control" columns (like `SRC_ACTION`, `SRC_INSERT_DATE`, file names, etc.) out of the dimension table.
- Debugging: run in Development mode to iterate quickly; switch to Production to auto-terminate clusters after updates.
- Deletes in SCD2: prefer a soft-delete attribute that creates a final historical row rather than hard delete (history loss).
A natural next step is to turn this into a copy-paste DLT tutorial notebook with small helper cells that seed demo data and show sample queries (SCD1 vs SCD2, deletes, truncates, and back-loading).