Pass parameters in a DLT pipeline | Generate tables dynamically
This hands-on guide shows how to:
- Ingest files with Auto Loader inside a Delta Live Tables (DLT) pipeline
- Union multiple streaming sources incrementally with Append Flow
- Parameterize your pipeline and generate tables dynamically

We’ll build on your earlier DLT pipeline (Orders + Customers → Silver → Gold). If you’re starting fresh, you can still follow along—each step is self-contained.
Prereqs (one-time)
- Unity Catalog enabled; a catalog dev and schema etl
- Two existing Delta tables (from samples or your own): dev.bronze.orders_raw and dev.bronze.customer_raw (see the setup sketch after this list)
- A working DLT pipeline in Development mode
- Cluster permissions to create Managed Volumes
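If the two source tables don't exist yet, here is a minimal one-time setup sketch. It assumes your workspace exposes the built-in samples catalog (samples.tpch) and that you are allowed to create catalogs and schemas; adjust the names to your environment.
spark.sql("CREATE CATALOG IF NOT EXISTS dev")
spark.sql("CREATE SCHEMA IF NOT EXISTS dev.bronze")
spark.sql("CREATE SCHEMA IF NOT EXISTS dev.etl")

# Seed the raw tables from the TPC-H samples (or point these at your own data).
spark.table("samples.tpch.orders").write.mode("overwrite").saveAsTable("dev.bronze.orders_raw")
spark.table("samples.tpch.customer").write.mode("overwrite").saveAsTable("dev.bronze.customer_raw")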
 
Introduction (What we’ll build)
- Add an Auto Loader source that reads CSV files into a streaming table.
- Create a Union streaming table that incrementally combines:
  - Streaming from the Delta table (orders_bronze)
  - Streaming from Auto Loader (orders_autoloader_bronze)
- Join that union with customers, then recompute Gold.
- Pass a parameter into the pipeline (order status list) and generate multiple Gold tables dynamically.

Use Auto Loader inside DLT
1) Create a managed Volume and folders for Auto Loader
Run this once in a regular notebook (not the DLT notebook):
-- Managed volume to land files and store Auto Loader schemas
CREATE VOLUME IF NOT EXISTS dev.etl.landing;
-- Create subfolders (logical paths inside the volume)
-- You can create them from UI as well (Volumes browser)
In the Workspace sidebar, under Catalogs → dev → etl → Volumes → landing, create two folders:
/Volumes/dev/etl/landing/files
/Volumes/dev/etl/landing/autoloader_schemas
We’ll drop CSVs into /files, and let Auto Loader save schema info under /autoloader_schemas.
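If you prefer to create the two folders programmatically instead of via the UI, a quick sketch (assuming you run it in a Databricks notebook, where dbutils is available):
for path in ("/Volumes/dev/etl/landing/files",
             "/Volumes/dev/etl/landing/autoloader_schemas"):
    dbutils.fs.mkdirs(path)  # idempotent: succeeds even if the folder already exists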
DLT notebook — imports
At the top of your DLT notebook:
import dlt
from pyspark.sql.functions import col, current_timestamp, count as f_count, sum as f_sum
(We’ll import more only as needed.)
Existing sources (recap)
You likely already have:
@dlt.table(
    name="orders_bronze",
    comment="Orders from Delta table (streaming input)",
    table_properties={"quality": "bronze"}
)
def orders_bronze():
    # Delta table as a streaming source
    return spark.readStream.table("dev.bronze.orders_raw")
@dlt.table(
    name="customer_bronze",
    comment="Customers from Delta table (batch input)",
    table_properties={"quality": "bronze"}
)
def customer_bronze():
    return spark.read.table("dev.bronze.customer_raw")
New: Auto Loader streaming table
We’ll read CSVs with cloudFiles and keep schema stable (no evolution), so we can union with the Delta-sourced stream.
@dlt.table(
    name="orders_autoloader_bronze",
    comment="Orders from files via Auto Loader (streaming)",
    table_properties={"quality": "bronze"}
)
def orders_autoloader_bronze():
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            # Make sure columns match orders_bronze schema (adjust as needed!)
            .option("cloudFiles.schemaHints", 
                    "o_orderkey BIGINT, o_custkey BIGINT, o_orderstatus STRING, "
                    "o_totalprice DOUBLE, o_orderdate DATE, c_mktsegment STRING")
            .option("cloudFiles.schemaLocation", "/Volumes/dev/etl/landing/autoloader_schemas/1")
            .option("cloudFiles.schemaEvolutionMode", "none")
            .option("pathGlobFilter", "*.csv")
            .load("/Volumes/dev/etl/landing/files")
    )
Why no checkpoint option? In DLT, streaming table checkpoints are managed automatically under the dataset’s storage path.
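For contrast, here is roughly what the same ingest might look like as a stand-alone Structured Streaming job, where you own the checkpoint. The table name and checkpoint path are purely illustrative; this is not part of the pipeline.
# Hypothetical non-DLT version -- shown only to highlight what DLT handles for you.
(spark.readStream
     .format("cloudFiles")
     .option("cloudFiles.format", "csv")
     .option("cloudFiles.schemaLocation", "/Volumes/dev/etl/landing/autoloader_schemas/manual")
     .load("/Volumes/dev/etl/landing/files")
     .writeStream
     .option("checkpointLocation", "/Volumes/dev/etl/landing/_checkpoints/orders_manual")  # managed by you
     .trigger(availableNow=True)
     .toTable("dev.etl.orders_manual_bronze"))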
Append Flow (Union of streaming tables, incrementally)
Problem: If you simply union two streaming DataFrames, each run may re-scan both inputs, defeating incremental behavior.
Solution: Use Append Flow to incrementally append from multiple streaming sources into a single streaming table.
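For reference, the naive version this replaces would be a single streaming table that unions the two streams directly (a sketch with a made-up table name, assuming both streams share a schema; not something to keep in the pipeline):
# Plain union of two streams. It works, but the combined table is harder to
# evolve -- with Append Flow you can add or drop a source without rebuilding
# the target.
@dlt.table(name="orders_union_naive_bronze")
def orders_union_naive_bronze():
    return (dlt.read_stream("orders_bronze")
              .unionByName(dlt.read_stream("orders_autoloader_bronze")))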
1) Create the target union streaming table
dlt.create_streaming_table("orders_union_bronze")
2) Append source #1: Delta-sourced stream
@dlt.append_flow(target="orders_union_bronze")
def append_from_orders_bronze():
    return dlt.read_stream("orders_bronze")  # live table in same pipeline
3) Append source #2: Auto Loader stream
@dlt.append_flow(target="orders_union_bronze")
def append_from_orders_autoloader():
    return dlt.read_stream("orders_autoloader_bronze")
The Append Flow ensures the union table only processes new increments from each source, not full re-reads.
4) Join union with customers (Silver)
Replace any previous “orders + customers” join to read from the union table:
@dlt.view(name="orders_customers_join_v")
def orders_customers_join_v():
    o = dlt.read("orders_union_bronze")
    c = dlt.read("customer_bronze")
    return (o.join(c, o.o_custkey == c.c_custkey, "left")
             .select(
                 o.o_orderkey, o.o_custkey, o.o_orderstatus, o.o_totalprice, o.o_orderdate,
                 c.c_mktsegment
             ))
@dlt.table(
    name="orders_silver",
    comment="Joined orders + customers",
    table_properties={"quality": "silver"}
)
def orders_silver():
    return dlt.read("orders_customers_join_v").withColumn("insert_ts", current_timestamp())
5) Gold (baseline)
@dlt.table(
    name="orders_aggregated_gold",
    comment="Aggregation by market segment",
    table_properties={"quality": "gold"}
)
def orders_aggregated_gold():
    df = dlt.read("orders_silver")
    return (df.groupBy("c_mktsegment")
              .agg(
                  f_count("o_orderkey").alias("count_orders"),
                  f_sum("o_totalprice").alias("sum_totalprice")
              ))
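After a successful pipeline run, you can sanity-check the published gold table from a regular notebook. This sketch assumes the pipeline's target is the dev catalog and etl schema:
# Inspect the aggregated gold output outside the pipeline.
spark.table("dev.etl.orders_aggregated_gold").orderBy("c_mktsegment").show(truncate=False)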
Test it
- Upload a CSV into /Volumes/dev/etl/landing/files (headers & columns must match the schema hints); a sketch for creating a test file follows this list.
- Validate the pipeline, then Start.
- The first run of orders_union_bronze will backfill both sources; subsequent runs will only process new files (Auto Loader) or new rows (Delta stream).
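If you don't have a suitable CSV at hand, you can drop a tiny hand-written one into the volume with dbutils. The file name and row values below are made up; the columns follow the schema hints used earlier.
# Land a small test file for Auto Loader to pick up on the next run.
dbutils.fs.put(
    "/Volumes/dev/etl/landing/files/orders_sample_001.csv",
    "o_orderkey,o_custkey,o_orderstatus,o_totalprice,o_orderdate,c_mktsegment\n"
    "9000001,101,O,1500.50,2024-01-15,AUTOMOBILE\n"
    "9000002,102,F,220.00,2024-01-16,MACHINERY\n",
    overwrite=True,
)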
Pass Parameters into a DLT pipeline
We’ll parameterize order status and create dynamic Gold tables per status.
1) Add a pipeline configuration
In Pipelines → Settings → Configuration, add:
custom.order_status=o,f
(Comma-separated list; adjust values as needed.)
2) Read the config in your DLT notebook
At the top:
order_status_csv = spark.conf.get("custom.order_status", "na")
order_statuses = [s.strip() for s in order_status_csv.split(",") if s.strip() and s.strip().lower() != "na"]
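As a quick sanity check of that parsing (a throwaway sketch; parse_statuses just wraps the same comprehension):
def parse_statuses(raw):
    return [s.strip() for s in raw.split(",") if s.strip() and s.strip().lower() != "na"]

assert parse_statuses("o,f") == ["o", "f"]
assert parse_statuses(" o , f ") == ["o", "f"]   # whitespace is tolerated
assert parse_statuses("na") == []                # default sentinel -> no dynamic tables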
3) Generate Materialized Views dynamically
Loop over the statuses and emit a gold table per status:
for status in order_statuses:
    @dlt.table(
        name=f"orders_aggregated_{status}_gold",
        comment=f"Per-status aggregation for order_status={status}",
        table_properties={"quality": "gold"}
    )
    def _make_status_gold(status=status):  # default argument freezes the current loop value (avoids late binding)
        src = dlt.read("orders_silver").where(col("o_orderstatus") == status)
        return (src.groupBy("c_mktsegment")
                    .agg(
                        f_count("o_orderkey").alias("count_orders"),
                        f_sum("o_totalprice").alias("sum_totalprice")
                    )
                )
- If custom.order_status=o,f, you'll get: orders_aggregated_o_gold and orders_aggregated_f_gold.
Change the config, re-run the pipeline, and DLT will create or remove datasets accordingly (declarative lifecycle).
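To see which per-status gold tables currently exist after a run, you can list them from a regular notebook (a sketch; the wildcard follows SHOW TABLES ... LIKE pattern syntax):
# List the dynamically generated gold tables in the target schema.
spark.sql("SHOW TABLES IN dev.etl LIKE 'orders_aggregated_*_gold'").show(truncate=False)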
What to Expect on Runs
- First run after adding the union: orders_union_bronze will backfill from both sources (Delta stream & any existing files).
- Subsequent runs:
  - Only new files (Auto Loader) and new Delta rows are processed.
  - The union table processes increments only, thanks to Append Flow.
- Dynamic golds:
  - Created/updated per status in your pipeline config.
  - No manual DDL required for schema or table management.

Troubleshooting Tips
- Schema mismatch on union: Align the Auto Loader cloudFiles.schemaHints to match the columns/types of the Delta-sourced stream.
- Validation error (missing function imports): Import any aggregation functions you use (count, sum, etc.).
- No increments processed: Confirm new files landed in /Volumes/dev/etl/landing/files and that filenames match *.csv.
Recap
- Auto Loader inside DLT: .format("cloudFiles") + cloudFiles.* options.
- Append Flow: Incrementally union multiple streaming sources into one streaming table.
- Parameters: Use pipeline Configuration + spark.conf.get(...) to generate tables dynamically.
- Declarative lifecycle: DLT manages table creation, schema evolution (when allowed), and removal.