{"id":829,"date":"2025-09-02T15:19:51","date_gmt":"2025-09-02T15:19:51","guid":{"rendered":"https:\/\/dataopsschool.com\/blog\/?p=829"},"modified":"2025-09-02T15:19:52","modified_gmt":"2025-09-02T15:19:52","slug":"databricks-dlt-append-flow-union-auto-loader","status":"publish","type":"post","link":"https:\/\/dataopsschool.com\/blog\/databricks-dlt-append-flow-union-auto-loader\/","title":{"rendered":"Databricks: DLT Append Flow (Union) &amp; Auto Loader"},"content":{"rendered":"\n<p><\/p>\n\n\n\n<p><strong>Pass parameters in a DLT pipeline | Generate tables dynamically<\/strong><\/p>\n\n\n\n<p>This hands-on guide shows how to:<\/p>\n\n\n\n<ul class=\"wp-block-list\">\n<li>Ingest files with <strong>Auto Loader<\/strong> inside a <strong>Delta Live Tables (DLT)<\/strong> pipeline<\/li>\n\n\n\n<li><strong>Union multiple streaming sources incrementally<\/strong> with <strong>Append Flow<\/strong><\/li>\n\n\n\n<li><strong>Parameterize<\/strong> your pipeline and <strong>generate tables dynamically<\/strong><\/li>\n<\/ul>\n\n\n\n<p>We\u2019ll build on your earlier DLT pipeline (Orders + Customers \u2192 Silver \u2192 Gold). 
If you\u2019re starting fresh, you can still follow along\u2014each step is self-contained.<\/p>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<h2 class=\"wp-block-heading\">Prereqs (one-time)<\/h2>\n\n\n\n<ul class=\"wp-block-list\">\n<li>Unity Catalog enabled; a catalog <code>dev<\/code> with schemas <code>etl<\/code> and <code>bronze<\/code><\/li>\n\n\n\n<li>Two existing Delta tables (from samples or your own):\n<ul class=\"wp-block-list\">\n<li><code>dev.bronze.orders_raw<\/code><\/li>\n\n\n\n<li><code>dev.bronze.customer_raw<\/code><\/li>\n<\/ul>\n<\/li>\n\n\n\n<li>A working DLT pipeline in <strong>Development<\/strong> mode<\/li>\n\n\n\n<li>Cluster permissions to create <strong>Managed Volumes<\/strong><\/li>\n<\/ul>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<h1 class=\"wp-block-heading\">Introduction (What we\u2019ll build)<\/h1>\n\n\n\n<ol class=\"wp-block-list\">\n<li>Add an <strong>Auto Loader<\/strong> source that reads CSV files into a <strong>streaming table<\/strong>.<\/li>\n\n\n\n<li>Create a <strong>Union streaming table<\/strong> that incrementally combines:\n<ul class=\"wp-block-list\">\n<li>Streaming from the Delta table (orders_bronze)<\/li>\n\n\n\n<li>Streaming from Auto Loader (orders_autoloader_bronze)<\/li>\n<\/ul>\n<\/li>\n\n\n\n<li><strong>Join<\/strong> that union with customers, then recompute <strong>Gold<\/strong>.<\/li>\n\n\n\n<li>Pass a <strong>parameter<\/strong> into the pipeline (order status list) and <strong>generate multiple Gold tables<\/strong> dynamically.<\/li>\n<\/ol>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<h1 class=\"wp-block-heading\">Use Auto Loader inside DLT<\/h1>\n\n\n\n<h2 class=\"wp-block-heading\">1) Create a managed Volume and folders for Auto Loader<\/h2>\n\n\n\n<p>Run this once in a regular notebook (not the DLT notebook):<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>-- Managed volume to land files and store Auto Loader schemas\nCREATE 
VOLUME IF NOT EXISTS dev.etl.landing;\n\n-- Create subfolders (logical paths inside the volume)\n-- You can create them from UI as well (Volumes browser)\n<\/code><\/pre>\n\n\n\n<p>In the Workspace sidebar, under <strong>Catalogs \u2192 dev \u2192 etl \u2192 Volumes \u2192 landing<\/strong>, create two folders:<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>\/Volumes\/dev\/etl\/landing\/files\n\/Volumes\/dev\/etl\/landing\/autoloader_schemas\n<\/code><\/pre>\n\n\n\n<blockquote class=\"wp-block-quote is-layout-flow wp-block-quote-is-layout-flow\">\n<p>We\u2019ll drop CSVs into <code>\/files<\/code>, and let Auto Loader save schema info under <code>\/autoloader_schemas<\/code>.<\/p>\n<\/blockquote>\n\n\n\n<h2 class=\"wp-block-heading\">DLT notebook \u2014 imports<\/h2>\n\n\n\n<p>At the top of your <strong>DLT notebook<\/strong>:<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>import dlt\nfrom pyspark.sql.functions import col, current_timestamp, count as f_count, sum as f_sum\n<\/code><\/pre>\n\n\n\n<p><em>(We\u2019ll import more only as needed.)<\/em><\/p>\n\n\n\n<h2 class=\"wp-block-heading\">Existing sources (recap)<\/h2>\n\n\n\n<p>You likely already have:<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>@dlt.table(\n    name=\"orders_bronze\",\n    comment=\"Orders from Delta table (streaming input)\",\n    table_properties={\"quality\": \"bronze\"}\n)\ndef orders_bronze():\n    # Delta table as a streaming source\n    return spark.readStream.table(\"dev.bronze.orders_raw\")\n\n@dlt.table(\n    name=\"customer_bronze\",\n    comment=\"Customers from Delta table (batch input)\",\n    table_properties={\"quality\": \"bronze\"}\n)\ndef customer_bronze():\n    return spark.read.table(\"dev.bronze.customer_raw\")\n<\/code><\/pre>\n\n\n\n<h2 class=\"wp-block-heading\">New: Auto Loader streaming table<\/h2>\n\n\n\n<p>We\u2019ll read CSVs with <strong>cloudFiles<\/strong> and keep schema stable (no evolution), so we can union with the Delta-sourced stream.<\/p>\n\n\n\n<pre 
class=\"wp-block-code\"><code>@dlt.table(\n    name=\"orders_autoloader_bronze\",\n    comment=\"Orders from files via Auto Loader (streaming)\",\n    table_properties={\"quality\": \"bronze\"}\n)\ndef orders_autoloader_bronze():\n    return (\n        spark.readStream\n            .format(\"cloudFiles\")\n            .option(\"cloudFiles.format\", \"csv\")\n            # Columns must match the orders_bronze schema so both streams can be\n            # unioned; c_mktsegment comes later from the customer join (adjust as needed!)\n            .option(\"cloudFiles.schemaHints\", \n                    \"o_orderkey BIGINT, o_custkey BIGINT, o_orderstatus STRING, \"\n                    \"o_totalprice DOUBLE, o_orderdate DATE\")\n            .option(\"cloudFiles.schemaLocation\", \"\/Volumes\/dev\/etl\/landing\/autoloader_schemas\/1\")\n            .option(\"cloudFiles.schemaEvolutionMode\", \"none\")\n            .option(\"pathGlobFilter\", \"*.csv\")\n            .load(\"\/Volumes\/dev\/etl\/landing\/files\")\n    )\n<\/code><\/pre>\n\n\n\n<blockquote class=\"wp-block-quote is-layout-flow wp-block-quote-is-layout-flow\">\n<p>Why no checkpoint option? 
In DLT, streaming table checkpoints are managed automatically under the dataset\u2019s storage path.<\/p>\n<\/blockquote>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<h1 class=\"wp-block-heading\">Append Flow (Union of streaming tables, incrementally)<\/h1>\n\n\n\n<p><strong>Problem<\/strong>: You could <code>union<\/code> the two streams inside a single streaming table definition, but that hardwires the sources into one query; adding or removing a source later forces a full refresh of the table.<\/p>\n\n\n\n<p><strong>Solution<\/strong>: Use <strong>Append Flow<\/strong> to incrementally <strong>append<\/strong> from multiple streaming sources into a <strong>single streaming table<\/strong>.<\/p>\n\n\n\n<h2 class=\"wp-block-heading\">1) Create the target union streaming table<\/h2>\n\n\n\n<pre class=\"wp-block-code\"><code>dlt.create_streaming_table(\"orders_union_bronze\")\n<\/code><\/pre>\n\n\n\n<h2 class=\"wp-block-heading\">2) Append source #1: Delta-sourced stream<\/h2>\n\n\n\n<pre class=\"wp-block-code\"><code>@dlt.append_flow(target=\"orders_union_bronze\")\ndef append_from_orders_bronze():\n    return dlt.read_stream(\"orders_bronze\")  # live table in same pipeline\n<\/code><\/pre>\n\n\n\n<h2 class=\"wp-block-heading\">3) Append source #2: Auto Loader stream<\/h2>\n\n\n\n<pre class=\"wp-block-code\"><code>@dlt.append_flow(target=\"orders_union_bronze\")\ndef append_from_orders_autoloader():\n    return dlt.read_stream(\"orders_autoloader_bronze\")\n<\/code><\/pre>\n\n\n\n<blockquote class=\"wp-block-quote is-layout-flow wp-block-quote-is-layout-flow\">\n<p>Each <strong>Append Flow<\/strong> runs as its own streaming query with its own checkpoint, so the union table processes only <strong>new<\/strong> increments from each source, not full re-reads, and additional sources can be added later without a full refresh.<\/p>\n<\/blockquote>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<h2 class=\"wp-block-heading\">4) Join union with customers (Silver)<\/h2>\n\n\n\n<p>Replace any previous \u201corders + customers\u201d join to read from the <strong>union<\/strong> table:<\/p>\n\n\n\n<pre 
class=\"wp-block-code\"><code>@dlt.view(name=\"orders_customers_join_v\")\ndef orders_customers_join_v():\n    o = dlt.read(\"orders_union_bronze\")\n    c = dlt.read(\"customer_bronze\")\n    return (o.join(c, o.o_custkey == c.c_custkey, \"left\")\n             .select(\n                 o.o_orderkey, o.o_custkey, o.o_orderstatus, o.o_totalprice, o.o_orderdate,\n                 c.c_mktsegment\n             ))\n\n@dlt.table(\n    name=\"orders_silver\",\n    comment=\"Joined orders + customers\",\n    table_properties={\"quality\": \"silver\"}\n)\ndef orders_silver():\n    return dlt.read(\"orders_customers_join_v\").withColumn(\"insert_ts\", current_timestamp())\n<\/code><\/pre>\n\n\n\n<h2 class=\"wp-block-heading\">5) Gold (baseline)<\/h2>\n\n\n\n<pre class=\"wp-block-code\"><code>@dlt.table(\n    name=\"orders_aggregated_gold\",\n    comment=\"Aggregation by market segment\",\n    table_properties={\"quality\": \"gold\"}\n)\ndef orders_aggregated_gold():\n    df = dlt.read(\"orders_silver\")\n    return (df.groupBy(\"c_mktsegment\")\n              .agg(\n                  f_count(\"o_orderkey\").alias(\"count_orders\"),\n                  f_sum(\"o_totalprice\").alias(\"sum_totalprice\")\n              ))\n<\/code><\/pre>\n\n\n\n<h3 class=\"wp-block-heading\">Test it<\/h3>\n\n\n\n<ol class=\"wp-block-list\">\n<li><strong>Upload a CSV<\/strong> into <code>\/Volumes\/dev\/etl\/landing\/files<\/code> (headers &amp; columns must match schema hints).<\/li>\n\n\n\n<li><strong>Validate<\/strong> the pipeline, then <strong>Start<\/strong>.<\/li>\n\n\n\n<li>First run of <code>orders_union_bronze<\/code> will backfill both sources; subsequent runs will only process new files (Auto Loader) or new rows (Delta stream).<\/li>\n<\/ol>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<h1 class=\"wp-block-heading\">Pass Parameters into a DLT pipeline<\/h1>\n\n\n\n<p>We\u2019ll parameterize <strong>order status<\/strong> and create <strong>dynamic 
Gold<\/strong> tables per status.<\/p>\n\n\n\n<h2 class=\"wp-block-heading\">1) Add a pipeline configuration<\/h2>\n\n\n\n<p>In <strong>Pipelines \u2192 Settings \u2192 Configuration<\/strong>, add:<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>custom.order_status=o,f\n<\/code><\/pre>\n\n\n\n<p><em>(Comma-separated list; adjust values as needed.)<\/em><\/p>\n\n\n\n<h2 class=\"wp-block-heading\">2) Read the config in your DLT notebook<\/h2>\n\n\n\n<p>At the top:<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>order_status_csv = spark.conf.get(\"custom.order_status\", \"na\")\norder_statuses = &#91;s.strip() for s in order_status_csv.split(\",\") if s.strip() and s.strip().lower() != \"na\"]\n<\/code><\/pre>\n\n\n\n<h2 class=\"wp-block-heading\">3) Generate Materialized Views dynamically<\/h2>\n\n\n\n<p>Loop over the statuses and emit a <strong>gold<\/strong> table per status:<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>for status in order_statuses:\n    @dlt.table(\n        name=f\"orders_aggregated_{status}_gold\",\n        comment=f\"Per-status aggregation for order_status={status}\",\n        table_properties={\"quality\": \"gold\"}\n    )\n    def _make_status_gold(status=status):\n        src = dlt.read(\"orders_silver\").where(col(\"o_orderstatus\") == status)\n        return (src.groupBy(\"c_mktsegment\")\n                    .agg(\n                        f_count(\"o_orderkey\").alias(\"count_orders\"),\n                        f_sum(\"o_totalprice\").alias(\"sum_totalprice\")\n                    )\n                )\n<\/code><\/pre>\n\n\n\n<ul class=\"wp-block-list\">\n<li>If <code>custom.order_status=o,f<\/code>, you\u2019ll get:\n<ul class=\"wp-block-list\">\n<li><code>orders_aggregated_o_gold<\/code><\/li>\n\n\n\n<li><code>orders_aggregated_f_gold<\/code><\/li>\n<\/ul>\n<\/li>\n<\/ul>\n\n\n\n<blockquote class=\"wp-block-quote is-layout-flow wp-block-quote-is-layout-flow\">\n<p>Change the config, re-run the pipeline, and DLT will <strong>create or 
remove<\/strong> datasets accordingly (declarative lifecycle).<\/p>\n<\/blockquote>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<h2 class=\"wp-block-heading\">What to Expect on Runs<\/h2>\n\n\n\n<ul class=\"wp-block-list\">\n<li><strong>First run after adding union<\/strong>:<br><code>orders_union_bronze<\/code> will backfill from both sources (Delta stream &amp; any existing files).<\/li>\n\n\n\n<li><strong>Subsequent runs<\/strong>:\n<ul class=\"wp-block-list\">\n<li>Only <strong>new files<\/strong> (Auto Loader) and <strong>new Delta rows<\/strong> are processed.<\/li>\n\n\n\n<li>The union table processes <strong>increments only<\/strong>, thanks to Append Flow.<\/li>\n<\/ul>\n<\/li>\n\n\n\n<li><strong>Dynamic golds<\/strong>:\n<ul class=\"wp-block-list\">\n<li>Created\/updated per status in your pipeline config.<\/li>\n\n\n\n<li>No manual DDL required for schema or table management.<\/li>\n<\/ul>\n<\/li>\n<\/ul>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<h2 class=\"wp-block-heading\">Troubleshooting Tips<\/h2>\n\n\n\n<ul class=\"wp-block-list\">\n<li><strong>Schema mismatch on union<\/strong>: Align the Auto Loader <code>cloudFiles.schemaHints<\/code> to match the columns\/types of the Delta-sourced stream.<\/li>\n\n\n\n<li><strong>Validation error (missing function imports)<\/strong>: Import any aggregation functions you use (<code>count<\/code>, <code>sum<\/code>, etc.).<\/li>\n\n\n\n<li><strong>No increments processed<\/strong>: Confirm new files landed in <code>\/Volumes\/dev\/etl\/landing\/files<\/code> and that filenames match <code>*.csv<\/code>.<\/li>\n<\/ul>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<h2 class=\"wp-block-heading\">Recap<\/h2>\n\n\n\n<ul class=\"wp-block-list\">\n<li><strong>Auto Loader<\/strong> inside DLT: <code>.format(\"cloudFiles\")<\/code> + <code>cloudFiles.*<\/code> options.<\/li>\n\n\n\n<li><strong>Append Flow<\/strong>: 
Incrementally union multiple streaming sources into one <strong>streaming table<\/strong>.<\/li>\n\n\n\n<li><strong>Parameters<\/strong>: Use pipeline <strong>Configuration<\/strong> + <code>spark.conf.get(...)<\/code> to generate <strong>tables dynamically<\/strong>.<\/li>\n\n\n\n<li><strong>Declarative lifecycle<\/strong>: DLT manages table creation, schema evolution (when allowed), and removal.<\/li>\n<\/ul>\n\n\n\n<p><\/p>\n\n\n\n<p><\/p>\n","protected":false},"excerpt":{"rendered":"<p>Pass parameters in a DLT pipeline | Generate tables dynamically This hands-on guide shows how to: We\u2019ll build on your earlier DLT pipeline (Orders + Customers \u2192&#8230; <\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"closed","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[1],"tags":[],"class_list":["post-829","post","type-post","status-publish","format-standard","hentry","category-uncategorized"],"_links":{"self":[{"href":"https:\/\/dataopsschool.com\/blog\/wp-json\/wp\/v2\/posts\/829","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/dataopsschool.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/dataopsschool.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/dataopsschool.com\/blog\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/dataopsschool.com\/blog\/wp-json\/wp\/v2\/comments?post=829"}],"version-history":[{"count":1,"href":"https:\/\/dataopsschool.com\/blog\/wp-json\/wp\/v2\/posts\/829\/revisions"}],"predecessor-version":[{"id":830,"href":"https:\/\/dataopsschool.com\/blog\/wp-json\/wp\/v2\/posts\/829\/revisions\/830"}],"wp:attachment":[{"href":"https:\/\/dataopsschool.com\/blog\/wp-json\/wp\/v2\/media?parent=829"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/dataopsschool.com\/blog\/wp-json\/wp\/v2\/categories?post=829"},{"taxonomy":"post_tag","embeddab
le":true,"href":"https:\/\/dataopsschool.com\/blog\/wp-json\/wp\/v2\/tags?post=829"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}