Databricks: DLT Append Flow (Union) & Auto Loader

Pass parameters in a DLT pipeline | Generate tables dynamically

This hands-on guide shows how to:

  • Ingest files with Auto Loader inside a Delta Live Tables (DLT) pipeline
  • Union multiple streaming sources incrementally with Append Flow
  • Parameterize your pipeline and generate tables dynamically

We’ll build on your earlier DLT pipeline (Orders + Customers → Silver → Gold). If you’re starting fresh, you can still follow along—each step is self-contained.


Prereqs (one-time)

  • Unity Catalog enabled; a catalog dev with schemas etl and bronze
  • Two existing Delta tables (from samples or your own):
    • dev.bronze.orders_raw
    • dev.bronze.customer_raw
  • A working DLT pipeline in Development mode
  • Cluster permissions to create Managed Volumes

Introduction (What we’ll build)

  1. Add an Auto Loader source that reads CSV files into a streaming table.
  2. Create a Union streaming table that incrementally combines:
    • Streaming from the Delta table (orders_bronze)
    • Streaming from Auto Loader (orders_autoloader_bronze)
  3. Join that union with customers, then recompute Gold.
  4. Pass a parameter into the pipeline (order status list) and generate multiple Gold tables dynamically.

Use Auto Loader inside DLT

1) Create a managed Volume and folders for Auto Loader

Run this once in a regular notebook (not the DLT notebook):

-- Managed volume to land files and store Auto Loader schemas
CREATE VOLUME IF NOT EXISTS dev.etl.landing;

-- Create subfolders (logical paths inside the volume)
-- You can create them from UI as well (Volumes browser)

In the Catalog explorer (Workspace sidebar → Catalog), navigate to dev → etl → Volumes → landing and create two folders:

/Volumes/dev/etl/landing/files
/Volumes/dev/etl/landing/autoloader_schemas

We’ll drop CSVs into /files, and let Auto Loader save schema info under /autoloader_schemas.
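
If you prefer to create the folders from a notebook instead of the UI, a minimal sketch (assuming the volume above already exists):

# Create the two subfolders inside the managed volume
dbutils.fs.mkdirs("/Volumes/dev/etl/landing/files")
dbutils.fs.mkdirs("/Volumes/dev/etl/landing/autoloader_schemas")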

DLT notebook — imports

At the top of your DLT notebook:

import dlt
from pyspark.sql.functions import col, current_timestamp, count as f_count, sum as f_sum

(We’ll import more only as needed.)

Existing sources (recap)

You likely already have:

@dlt.table(
    name="orders_bronze",
    comment="Orders from Delta table (streaming input)",
    table_properties={"quality": "bronze"}
)
def orders_bronze():
    # Delta table as a streaming source
    return spark.readStream.table("dev.bronze.orders_raw")

@dlt.table(
    name="customer_bronze",
    comment="Customers from Delta table (batch input)",
    table_properties={"quality": "bronze"}
)
def customer_bronze():
    return spark.read.table("dev.bronze.customer_raw")

New: Auto Loader streaming table

We’ll read CSVs with cloudFiles and keep schema stable (no evolution), so we can union with the Delta-sourced stream.

@dlt.table(
    name="orders_autoloader_bronze",
    comment="Orders from files via Auto Loader (streaming)",
    table_properties={"quality": "bronze"}
)
def orders_autoloader_bronze():
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            # Columns must match the orders_bronze schema (adjust names/types as needed!)
            .option("cloudFiles.schemaHints",
                    "o_orderkey BIGINT, o_custkey BIGINT, o_orderstatus STRING, "
                    "o_totalprice DOUBLE, o_orderdate DATE")
            .option("cloudFiles.schemaLocation", "/Volumes/dev/etl/landing/autoloader_schemas/1")
            .option("cloudFiles.schemaEvolutionMode", "none")
            .option("header", "true")  # CSV files include a header row
            .option("pathGlobFilter", "*.csv")
            .load("/Volumes/dev/etl/landing/files")
    )

Why no checkpoint option? In DLT, streaming table checkpoints are managed automatically under the dataset’s storage path.


Append Flow (Union of streaming tables, incrementally)

Problem: You could union two streaming DataFrames inside a single @dlt.table definition, but the sources are then locked into one query: adding or removing a source later forces a full refresh of the target, re-reading every input from scratch.

Solution: Use Append Flow to append from multiple streaming sources into a single streaming table; each source is processed incrementally, and sources can be added or removed without a full refresh.

1) Create the target union streaming table

dlt.create_streaming_table("orders_union_bronze")

2) Append source #1: Delta-sourced stream

@dlt.append_flow(target="orders_union_bronze")
def append_from_orders_bronze():
    return dlt.read_stream("orders_bronze")  # live table in same pipeline

3) Append source #2: Auto Loader stream

@dlt.append_flow(target="orders_union_bronze")
def append_from_orders_autoloader():
    return dlt.read_stream("orders_autoloader_bronze")

The Append Flow ensures the union table only processes new increments from each source, not full re-reads.


4) Join union with customers (Silver)

Replace any previous “orders + customers” join to read from the union table:

@dlt.view(name="orders_customers_join_v")
def orders_customers_join_v():
    o = dlt.read("orders_union_bronze")
    c = dlt.read("customer_bronze")
    return (o.join(c, o.o_custkey == c.c_custkey, "left")
             .select(
                 o.o_orderkey, o.o_custkey, o.o_orderstatus, o.o_totalprice, o.o_orderdate,
                 c.c_mktsegment
             ))

@dlt.table(
    name="orders_silver",
    comment="Joined orders + customers",
    table_properties={"quality": "silver"}
)
def orders_silver():
    return dlt.read("orders_customers_join_v").withColumn("insert_ts", current_timestamp())

5) Gold (baseline)

@dlt.table(
    name="orders_aggregated_gold",
    comment="Aggregation by market segment",
    table_properties={"quality": "gold"}
)
def orders_aggregated_gold():
    df = dlt.read("orders_silver")
    return (df.groupBy("c_mktsegment")
              .agg(
                  f_count("o_orderkey").alias("count_orders"),
                  f_sum("o_totalprice").alias("sum_totalprice")
              ))

Test it

  1. Upload a CSV into /Volumes/dev/etl/landing/files (headers & columns must match the schema hints); a quick way to drop a sample file is shown after this list.
  2. Validate the pipeline, then Start.
  3. First run of orders_union_bronze will backfill both sources; subsequent runs will only process new files (Auto Loader) or new rows (Delta stream).
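
To drop a quick test file without leaving the workspace, here is a minimal sketch; the file name and rows below are made up, so adjust them to your real orders schema:

# Write a tiny sample CSV into the landing folder (hypothetical rows; adjust to your schema)
sample_csv = (
    "o_orderkey,o_custkey,o_orderstatus,o_totalprice,o_orderdate\n"
    "9000001,1,O,1500.50,2024-01-15\n"
    "9000002,2,F,320.00,2024-01-16\n"
)
dbutils.fs.put("/Volumes/dev/etl/landing/files/orders_sample.csv", sample_csv, True)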

Pass Parameters into a DLT pipeline

We’ll parameterize order status and create dynamic Gold tables per status.

1) Add a pipeline configuration

In Pipelines → Settings → Configuration, add:

custom.order_status=o,f

(Comma-separated list; adjust values as needed.)
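
If you manage the pipeline through its JSON settings instead of the UI, the same key goes under the configuration object; a minimal fragment (other settings omitted):

{
  "configuration": {
    "custom.order_status": "o,f"
  }
}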

2) Read the config in your DLT notebook

At the top:

order_status_csv = spark.conf.get("custom.order_status", "na")
order_statuses = [s.strip() for s in order_status_csv.split(",") if s.strip() and s.strip().lower() != "na"]

3) Generate Materialized Views dynamically

Loop over the statuses and emit a gold table per status:

for status in order_statuses:
    @dlt.table(
        name=f"orders_aggregated_{status}_gold",
        comment=f"Per-status aggregation for order_status={status}",
        table_properties={"quality": "gold"}
    )
    def _make_status_gold(status=status):
        src = dlt.read("orders_silver").where(col("o_orderstatus") == status)
        return (src.groupBy("c_mktsegment")
                    .agg(
                        f_count("o_orderkey").alias("count_orders"),
                        f_sum("o_totalprice").alias("sum_totalprice")
                    )
                )

  • If custom.order_status=o,f, you’ll get:
    • orders_aggregated_o_gold
    • orders_aggregated_f_gold

Change the config, re-run the pipeline, and DLT will create or remove datasets accordingly (declarative lifecycle).
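
To confirm which gold tables a run produced, a quick check from a regular notebook (assuming the pipeline’s target schema is dev.etl; adjust to yours):

# List the dynamically generated gold tables (target schema is an assumption)
display(
    spark.sql("SHOW TABLES IN dev.etl")
         .filter("tableName LIKE 'orders_aggregated_%_gold'")
)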


What to Expect on Runs

  • First run after adding union:
    orders_union_bronze will backfill from both sources (Delta stream & any existing files).
  • Subsequent runs:
    • Only new files (Auto Loader) and new Delta rows are processed.
    • The union table processes increments only, thanks to Append Flow.
  • Dynamic golds:
    • Created/updated per status in your pipeline config.
    • No manual DDL required for schema or table management.

Troubleshooting Tips

  • Schema mismatch on union: Align the Auto Loader cloudFiles.schemaHints to the columns/types of the Delta-sourced stream (a quick check is sketched after this list).
  • Validation error (missing function imports): Import any aggregation functions you use (count, sum, etc.).
  • No increments processed: Confirm new files landed in /Volumes/dev/etl/landing/files and that filenames match *.csv.
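
For the schema-mismatch tip above, a quick comparison in a regular notebook can catch drift before a pipeline run; it assumes at least one CSV has already landed in the folder:

# Compare the Delta-sourced orders columns with what Spark reads from the landed CSVs
delta_cols = set(spark.read.table("dev.bronze.orders_raw").columns)
csv_cols = set(
    spark.read.option("header", "true")
         .csv("/Volumes/dev/etl/landing/files")
         .columns
)
print("Columns missing from the CSVs:", delta_cols - csv_cols)
print("Extra columns in the CSVs:", csv_cols - delta_cols)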

Recap

  • Auto Loader inside DLT: .format("cloudFiles") + cloudFiles.* options.
  • Append Flow: Incrementally union multiple streaming sources into one streaming table.
  • Parameters: Use pipeline Configuration + spark.conf.get(...) to generate tables dynamically.
  • Declarative lifecycle: DLT manages table creation, schema evolution (when allowed), and removal.
