{"id":836,"date":"2025-09-07T14:11:26","date_gmt":"2025-09-07T14:11:26","guid":{"rendered":"https:\/\/dataopsschool.com\/blog\/?p=836"},"modified":"2025-09-07T14:11:27","modified_gmt":"2025-09-07T14:11:27","slug":"databricks-truncate-and-load-as-a-streaming-source-full-refresh-of-a-dlt-pipeline-workflow-file-arrival-triggers","status":"publish","type":"post","link":"https:\/\/dataopsschool.com\/blog\/databricks-truncate-and-load-as-a-streaming-source-full-refresh-of-a-dlt-pipeline-workflow-file-arrival-triggers\/","title":{"rendered":"Databricks: Truncate-and-Load as a streaming source, Full Refresh of a DLT pipeline, Workflow file-arrival triggers"},"content":{"rendered":"\n<p><\/p>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<h1 class=\"wp-block-heading\">Introduction<\/h1>\n\n\n\n<p>Today we\u2019ll cover four production patterns for <strong>Delta Live Tables (DLT)<\/strong>:<\/p>\n\n\n\n<ol class=\"wp-block-list\">\n<li><strong>Truncate-and-Load as a streaming source<\/strong> (when the upstream source truncates then reloads each batch).<\/li>\n\n\n\n<li><strong>Full Refresh<\/strong> of a DLT pipeline\u2014and how to <strong>protect<\/strong> key tables from being reset.<\/li>\n\n\n\n<li><strong>Workflow file-arrival triggers<\/strong> to run DLT as soon as new files land.<\/li>\n\n\n\n<li><strong>Scheduling<\/strong> DLT (cron-style) and <strong>production mode<\/strong> behavior.<\/li>\n<\/ol>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<h1 class=\"wp-block-heading\">Truncate-Load table as Source for Streaming Tables (with <code>skipChangeCommits<\/code>)<\/h1>\n\n\n\n<p><strong>Problem:<\/strong> Your upstream system truncates a Delta table and then inserts new data. 
A \u201cpure\u201d streaming read treats this as a non-append change and will fail.<\/p>\n\n\n\n<p><strong>Fix:<\/strong> When reading a Delta table as a <strong>streaming source<\/strong>, set:<\/p>\n\n\n\n<ul class=\"wp-block-list\">\n<li><code>skipChangeCommits = true<\/code> \u2192 ignore commits that update or delete existing records (updates\/deletes\/truncates) so the stream keeps running instead of failing. (<a href=\"https:\/\/docs.databricks.com\/aws\/en\/structured-streaming\/delta-lake?utm_source=chatgpt.com\">Databricks Documentation<\/a>, <a href=\"https:\/\/learn.microsoft.com\/en-us\/azure\/databricks\/structured-streaming\/delta-lake?utm_source=chatgpt.com\">Microsoft Learn<\/a>)<\/li>\n<\/ul>\n\n\n\n<h3 class=\"wp-block-heading\">DLT (Python) example<\/h3>\n\n\n\n<pre class=\"wp-block-code\"><code>import dlt\nfrom pyspark.sql import functions as F\n\n# Truncate-load source table\nSOURCE_TABLE = \"source.orders_raw\"   # or a Unity Catalog three-level name like catalog.schema.table\n\n@dlt.table(\n    name=\"orders_bronze\",\n    comment=\"Bronze from truncate-load source using skipChangeCommits.\"\n)\ndef orders_bronze():\n    # If you read by table name:\n    return (spark.readStream\n                 .format(\"delta\")\n                 .option(\"skipChangeCommits\", \"true\")  # KEY!\n                 .table(SOURCE_TABLE))\n    # Or if you read by path (UC volume paths start with \/Volumes\/):\n    # return (spark.readStream.format(\"delta\")\n    #         .option(\"skipChangeCommits\", \"true\")\n    #         .load(\"\/Volumes\/dev\/etl\/landing\/files\/orders_raw\"))\n<\/code><\/pre>\n\n\n\n<p><strong>Notes<\/strong><\/p>\n\n\n\n<ul class=\"wp-block-list\">\n<li>Works for <strong>Delta<\/strong> streaming sources that occasionally make non-append changes (such as truncation).<\/li>\n\n\n\n<li>Use with care: you\u2019re <strong>choosing<\/strong> to ignore non-append changes entirely, so rows updated or deleted upstream are not propagated downstream. If you need the change history, use CDC (<code>readChangeFeed<\/code>) instead. 
(<a href=\"https:\/\/docs.databricks.com\/aws\/en\/structured-streaming\/delta-lake?utm_source=chatgpt.com\">Databricks Documentation<\/a>)<\/li>\n<\/ul>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<h1 class=\"wp-block-heading\">Full Refresh (what it does) &amp; how to <strong>avoid<\/strong> refreshing specific tables<\/h1>\n\n\n\n<p><strong>Full refresh<\/strong> re-computes &amp; re-loads <strong>all<\/strong> pipeline datasets from scratch; streaming tables may be <strong>truncated<\/strong> (history lost) before being rebuilt. You can <strong>prevent<\/strong> full refresh on selected tables with a table property:<\/p>\n\n\n\n<ul class=\"wp-block-list\">\n<li><code>pipelines.reset.allowed = false<\/code> (default is <code>true<\/code>). (<a href=\"https:\/\/docs.databricks.com\/aws\/en\/dlt\/properties?utm_source=chatgpt.com\">Databricks Documentation<\/a>, <a href=\"https:\/\/learn.microsoft.com\/en-us\/azure\/databricks\/dlt\/properties?utm_source=chatgpt.com\">Microsoft Learn<\/a>)<\/li>\n<\/ul>\n\n\n\n<h3 class=\"wp-block-heading\">DLT (Python) decorator with table properties<\/h3>\n\n\n\n<pre class=\"wp-block-code\"><code>@dlt.table(\n    name=\"orders_silver\",\n    table_properties={ \"pipelines.reset.allowed\": \"false\" },  # protect from full refresh\n    comment=\"Silver table protected from full refresh.\"\n)\ndef orders_silver():\n    df = dlt.read_stream(\"orders_bronze\")  # streaming read, matching the SQL version below\n    return df.select(\"order_key\", \"order_status\", \"order_price\", \"cust_id\")\n<\/code><\/pre>\n\n\n\n<h3 class=\"wp-block-heading\">DLT (SQL) equivalent<\/h3>\n\n\n\n<pre class=\"wp-block-code\"><code>CREATE OR REFRESH STREAMING LIVE TABLE orders_silver_sql\nTBLPROPERTIES (pipelines.reset.allowed = false)\nAS\nSELECT order_key, order_status, order_price, cust_id\nFROM STREAM(LIVE.orders_bronze);\n<\/code><\/pre>\n\n\n\n<p><strong>When to use:<\/strong> Protect <strong>SCD2<\/strong> or any long-history tables from accidental resets, while still allowing normal 
incremental updates to flow. (<a href=\"https:\/\/docs.databricks.com\/aws\/en\/dlt\/transform?utm_source=chatgpt.com\">Databricks Documentation<\/a>)<\/p>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<h1 class=\"wp-block-heading\">Scheduling DLT pipelines with <strong>Workflows<\/strong><\/h1>\n\n\n\n<p>You can orchestrate DLT with <strong>Workflows<\/strong> (Jobs). Two common triggers:<\/p>\n\n\n\n<ul class=\"wp-block-list\">\n<li><strong>Schedule<\/strong> (cron-like): run every N minutes\/hours\/days.<\/li>\n\n\n\n<li><strong>File arrival<\/strong>: run when a new file lands in a <strong>Unity Catalog volume or external location<\/strong> (recommended with Auto Loader pipelines). (<a href=\"https:\/\/docs.databricks.com\/aws\/en\/jobs\/file-arrival-triggers?utm_source=chatgpt.com\">Databricks Documentation<\/a>, <a href=\"https:\/\/learn.microsoft.com\/en-us\/azure\/databricks\/jobs\/file-arrival-triggers?utm_source=chatgpt.com\">Microsoft Learn<\/a>)<\/li>\n<\/ul>\n\n\n\n<h3 class=\"wp-block-heading\">A. Add a <strong>file-arrival trigger<\/strong> (UI steps)<\/h3>\n\n\n\n<ol class=\"wp-block-list\">\n<li>In the left sidebar, open <strong>Jobs &amp; Pipelines<\/strong> \u2192 open your <strong>Job<\/strong> (or create a Job and add a <strong>Pipeline<\/strong> task that points to your DLT pipeline).<\/li>\n\n\n\n<li>In the job\u2019s right panel, click <strong>Add trigger<\/strong> \u2192 choose <strong>File arrival<\/strong>.<\/li>\n\n\n\n<li><strong>Storage location:<\/strong> paste the <strong>UC volume<\/strong> path or <strong>external location<\/strong> URL you want to watch (e.g., <code>\/Volumes\/catalog\/schema\/volume\/path\/<\/code> or <code>abfss:\/\/\u2026<\/code>).<\/li>\n\n\n\n<li>(Optional) Configure <strong>polling interval<\/strong> and <strong>debounce<\/strong>\/\u201cwait after last change\u201d advanced options.<\/li>\n\n\n\n<li><strong>Save<\/strong>. The job will run when new files appear. 
(<a href=\"https:\/\/docs.databricks.com\/aws\/en\/jobs\/file-arrival-triggers?utm_source=chatgpt.com\">Databricks Documentation<\/a>)<\/li>\n<\/ol>\n\n\n\n<blockquote class=\"wp-block-quote is-layout-flow wp-block-quote-is-layout-flow\">\n<p>Why volumes\/external locations? File-arrival triggers explicitly support <strong>Unity Catalog volumes\/external locations<\/strong> for secure, first-class monitoring. (<a href=\"https:\/\/docs.databricks.com\/aws\/en\/jobs\/file-arrival-triggers?utm_source=chatgpt.com\">Databricks Documentation<\/a>)<\/p>\n<\/blockquote>\n\n\n\n<h3 class=\"wp-block-heading\">B. Add a <strong>time-based schedule<\/strong><\/h3>\n\n\n\n<ol class=\"wp-block-list\">\n<li>In the same <strong>Add trigger<\/strong> dialog, choose <strong>Schedule<\/strong>.<\/li>\n\n\n\n<li>Set frequency (e.g., every hour at :15).<\/li>\n\n\n\n<li><strong>Save<\/strong>.<\/li>\n<\/ol>\n\n\n\n<h3 class=\"wp-block-heading\">C. Run in <strong>Production<\/strong> mode<\/h3>\n\n\n\n<p>Switch your DLT pipeline to <strong>Production<\/strong> so the job <strong>auto-terminates<\/strong> compute after each successful\/failed run (lower cost). This is the recommended mode for scheduled or file-arrival-triggered pipelines.<\/p>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<h2 class=\"wp-block-heading\">End-to-end example: a one-task Workflow that runs your DLT pipeline<\/h2>\n\n\n\n<ul class=\"wp-block-list\">\n<li><strong>Task type:<\/strong> Pipeline<\/li>\n\n\n\n<li><strong>Pipeline:<\/strong> select your DLT pipeline (the one with <code>orders_bronze<\/code>\/<code>orders_silver<\/code>).<\/li>\n\n\n\n<li><strong>Trigger:<\/strong> add <strong>File arrival<\/strong> on the same folder your Autoloader or ingest job watches (or a <strong>Schedule<\/strong>).<\/li>\n\n\n\n<li><strong>(Optional)<\/strong> Check <strong>Full refresh<\/strong> on demand when you really want to recompute everything; otherwise rely on incremental updates. 
(<a href=\"https:\/\/docs.databricks.com\/aws\/en\/jobs\/file-arrival-triggers?utm_source=chatgpt.com\">Databricks Documentation<\/a>)<\/li>\n<\/ul>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<h2 class=\"wp-block-heading\">Helpful SQL: identify full-refresh runs &amp; basic observability<\/h2>\n\n\n\n<ul class=\"wp-block-list\">\n<li>DLT exposes an <strong><code>event_log()<\/code><\/strong> table-valued function. You can create views and dashboards on top of it. (<a href=\"https:\/\/docs.databricks.com\/aws\/en\/sql\/language-manual\/functions\/event_log?utm_source=chatgpt.com\">Databricks Documentation<\/a>, <a href=\"https:\/\/learn.microsoft.com\/en-us\/azure\/databricks\/sql\/language-manual\/functions\/event_log?utm_source=chatgpt.com\">Microsoft Learn<\/a>)<\/li>\n<\/ul>\n\n\n\n<pre class=\"wp-block-code\"><code>-- All events\nSELECT * FROM event_log('&lt;PIPELINE_ID&gt;');\n\n-- Was the most recent update a full refresh?\nSELECT\n  timestamp,\n  details:create_update:full_refresh::boolean AS was_full_refresh\nFROM event_log('&lt;PIPELINE_ID&gt;')\nWHERE event_type = 'create_update'\nORDER BY timestamp DESC\nLIMIT 1;\n<\/code><\/pre>\n\n\n\n<blockquote class=\"wp-block-quote is-layout-flow wp-block-quote-is-layout-flow\">\n<p>Tip: The <strong>event log<\/strong> contains throughput, rule failures, and status\u2014great for <strong>DLT health<\/strong> and SLA reporting. (<a href=\"https:\/\/community.databricks.com\/t5\/data-engineering\/identifying-full-refresh-vs-incremental-runs-in-delta-live\/td-p\/106754?utm_source=chatgpt.com\">community.databricks.com<\/a>)<\/p>\n<\/blockquote>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<h2 class=\"wp-block-heading\">Tested patterns &amp; gotchas<\/h2>\n\n\n\n<ul class=\"wp-block-list\">\n<li><strong>Truncate-Load sources:<\/strong> set <code>.option(\"skipChangeCommits\",\"true\")<\/code> on the <strong>streaming<\/strong> read of a Delta source that is not strictly append-only. 
This allows you to keep streaming even when the source truncates. (<a href=\"https:\/\/docs.databricks.com\/aws\/en\/structured-streaming\/delta-lake?utm_source=chatgpt.com\">Databricks Documentation<\/a>, <a href=\"https:\/\/learn.microsoft.com\/en-us\/azure\/databricks\/structured-streaming\/delta-lake?utm_source=chatgpt.com\">Microsoft Learn<\/a>)<\/li>\n\n\n\n<li><strong>Protect SCD2\/history tables:<\/strong> add <code>pipelines.reset.allowed=false<\/code> at the <strong>table<\/strong> level to prevent accidental full refresh. (<a href=\"https:\/\/docs.databricks.com\/aws\/en\/dlt\/properties?utm_source=chatgpt.com\">Databricks Documentation<\/a>)<\/li>\n\n\n\n<li><strong>File-arrival triggers<\/strong> require a <strong>UC volume\/external location<\/strong> path; they poll on a cadence you control. Combine with <strong>Production<\/strong> mode so compute auto-shuts down. (<a href=\"https:\/\/docs.databricks.com\/aws\/en\/jobs\/file-arrival-triggers?utm_source=chatgpt.com\">Databricks Documentation<\/a>)<\/li>\n\n\n\n<li><strong>Dashboards\/alerts:<\/strong> put a simple query over <code>event_log('&lt;PIPELINE_ID&gt;')<\/code> and alert if failures spike or if no successful run happens within your SLO window. 
(<a href=\"https:\/\/docs.databricks.com\/aws\/en\/sql\/language-manual\/functions\/event_log?utm_source=chatgpt.com\">Databricks Documentation<\/a>)<\/li>\n<\/ul>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<h2 class=\"wp-block-heading\">Quick copy-paste: minimal pipeline (Python)<\/h2>\n\n\n\n<pre class=\"wp-block-code\"><code>import dlt\nfrom pyspark.sql import functions as F\n\n# ----- Bronze from truncate-load source -----\n@dlt.table(\n    name=\"orders_bronze\",\n    comment=\"Bronze from truncate-load source\"\n)\ndef orders_bronze():\n    return (spark.readStream\n                 .format(\"delta\")\n                 .option(\"skipChangeCommits\", \"true\")   # key for truncate-load\n                 .table(\"source.orders_raw\"))\n\n# ----- Silver protected from full refresh -----\n@dlt.table(\n    name=\"orders_silver\",\n    table_properties={\"pipelines.reset.allowed\": \"false\"},\n    comment=\"Silver table protected from full refresh\"\n)\ndef orders_silver():\n    df = dlt.read_stream(\"orders_bronze\")  # streaming read of the bronze table\n    return (df\n            .withColumn(\"order_price_usd\", F.col(\"order_price\").cast(\"double\"))\n            .filter(F.col(\"order_price_usd\") &gt; 0))\n<\/code><\/pre>\n\n\n\n<p>Then wire this pipeline into a <strong>Workflow<\/strong> (Job) with either a <strong>Schedule<\/strong> or <strong>File-arrival trigger<\/strong> pointing at your landing location.<\/p>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<h3 class=\"wp-block-heading\">References<\/h3>\n\n\n\n<ul class=\"wp-block-list\">\n<li>DLT <strong>table properties<\/strong> (<code>pipelines.reset.allowed<\/code>, auto-optimize, etc.). 
(<a href=\"https:\/\/docs.databricks.com\/aws\/en\/dlt\/properties?utm_source=chatgpt.com\">Databricks Documentation<\/a>, <a href=\"https:\/\/learn.microsoft.com\/en-us\/azure\/databricks\/dlt\/properties?utm_source=chatgpt.com\">Microsoft Learn<\/a>)<\/li>\n\n\n\n<li><strong>Run\/update pipelines<\/strong> &amp; preventing full refresh via table property. (<a href=\"https:\/\/docs.databricks.com\/aws\/en\/dlt\/updates?utm_source=chatgpt.com\">Databricks Documentation<\/a>, <a href=\"https:\/\/learn.microsoft.com\/en-us\/azure\/databricks\/dlt\/updates?utm_source=chatgpt.com\">Microsoft Learn<\/a>)<\/li>\n\n\n\n<li><strong>Delta streaming<\/strong>: <code>skipChangeCommits<\/code> option for streaming reads. (<a href=\"https:\/\/docs.databricks.com\/aws\/en\/structured-streaming\/delta-lake?utm_source=chatgpt.com\">Databricks Documentation<\/a>, <a href=\"https:\/\/learn.microsoft.com\/en-us\/azure\/databricks\/structured-streaming\/delta-lake?utm_source=chatgpt.com\">Microsoft Learn<\/a>)<\/li>\n\n\n\n<li><strong>File-arrival triggers<\/strong> in Workflows (Volumes \/ External Locations). (<a href=\"https:\/\/docs.databricks.com\/aws\/en\/jobs\/file-arrival-triggers?utm_source=chatgpt.com\">Databricks Documentation<\/a>, <a href=\"https:\/\/learn.microsoft.com\/en-us\/azure\/databricks\/jobs\/file-arrival-triggers?utm_source=chatgpt.com\">Microsoft Learn<\/a>)<\/li>\n\n\n\n<li><strong>Event log<\/strong> TVF for observability. 
(<a href=\"https:\/\/docs.databricks.com\/aws\/en\/sql\/language-manual\/functions\/event_log?utm_source=chatgpt.com\">Databricks Documentation<\/a>, <a href=\"https:\/\/learn.microsoft.com\/en-us\/azure\/databricks\/sql\/language-manual\/functions\/event_log?utm_source=chatgpt.com\">Microsoft Learn<\/a>)<\/li>\n<\/ul>\n\n\n\n<hr class=\"wp-block-separator has-alpha-channel-opacity\"\/>\n\n\n\n<p><\/p>\n","protected":false},"excerpt":{"rendered":"<p>Introduction Today we\u2019ll cover four production patterns for Delta Live Tables (DLT): Truncate-Load table as Source for Streaming Tables (with skipChangeCommits) Problem: Your upstream system truncates a&#8230; <\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"closed","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[1],"tags":[],"class_list":["post-836","post","type-post","status-publish","format-standard","hentry","category-uncategorized"],"_links":{"self":[{"href":"https:\/\/dataopsschool.com\/blog\/wp-json\/wp\/v2\/posts\/836","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/dataopsschool.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/dataopsschool.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/dataopsschool.com\/blog\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/dataopsschool.com\/blog\/wp-json\/wp\/v2\/comments?post=836"}],"version-history":[{"count":1,"href":"https:\/\/dataopsschool.com\/blog\/wp-json\/wp\/v2\/posts\/836\/revisions"}],"predecessor-version":[{"id":837,"href":"https:\/\/dataopsschool.com\/blog\/wp-json\/wp\/v2\/posts\/836\/revisions\/837"}],"wp:attachment":[{"href":"https:\/\/dataopsschool.com\/blog\/wp-json\/wp\/v2\/media?parent=836"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/dataopsschool.com\/blog\/wp-json\/wp\/v2\/categories?post=836"},{"taxonomy":"post_tag","embeddable":
true,"href":"https:\/\/dataopsschool.com\/blog\/wp-json\/wp\/v2\/tags?post=836"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}