Databricks: DLT Introduction


Introduction

Goal: Build a Delta Live Tables (DLT) pipeline that:

  • Reads raw “orders” (as streaming) and “customer” (as batch).
  • Joins them via a view.
  • Writes a refined Silver table.
  • Aggregates into a Gold table by market segment.

What DLT gives you (why declarative matters):

  • You write transformations; DLT handles orchestration, dependency graph (DAG), cluster mgmt, retries, data quality (if enabled), and error handling.
  • Optionally runs continuously (streaming) or on a schedule (triggered).
  • Requires the Premium (or higher) tier.

What we’ll build:

  • Bronze: two inputs (orders, customers)
  • Silver: a joined/cleaned table + audit column
  • Gold: an aggregate by market segment

What is Delta Live Tables (DLT)?

  • DLT is a declarative way to define ETL/ELT steps as tables and views.
  • You declare datasets with Python decorators (@dlt.table, @dlt.view) or SQL (CREATE LIVE TABLE, CREATE LIVE VIEW).
  • DLT builds the DAG, provisions a job compute cluster, runs in Triggered or Continuous mode, and stores lineage/metrics.
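
For a feel of the declarative style before we build the real pipeline, here is a minimal SQL sketch; my_dataset and some_upstream_dataset are placeholders, not datasets we create below:

-- One declaration per dataset; DLT infers the DAG from LIVE. references
CREATE LIVE TABLE my_dataset
COMMENT 'One declarative step; DLT wires it into the pipeline DAG'
AS SELECT * FROM LIVE.some_upstream_dataset;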

How to create a DLT Pipeline (prereqs + setup)

1) Create a working schema (Unity Catalog)

We’ll use dev.etl as the target schema for the DLT outputs, and dev.bronze for the raw source clones (assuming the dev catalog already exists).

-- in a SQL cell
CREATE SCHEMA IF NOT EXISTS dev.etl;
CREATE SCHEMA IF NOT EXISTS dev.bronze;

2) Prepare source data (deep clone the samples)

This walkthrough uses the TPC-H sample data that ships with Databricks (samples.tpch). We’ll deep clone two of its tables into dev.bronze as raw inputs.

-- Orders source table
CREATE TABLE IF NOT EXISTS dev.bronze.orders_raw
DEEP CLONE samples.tpch.orders;

-- Customers source table
CREATE TABLE IF NOT EXISTS dev.bronze.customer_raw
DEEP CLONE samples.tpch.customer;

Sanity-check:

SELECT * FROM dev.bronze.orders_raw   LIMIT 5;
SELECT * FROM dev.bronze.customer_raw LIMIT 5;

Streaming Tables, Materialized Views, Views in DLT

DLT exposes these dataset types in code:

  • Table (@dlt.table / CREATE LIVE TABLE): persisted in the target schema.
    • If its input is streaming (e.g., spark.readStream or dlt.read_stream), the result is a streaming table: it ingests new data incrementally on each update.
    • If its input is batch, the result behaves like a materialized view: it is recomputed deterministically when the pipeline refreshes.
  • View (@dlt.view / CREATE LIVE VIEW): ephemeral within the pipeline; not materialized into the target schema. Great for intermediate joins/cleanup.


Create a DLT Streaming Table (Bronze: orders)

Python (recommended)

Create a new DLT Notebook (Python). Put this at the top:

import dlt
from pyspark.sql import functions as F

Now declare the streaming bronze table for orders:

@dlt.table(
    name="orders_bronze",  # optional; defaults to function name
    table_properties={"quality": "bronze"},
    comment="Orders (raw) as a streaming table"
)
def orders_bronze():
    # Delta supports streaming reads; spark.readStream.table() reads the table incrementally
    return spark.readStream.table("dev.bronze.orders_raw")

SQL (alternative)

CREATE STREAMING LIVE TABLE orders_bronze
TBLPROPERTIES (quality = 'bronze')
COMMENT 'Orders (raw) as a streaming table'
AS SELECT * FROM STREAM(dev.bronze.orders_raw);

In SQL you are explicit about streaming as well: declare a STREAMING LIVE TABLE and wrap the source in STREAM() so the Delta table is read incrementally (newer releases also accept CREATE OR REFRESH STREAMING TABLE). A plain CREATE LIVE TABLE ... AS SELECT would instead build a batch-style (materialized) dataset. In Python you're explicit via readStream.


Create a DLT Batch Table (Bronze: customers)

Python

@dlt.table(
    name="customer_bronze",
    table_properties={"quality": "bronze"},
    comment="Customers (raw) as batch table"
)
def customer_bronze():
    return spark.read.table("dev.bronze.customer_raw")  # batch read

SQL

CREATE LIVE TABLE customer_bronze
TBLPROPERTIES (quality = 'bronze')
COMMENT 'Customers (raw) as batch table'
AS SELECT * FROM dev.bronze.customer_raw;

Create a DLT View (to join)

Python best practice: reference pipeline tables with dlt.read() (batch semantic) or dlt.read_stream() (stream semantic).

We’ll read customers as batch and orders as stream, then join.

@dlt.view(
    name="join_view",
    comment="Join customers to orders"
)
def join_view():
    df_c = dlt.read("customer_bronze")      # batch read from within pipeline
    df_o = dlt.read_stream("orders_bronze") # stream read from within pipeline
    
    # Join on customer key (c_custkey == o_custkey)
    return (df_o.join(
                df_c,
                on=[df_o["o_custkey"] == df_c["c_custkey"]],
                how="left"
            ))

SQL equivalent (LIVE. references other pipeline datasets; STREAM() keeps the orders side streaming):

CREATE LIVE VIEW join_view AS
SELECT *
FROM STREAM(LIVE.orders_bronze) o
LEFT JOIN LIVE.customer_bronze c
  ON o.o_custkey = c.c_custkey;

The LIVE keyword (SQL) vs dlt.read* (Python)

  • SQL: use LIVE.<table_or_view_name> to reference another pipeline dataset.
  • Python: use dlt.read("name") (batch) or dlt.read_stream("name") (stream).

Using LIVE in Python isn’t valid—use the dlt helpers instead.


(Silver) Build a refined table with audit column

Turn the view into a persisted Silver table and add an insert_ts audit column:

Python

@dlt.table(
    name="joined_silver",
    table_properties={"quality": "silver"},
    comment="Joined orders+customers with audit column"
)
def joined_silver():
    return (dlt.read("join_view")
            .withColumn("insert_ts", F.current_timestamp()))

SQL

CREATE LIVE TABLE joined_silver
TBLPROPERTIES (quality = 'silver')
COMMENT 'Joined orders+customers with audit column'
AS
SELECT
  j.*,
  current_timestamp() AS insert_ts
FROM LIVE.join_view j;

(Gold) Aggregate by market segment

Group by c_mktsegment and count orders.

In TPCH, the order key is o_orderkey.

Python

@dlt.table(
    name="orders_aggregated_gold",
    table_properties={"quality": "gold"},
    comment="Orders aggregated by market segment"
)
def orders_aggregated_gold():
    return (dlt.read("joined_silver")
            .groupBy("c_mktsegment")
            .agg(F.count("o_orderkey").alias("order_count"))
            .withColumn("insert_ts", F.current_timestamp()))

SQL

CREATE LIVE TABLE orders_aggregated_gold
TBLPROPERTIES (quality = 'gold')
COMMENT 'Orders aggregated by market segment'
AS
SELECT
  c_mktsegment,
  COUNT(o_orderkey) AS order_count,
  current_timestamp() AS insert_ts
FROM LIVE.joined_silver
GROUP BY c_mktsegment;

Orchestrate the DLT Pipeline

  1. Workspace > New > Delta Live Tables pipeline (sometimes shown as “ETL Pipelines”).
  2. Name: 00_dlt_introduction (anything you like).
  3. Product edition:
    • Core: basics (tables/views/lineage).
    • Pro: adds Change Data Capture (CDC).
    • Advanced: adds expectations/data quality and more controls (see the expectation sketch after this list).
      For this tutorial, Core is fine.
  4. Mode:
    • Triggered: runs when started or on schedule.
    • Continuous: runs like a streaming job, never-ending.
      Choose Triggered for now.
  5. Notebook: select the DLT notebook you just created.
  6. Target: set Catalog = dev, Schema = etl.
  7. Compute: Fixed size, 1–2 workers is fine for the demo. Driver same as worker.
  8. Channel: Current (Preview adds newer features; stick to Current unless you need preview features).
  9. Click Create, then Start.
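
Before you start the run, an aside on step 3: if you later pick the Advanced edition, data-quality expectations attach directly to a dataset declaration. A minimal SQL sketch; orders_checked is a made-up dataset for illustration only, not part of this tutorial's pipeline:

-- Requires the Advanced edition; rows with a NULL order key are dropped
-- and counted in the pipeline's data-quality metrics
CREATE LIVE TABLE orders_checked (
  CONSTRAINT valid_order_key EXPECT (o_orderkey IS NOT NULL) ON VIOLATION DROP ROW
)
AS SELECT * FROM LIVE.orders_bronze;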

You’ll see:

  • Waiting for resources → Initializing → Setting up tables → Rendering graph → Running
  • A DAG graph of your pipeline, with per-dataset metrics (rows read/written).
  • Event log at the bottom (great for debugging failed resolutions/imports); it can also be queried with SQL, as sketched below.
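
If you prefer SQL over the UI, the event log can be queried with the event_log() table-valued function (assuming a recent runtime and access to the pipeline). The pipeline ID below is a placeholder; copy yours from the pipeline's settings or URL:

-- '<pipeline-id>' is a placeholder for your pipeline's ID
SELECT timestamp, event_type, message
FROM event_log('<pipeline-id>')
ORDER BY timestamp DESC;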

Development vs Production mode

At the top of the pipeline page:

  • Development (default): keeps the job cluster running after success/failure, which is handy for quick re-runs and debugging (faster iteration).
  • Production: cluster is terminated when a run finishes (success or failure), reducing idle costs and behaving more like scheduled jobs.

Switch to Production when you automate/schedule.


Verifying results

In a notebook (attached to any cluster), query the outputs:

SELECT * FROM dev.etl.joined_silver       LIMIT 5;
SELECT * FROM dev.etl.orders_aggregated_gold ORDER BY order_count DESC;

You should see a handful of market segments with counts and insert_ts.
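
To see the streaming behavior in action, append a few rows to the raw orders source and Start the pipeline again: orders_bronze should pick up only the newly added rows. A quick sketch (this intentionally inserts duplicate orders, so counts will grow):

-- Append 100 duplicate rows to the raw source, then re-run the pipeline
INSERT INTO dev.bronze.orders_raw
SELECT * FROM samples.tpch.orders LIMIT 100;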




Full Python DLT notebook (copy/paste)

import dlt
from pyspark.sql import functions as F

# BRONZE: orders as streaming table
@dlt.table(
    name="orders_bronze",
    table_properties={"quality": "bronze"},
    comment="Orders (raw) as a streaming table"
)
def orders_bronze():
    return spark.readStream.table("dev.bronze.orders_raw")

# BRONZE: customers as batch table
@dlt.table(
    name="customer_bronze",
    table_properties={"quality": "bronze"},
    comment="Customers (raw) as batch table"
)
def customer_bronze():
    return spark.read.table("dev.bronze.customer_raw")

# VIEW: join within the pipeline (not materialized)
@dlt.view(
    name="join_view",
    comment="Join customers to orders"
)
def join_view():
    df_c = dlt.read("customer_bronze")        # batch semantics
    df_o = dlt.read_stream("orders_bronze")   # streaming semantics
    return (df_o.join(
                df_c,
                on=[df_o["o_custkey"] == df_c["c_custkey"]],
                how="left"
            ))

# SILVER: persisted joined table w/ audit column
@dlt.table(
    name="joined_silver",
    table_properties={"quality": "silver"},
    comment="Joined orders+customers with audit column"
)
def joined_silver():
    return dlt.read("join_view").withColumn("insert_ts", F.current_timestamp())

# GOLD: aggregated table by market segment
@dlt.table(
    name="orders_aggregated_gold",
    table_properties={"quality": "gold"},
    comment="Orders aggregated by market segment"
)
def orders_aggregated_gold():
    return (dlt.read("joined_silver")
            .groupBy("c_mktsegment")
            .agg(F.count("o_orderkey").alias("order_count"))
            .withColumn("insert_ts", F.current_timestamp()))

Example GitHub repos

  • https://github.com/databricks/delta-live-tables-notebooks
  • https://github.com/a0x8o/delta-live-tables-hands-on-workshop