Databricks: DLT Append Flow (Union) & Auto Loader

Pass parameters in a DLT pipeline | Generate tables dynamically

This hands-on guide shows how to:

  • Ingest files with Auto Loader inside a Delta Live Tables (DLT) pipeline
  • Union multiple streaming sources incrementally with Append Flow
  • Parameterize your pipeline and generate tables dynamically

We’ll build on your earlier DLT pipeline (Orders + Customers → Silver → Gold). If you’re starting fresh, you can still follow along—each step is self-contained.


Prereqs (one-time)

  • Unity Catalog enabled; a catalog dev with schemas bronze and etl
  • Two existing Delta tables (from samples or your own):
    • dev.bronze.orders_raw
    • dev.bronze.customer_raw
  • A working DLT pipeline in Development mode
  • Privileges to create managed volumes in the etl schema (CREATE VOLUME)

Introduction (What we’ll build)

  1. Add an Auto Loader source that reads CSV files into a streaming table.
  2. Create a Union streaming table that incrementally combines:
    • Streaming from the Delta table (orders_bronze)
    • Streaming from Auto Loader (orders_autoloader_bronze)
  3. Join that union with customers, then recompute Gold.
  4. Pass a parameter into the pipeline (order status list) and generate multiple Gold tables dynamically.

Use Auto Loader inside DLT

1) Create a managed Volume and folders for Auto Loader

Run this once in a regular notebook (not the DLT notebook):

-- Managed volume to land files and store Auto Loader schemas
CREATE VOLUME IF NOT EXISTS dev.etl.landing;

-- Create subfolders (logical paths inside the volume)
-- You can create them from UI as well (Volumes browser)

In the Catalog Explorer (Catalog in the sidebar), navigate to dev → etl → Volumes → landing and create two folders:

/Volumes/dev/etl/landing/files
/Volumes/dev/etl/landing/autoloader_schemas

We’ll drop CSVs into /files, and let Auto Loader save schema info under /autoloader_schemas.
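
If you prefer to script the folder creation instead of clicking through the UI, a minimal sketch from a regular notebook (paths assume the volume created above):

# Create the two folders inside the managed volume
dbutils.fs.mkdirs("/Volumes/dev/etl/landing/files")
dbutils.fs.mkdirs("/Volumes/dev/etl/landing/autoloader_schemas")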

DLT notebook — imports

At the top of your DLT notebook:

import dlt
from pyspark.sql.functions import col, current_timestamp, count as f_count, sum as f_sum

(We’ll import more only as needed.)

Existing sources (recap)

You likely already have:

@dlt.table(
    name="orders_bronze",
    comment="Orders from Delta table (streaming input)",
    table_properties={"quality": "bronze"}
)
def orders_bronze():
    # Delta table as a streaming source
    return spark.readStream.table("dev.bronze.orders_raw")

@dlt.table(
    name="customer_bronze",
    comment="Customers from Delta table (batch input)",
    table_properties={"quality": "bronze"}
)
def customer_bronze():
    return spark.read.table("dev.bronze.customer_raw")

New: Auto Loader streaming table

We’ll read CSVs with cloudFiles and keep the schema stable (no schema evolution) so it can be unioned with the Delta-sourced stream.

@dlt.table(
    name="orders_autoloader_bronze",
    comment="Orders from files via Auto Loader (streaming)",
    table_properties={"quality": "bronze"}
)
def orders_autoloader_bronze():
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            # Make sure columns match orders_bronze schema (adjust as needed!)
            .option("cloudFiles.schemaHints", 
                    "o_orderkey BIGINT, o_custkey BIGINT, o_orderstatus STRING, "
                    "o_totalprice DOUBLE, o_orderdate DATE, c_mktsegment STRING")
            .option("cloudFiles.schemaLocation", "/Volumes/dev/etl/landing/autoloader_schemas/1")
            .option("cloudFiles.schemaEvolutionMode", "none")
            .option("pathGlobFilter", "*.csv")
            .load("/Volumes/dev/etl/landing/files")
    )

Why no checkpoint option? In DLT, streaming table checkpoints are managed automatically under the dataset’s storage path.
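
Before wiring the stream into the pipeline, you can sanity-check the schema hints against a landed sample file with the plain batch CSV reader in a regular notebook (this assumes at least one file with a header row already sits in /files):

# Regular notebook: compare the inferred schema of a sample file with the hints above
sample = (spark.read
          .option("header", True)
          .option("inferSchema", True)
          .csv("/Volumes/dev/etl/landing/files"))
sample.printSchema()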


Append Flow (Union of streaming tables, incrementally)

Problem: If you simply union two streaming DataFrames inside a single table definition, the set of sources is hard-wired into that one query; adding or removing a source later typically forces a full refresh of the table.
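
For contrast, here is a sketch of that single-query union (the table name orders_union_bronze_naive is just for illustration; it assumes the two bronze tables defined above). It runs, but its source list can’t change without a full refresh:

@dlt.table(name="orders_union_bronze_naive")
def orders_union_bronze_naive():
    # Both sources are wired into one query; adding a third later means a full refresh
    return (
        dlt.read_stream("orders_bronze")
            .unionByName(dlt.read_stream("orders_autoloader_bronze"))
    )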

Solution: Use Append Flow to write from multiple streaming sources into a single streaming table. Each flow appends its own increments, and sources can be added or removed without a full refresh.

1) Create the target union streaming table

dlt.create_streaming_table("orders_union_bronze")

2) Append source #1: Delta-sourced stream

@dlt.append_flow(target="orders_union_bronze")
def append_from_orders_bronze():
    return dlt.read_stream("orders_bronze")  # streaming table defined earlier in this pipeline

3) Append source #2: Auto Loader stream

@dlt.append_flow(target="orders_union_bronze")
def append_from_orders_autoloader():
    return dlt.read_stream("orders_autoloader_bronze")

The Append Flow ensures the union table only processes new increments from each source, not full re-reads.
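
This also means you can attach another source later with one more flow, without refreshing the union table. A hypothetical example, assuming an extra streaming table orders_backfill_bronze exists in the pipeline:

@dlt.append_flow(target="orders_union_bronze")
def append_from_orders_backfill():
    # Hypothetical third source; the existing flows keep their own progress
    return dlt.read_stream("orders_backfill_bronze")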


4) Join union with customers (Silver)

Replace any previous “orders + customers” join so that it reads from the union table:

@dlt.view(name="orders_customers_join_v")
def orders_customers_join_v():
    o = dlt.read("orders_union_bronze")
    c = dlt.read("customer_bronze")
    return (o.join(c, o.o_custkey == c.c_custkey, "left")
             .select(
                 o.o_orderkey, o.o_custkey, o.o_orderstatus, o.o_totalprice, o.o_orderdate,
                 c.c_mktsegment
             ))

@dlt.table(
    name="orders_silver",
    comment="Joined orders + customers",
    table_properties={"quality": "silver"}
)
def orders_silver():
    return dlt.read("orders_customers_join_v").withColumn("insert_ts", current_timestamp())

5) Gold (baseline)

@dlt.table(
    name="orders_aggregated_gold",
    comment="Aggregation by market segment",
    table_properties={"quality": "gold"}
)
def orders_aggregated_gold():
    df = dlt.read("orders_silver")
    return (df.groupBy("c_mktsegment")
              .agg(
                  f_count("o_orderkey").alias("count_orders"),
                  f_sum("o_totalprice").alias("sum_totalprice")
              ))

Test it

  1. Upload a CSV into /Volumes/dev/etl/landing/files (headers & columns must match the schema hints; see the sketch after this list).
  2. Validate the pipeline, then Start.
  3. The first run of orders_union_bronze processes the full existing contents of both sources; subsequent runs pick up only new files (Auto Loader) or newly appended rows (Delta stream).
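
To land a quick test file without leaving the notebook, one option is the sketch below (sample values are made up; the header follows the schema hints above):

# Regular notebook: drop a tiny CSV into the landing folder
csv_text = (
    "o_orderkey,o_custkey,o_orderstatus,o_totalprice,o_orderdate,c_mktsegment\n"
    "9001,101,O,150.50,2024-01-15,BUILDING\n"
)
dbutils.fs.put("/Volumes/dev/etl/landing/files/test_orders_001.csv", csv_text, True)  # True = overwrite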

Pass Parameters into a DLT pipeline

We’ll parameterize order status and create dynamic Gold tables per status.

1) Add a pipeline configuration

In Pipelines → Settings → Configuration, add:

custom.order_status=o,f

(Comma-separated list; the values should match the o_orderstatus values in your data.)

2) Read the config in your DLT notebook

At the top:

order_status_csv = spark.conf.get("custom.order_status", "na")
order_statuses = [s.strip() for s in order_status_csv.split(",") if s.strip() and s.strip().lower() != "na"]
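
For a quick illustration of what this yields (the “na” default acts as a sentinel for “no statuses configured”):

# "o,f" -> ["o", "f"]; unset (default "na") -> [], so the loop below creates no tables
assert [s.strip() for s in "o,f".split(",") if s.strip() and s.strip().lower() != "na"] == ["o", "f"]
assert [s.strip() for s in "na".split(",") if s.strip() and s.strip().lower() != "na"] == []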

3) Generate Materialized Views dynamically

Loop over the statuses and emit a gold table per status:

for status in order_statuses:
    @dlt.table(
        name=f"orders_aggregated_{status}_gold",
        comment=f"Per-status aggregation for order_status={status}",
        table_properties={"quality": "gold"}
    )
    def _make_status_gold(status=status):  # default argument pins the current status inside the loop's closure
        src = dlt.read("orders_silver").where(col("o_orderstatus") == status)
        return (src.groupBy("c_mktsegment")
                    .agg(
                        f_count("o_orderkey").alias("count_orders"),
                        f_sum("o_totalprice").alias("sum_totalprice")
                    )
                )

  • If custom.order_status=o,f, you’ll get:
    • orders_aggregated_o_gold
    • orders_aggregated_f_gold

Change the config, re-run the pipeline, and DLT will create or remove datasets accordingly (declarative lifecycle).
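
To double-check which per-status tables the pipeline currently manages, you can list them in the pipeline’s target schema (assumed here to be dev.etl; adjust to your pipeline settings):

# Regular notebook: list the dynamically generated Gold tables
spark.sql("SHOW TABLES IN dev.etl LIKE 'orders_aggregated*'").show(truncate=False)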


What to Expect on Runs

  • First run after adding union:
    orders_union_bronze will backfill from both sources (Delta stream & any existing files).
  • Subsequent runs:
    • Only new files (Auto Loader) and new Delta rows are processed.
    • The union table processes increments only, thanks to Append Flow.
  • Dynamic golds:
    • Created/updated per status in your pipeline config.
    • No manual DDL required for schema or table management.

Troubleshooting Tips

  • Schema mismatch on union: Align the Auto Loader cloudFiles.schemaHints to match the columns/types of the Delta-sourced stream.
  • Validation error (missing function imports): Import any aggregation functions you use (count, sum, etc.).
  • No increments processed: Confirm new files landed in /Volumes/dev/etl/landing/files and that filenames match *.csv (see the quick check below).
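
A quick way to see what has actually landed (regular notebook):

# List the files Auto Loader should be discovering
display(dbutils.fs.ls("/Volumes/dev/etl/landing/files"))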

Recap

  • Auto Loader inside DLT: .format("cloudFiles") + cloudFiles.* options.
  • Append Flow: Incrementally union multiple streaming sources into one streaming table.
  • Parameters: Use pipeline Configuration + spark.conf.get(...) to generate tables dynamically.
  • Declarative lifecycle: DLT manages table creation, schema evolution (when allowed), and removal.