Pass parameters in a DLT pipeline | Generate tables dynamically
This hands-on guide shows how to:
- Ingest files with Auto Loader inside a Delta Live Tables (DLT) pipeline
- Union multiple streaming sources incrementally with Append Flow
- Parameterize your pipeline and generate tables dynamically
We’ll build on your earlier DLT pipeline (Orders + Customers → Silver → Gold). If you’re starting fresh, you can still follow along—each step is self-contained.
Prereqs (one-time)
- Unity Catalog enabled; a catalog dev and a schema etl
- Two existing Delta tables (from samples or your own): dev.bronze.orders_raw and dev.bronze.customer_raw (a quick existence check follows this list)
- A working DLT pipeline in Development mode
- Cluster permissions to create Managed Volumes
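If you want to sanity-check the two source tables before wiring anything up, a quick notebook cell like this works (a minimal sketch; run it outside the DLT notebook):

# Sanity-check the prerequisite tables from a regular notebook
for t in ["dev.bronze.orders_raw", "dev.bronze.customer_raw"]:
    print(t, "exists:", spark.catalog.tableExists(t))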
Introduction (What we’ll build)
- Add an Auto Loader source that reads CSV files into a streaming table.
- Create a union streaming table that incrementally combines:
  - Streaming from the Delta table (orders_bronze)
  - Streaming from Auto Loader (orders_autoloader_bronze)
- Join that union with customers, then recompute Gold.
- Pass a parameter into the pipeline (order status list) and generate multiple Gold tables dynamically.
Use Auto Loader inside DLT
1) Create a managed Volume and folders for Auto Loader
Run this once in a regular notebook (not the DLT notebook):
-- Managed volume to land files and store Auto Loader schemas
CREATE VOLUME IF NOT EXISTS dev.etl.landing;
-- Create subfolders (logical paths inside the volume)
-- You can also create them from the UI (Volumes browser)
In the workspace sidebar, open Catalog → dev → etl → Volumes → landing and create two folders:
- /Volumes/dev/etl/landing/files
- /Volumes/dev/etl/landing/autoloader_schemas
We’ll drop CSVs into /files and let Auto Loader store its schema-tracking info under /autoloader_schemas.
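If you'd rather create the folders from a notebook than the UI, dbutils works with volume paths (a minimal sketch):

# Create the two subfolders inside the managed volume
dbutils.fs.mkdirs("/Volumes/dev/etl/landing/files")
dbutils.fs.mkdirs("/Volumes/dev/etl/landing/autoloader_schemas")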
DLT notebook — imports
At the top of your DLT notebook:
import dlt
from pyspark.sql.functions import col, current_timestamp, count as f_count, sum as f_sum
(We’ll import more only as needed.)
Existing sources (recap)
You likely already have:
@dlt.table(
    name="orders_bronze",
    comment="Orders from Delta table (streaming input)",
    table_properties={"quality": "bronze"}
)
def orders_bronze():
    # Delta table as a streaming source
    return spark.readStream.table("dev.bronze.orders_raw")

@dlt.table(
    name="customer_bronze",
    comment="Customers from Delta table (batch input)",
    table_properties={"quality": "bronze"}
)
def customer_bronze():
    return spark.read.table("dev.bronze.customer_raw")
New: Auto Loader streaming table
We’ll read CSVs with cloudFiles and keep schema stable (no evolution), so we can union with the Delta-sourced stream.
@dlt.table(
    name="orders_autoloader_bronze",
    comment="Orders from files via Auto Loader (streaming)",
    table_properties={"quality": "bronze"}
)
def orders_autoloader_bronze():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")  # the CSV files carry a header row
        # Make sure these columns match the orders_bronze schema (adjust as needed!)
        .option("cloudFiles.schemaHints",
                "o_orderkey BIGINT, o_custkey BIGINT, o_orderstatus STRING, "
                "o_totalprice DOUBLE, o_orderdate DATE, c_mktsegment STRING")
        .option("cloudFiles.schemaLocation", "/Volumes/dev/etl/landing/autoloader_schemas/1")
        .option("cloudFiles.schemaEvolutionMode", "none")
        .option("pathGlobFilter", "*.csv")
        .load("/Volumes/dev/etl/landing/files")
    )
Why no checkpoint option? In DLT, streaming table checkpoints are managed automatically under the dataset’s storage path.
Append Flow (Union of streaming tables, incrementally)
Problem: You could union two streaming DataFrames inside a single table definition, but that couples both sources into one flow; changing the set of sources later typically forces a full refresh that re-reads all inputs, defeating incremental behavior.
Solution: Use Append Flow to incrementally append from multiple streaming sources into a single streaming table.
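For contrast, the single-flow union the problem statement refers to would look roughly like this (a sketch only; orders_union_bronze_naive is a hypothetical name, and you don't need it in the pipeline):

# For contrast only: both streams coupled into one flow.
# Changing the source list later typically means a full refresh of the target.
@dlt.table(name="orders_union_bronze_naive")
def orders_union_bronze_naive():
    return (
        dlt.read_stream("orders_bronze")
        .unionByName(dlt.read_stream("orders_autoloader_bronze"))
    )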
1) Create the target union streaming table
dlt.create_streaming_table("orders_union_bronze")
2) Append source #1: Delta-sourced stream
@dlt.append_flow(target="orders_union_bronze")
def append_from_orders_bronze():
    return dlt.read_stream("orders_bronze")  # live table in the same pipeline
3) Append source #2: Auto Loader stream
@dlt.append_flow(target="orders_union_bronze")
def append_from_orders_autoloader():
    return dlt.read_stream("orders_autoloader_bronze")
The Append Flow ensures the union table only processes new increments from each source, not full re-reads.
4) Join union with customers (Silver)
Replace any previous “orders + customers” join to read from the union table:
@dlt.view(name="orders_customers_join_v")
def orders_customers_join_v():
    o = dlt.read("orders_union_bronze")
    c = dlt.read("customer_bronze")
    return (o.join(c, o.o_custkey == c.c_custkey, "left")
             .select(
                 o.o_orderkey, o.o_custkey, o.o_orderstatus, o.o_totalprice, o.o_orderdate,
                 c.c_mktsegment
             ))
@dlt.table(
    name="orders_silver",
    comment="Joined orders + customers",
    table_properties={"quality": "silver"}
)
def orders_silver():
    return dlt.read("orders_customers_join_v").withColumn("insert_ts", current_timestamp())
5) Gold (baseline)
@dlt.table(
    name="orders_aggregated_gold",
    comment="Aggregation by market segment",
    table_properties={"quality": "gold"}
)
def orders_aggregated_gold():
    df = dlt.read("orders_silver")
    return (df.groupBy("c_mktsegment")
              .agg(
                  f_count("o_orderkey").alias("count_orders"),
                  f_sum("o_totalprice").alias("sum_totalprice")
              ))
Test it
- Upload a CSV into /Volumes/dev/etl/landing/files (headers and columns must match the schema hints); a sketch for generating a sample file follows this list.
- Validate the pipeline, then Start.
- The first run of orders_union_bronze will backfill both sources; subsequent runs will only process new files (Auto Loader) or new rows (Delta stream).
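If you don't have a CSV handy, you can write a small test file straight into the volume from a notebook (a sketch; the file name and rows are made up, and the columns follow the schema hints above):

# Write a tiny sample CSV into the landing folder (hypothetical file name and rows)
sample_csv = (
    "o_orderkey,o_custkey,o_orderstatus,o_totalprice,o_orderdate,c_mktsegment\n"
    "9000001,101,O,1250.50,2024-01-15,BUILDING\n"
    "9000002,102,F,830.25,2024-01-16,MACHINERY\n"
)
dbutils.fs.put("/Volumes/dev/etl/landing/files/sample_orders.csv", sample_csv, True)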
Pass Parameters into a DLT pipeline
We’ll parameterize order status and create dynamic Gold tables per status.
1) Add a pipeline configuration
In Pipelines → Settings → Configuration, add:
custom.order_status=o,f
(Comma-separated list; adjust values as needed.)
2) Read the config in your DLT notebook
At the top:
order_status_csv = spark.conf.get("custom.order_status", "na")
order_statuses = [s.strip() for s in order_status_csv.split(",") if s.strip() and s.strip().lower() != "na"]
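As a quick sanity check of the parsing (plain Python, no pipeline needed), a hypothetical helper mirroring the same logic behaves like this:

def parse_statuses(raw):
    # Same logic as above: split, trim, drop empties and the "na" sentinel
    return [s.strip() for s in raw.split(",") if s.strip() and s.strip().lower() != "na"]

print(parse_statuses("o,f"))  # ['o', 'f']  -> two per-status gold tables
print(parse_statuses("na"))   # []          -> no per-status gold tables generated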
3) Generate Materialized Views dynamically
Loop over the statuses and emit a gold table per status:
for status in order_statuses:
    @dlt.table(
        name=f"orders_aggregated_{status}_gold",
        comment=f"Per-status aggregation for order_status={status}",
        table_properties={"quality": "gold"}
    )
    def _make_status_gold(status=status):  # default argument pins the current loop value
        src = dlt.read("orders_silver").where(col("o_orderstatus") == status)
        return (src.groupBy("c_mktsegment")
                   .agg(
                       f_count("o_orderkey").alias("count_orders"),
                       f_sum("o_totalprice").alias("sum_totalprice")
                   ))
- If custom.order_status=o,f, you'll get:
  - orders_aggregated_o_gold
  - orders_aggregated_f_gold
Change the config, re-run the pipeline, and DLT will create or remove datasets accordingly (declarative lifecycle).
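To spot-check the result after an update, query the generated tables from a regular notebook (a sketch, assuming the pipeline publishes to the dev.etl catalog and schema from the prereqs):

# List the dynamically generated gold tables (assumes target catalog/schema dev.etl)
spark.sql("SHOW TABLES IN dev.etl").where("tableName LIKE 'orders_aggregated_%_gold'").show()

# Inspect one of them (the name depends on your configured statuses)
spark.table("dev.etl.orders_aggregated_o_gold").show()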
What to Expect on Runs
- First run after adding the union: orders_union_bronze will backfill from both sources (Delta stream and any existing files).
- Subsequent runs:
  - Only new files (Auto Loader) and new Delta rows are processed.
  - The union table processes increments only, thanks to Append Flow.
- Dynamic golds:
  - Created or updated per status in your pipeline config.
  - No manual DDL required for schema or table management.
Troubleshooting Tips
- Schema mismatch on union: Align the Auto Loader cloudFiles.schemaHints to match the columns and types of the Delta-sourced stream.
- Validation error (missing function imports): Import any aggregation functions you use (count, sum, etc.).
- No increments processed: Confirm new files landed in /Volumes/dev/etl/landing/files and that filenames match *.csv.
Recap
- Auto Loader inside DLT: .format("cloudFiles") + cloudFiles.* options.
- Append Flow: Incrementally union multiple streaming sources into one streaming table.
- Parameters: Use pipeline Configuration + spark.conf.get(...) to generate tables dynamically.
- Declarative lifecycle: DLT manages table creation, schema evolution (when allowed), and removal.