Databricks: Truncate-and-Load as a streaming source, Full Refresh of a DLT pipeline, Workflow file-arrival triggers and scheduling


Introduction

Today we’ll cover four production patterns for Delta Live Tables (DLT):

  1. Truncate-and-Load as a streaming source (when the upstream source truncates then reloads each batch).
  2. Full Refresh of a DLT pipeline—and how to protect key tables from being reset.
  3. Workflow file-arrival triggers to run DLT as soon as new files land.
  4. Scheduling DLT (cron-style) and production mode behavior.

Truncate-Load table as Source for Streaming Tables (with skipChangeCommits)

Problem: Your upstream system truncates a Delta table and then inserts new data. A “pure” streaming read sees this as non-append changes and will fail.

Fix: When reading the Delta table as a streaming source, set the skipChangeCommits option to true so the stream skips commits that update or delete existing records instead of failing on them:

DLT (Python) example

import dlt
from pyspark.sql import functions as F

# Truncate-load source path or table
SOURCE_TABLE = "source.orders_raw"   # or a three-level Unity Catalog name: catalog.schema.table

@dlt.table(
    name="orders_bronze",
    comment="Bronze from truncate-load source using skipChangeCommits."
)
def orders_bronze():
    # If you read by table name:
    return (spark.readStream
                 .format("delta")
                 .option("skipChangeCommits", "true")  # KEY!
                 .table(SOURCE_TABLE))
    # Or if you read by path:
    # return (spark.readStream.format("delta")
    #         .option("skipChangeCommits","true")
    #         .load("dbfs:/Volumes/dev/etl/landing/files/orders_raw"))

Notes

  • Works for Delta streaming sources that occasionally make non-append changes (such as truncation).
  • Use with care: the skipped commits are simply ignored, so the truncate's deletions are never propagated downstream; only newly appended data keeps flowing. If you need the change history itself, use Change Data Feed (readChangeFeed) instead; a minimal sketch follows. (Databricks Documentation)
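
For contrast, here is a minimal sketch of the CDC alternative mentioned above. It assumes Change Data Feed is enabled on the source table (delta.enableChangeDataFeed = true); the table and dataset names are illustrative.

import dlt

@dlt.table(
    name="orders_changes_bronze",
    comment="Bronze built from the source table's change feed instead of skipping change commits."
)
def orders_changes_bronze():
    # Stream row-level change records (adds _change_type, _commit_version, _commit_timestamp columns)
    # rather than skipping the commits that contain them.
    return (spark.readStream
                 .option("readChangeFeed", "true")
                 .table("source.orders_raw"))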

Full Refresh (what it does) & how to avoid refreshing specific tables

A full refresh clears each dataset's state and reprocesses all available data from scratch; streaming tables are truncated before the rebuild, so history that is no longer available at the source is lost. You can exclude selected tables from a full refresh with a table property:

DLT (Python) decorator with table properties

@dlt.table(
    name="orders_silver",
    table_properties={ "pipelines.reset.allowed": "false" },  # protect from full refresh
    comment="Silver table protected from full refresh."
)
def orders_silver():
    df = dlt.read("orders_bronze")
    return df.select("order_key","order_status","order_price","cust_id")

DLT (SQL) equivalent

CREATE OR REFRESH STREAMING LIVE TABLE orders_silver_sql
TBLPROPERTIES (pipelines.reset.allowed = false)
AS
SELECT order_key, order_status, order_price, cust_id
FROM LIVE.orders_bronze;

When to use: Protect SCD2 or any long-history tables from accidental resets, while still allowing normal incremental updates to flow. (Databricks Documentation)
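
To double-check that the protection landed on the published table, you can inspect its table properties from a notebook. A quick sketch; the three-level table name is a placeholder for wherever your pipeline publishes:

# Placeholder name: replace with the catalog.schema your pipeline publishes orders_silver to.
props = spark.sql("SHOW TBLPROPERTIES dev.etl.orders_silver").collect()
print({row["key"]: row["value"] for row in props if row["key"].startswith("pipelines.")})
# Expect pipelines.reset.allowed -> false in the output.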


Scheduling DLT pipelines with Workflows

You can orchestrate DLT with Workflows (Jobs). Two common triggers:

  • Schedule (cron-like): run every N minutes/hours/days.
  • File arrival: run when a new file lands in a Unity Catalog volume or external location (recommended with Auto Loader pipelines). (Databricks Documentation, Microsoft Learn)

A. Add a file-arrival trigger (UI steps)

  1. In the left sidebar, open Jobs & Pipelines → open your Job (or create a Job and add a Pipeline task that points to your DLT pipeline).
  2. In the job’s right panel, click Add trigger → choose File arrival.
  3. Storage location: paste the path of the UC volume or external location you want to watch (e.g., /Volumes/<catalog>/<schema>/<volume>/<path>/ or abfss://…).
  4. (Optional) Configure polling interval and debounce/“wait after last change” advanced options.
  5. Save. The job will run when new files appear. (Databricks Documentation)

Why volumes/external locations? File-arrival triggers explicitly support Unity Catalog volumes/external locations for secure, first-class monitoring. (Databricks Documentation)
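
If you prefer code over clicks, the same one-task job and trigger can be created through the Jobs REST API. This is a sketch only, assuming the 2.1 jobs/create endpoint and the file_arrival trigger fields; the host, token, pipeline ID, and watched folder are placeholders to replace with your own.

import requests

HOST = "https://<your-workspace-url>"          # placeholder
TOKEN = "<personal-access-token>"              # placeholder
PIPELINE_ID = "<your-dlt-pipeline-id>"         # placeholder

job_spec = {
    "name": "orders-dlt-file-arrival",
    "tasks": [
        {
            "task_key": "run_orders_pipeline",
            "pipeline_task": {"pipeline_id": PIPELINE_ID},   # the Pipeline task from the UI steps
        }
    ],
    "trigger": {
        "pause_status": "UNPAUSED",
        "file_arrival": {
            "url": "/Volumes/dev/etl/landing/files/",        # UC volume folder to watch (placeholder)
            "wait_after_last_change_seconds": 60,            # debounce: wait for uploads to settle
        },
    },
}

resp = requests.post(
    f"{HOST}/api/2.1/jobs/create",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json=job_spec,
)
resp.raise_for_status()
print(resp.json())   # {"job_id": ...}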

B. Add a time-based schedule

  1. In the same Add trigger dialog, choose Schedule.
  2. Set frequency (e.g., every hour at :15); the cron equivalent is sketched after these steps.
  3. Save.
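
The API equivalent of this schedule is a Quartz cron expression on the job spec. A small sketch (field names follow the Jobs API schedule object; verify against your workspace before relying on it):

# Quartz cron fields: seconds minutes hours day-of-month month day-of-week.
# "Every hour at :15" from the step above; drop this in place of the trigger
# block in the job_spec sketch from section A.
schedule_block = {
    "schedule": {
        "quartz_cron_expression": "0 15 * * * ?",
        "timezone_id": "UTC",
        "pause_status": "UNPAUSED",
    }
}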

C. Run in Production mode

Switch your DLT pipeline to Production mode so compute shuts down as soon as each update finishes (whether it succeeds or fails), rather than staying warm for reuse as in Development mode, which lowers cost. This is the recommended mode for scheduled or file-arrival-triggered pipelines.
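
If you want to flip this flag programmatically, the pipeline's development/production mode lives in its settings. A hedged sketch against the Pipelines REST API; the get/edit endpoint paths and the "spec"/"development" fields are assumptions to verify for your workspace.

import requests

HOST = "https://<your-workspace-url>"      # placeholder
TOKEN = "<personal-access-token>"          # placeholder
PIPELINE_ID = "<your-dlt-pipeline-id>"     # placeholder
headers = {"Authorization": f"Bearer {TOKEN}"}

# Read the current settings, switch development -> False (production), write them back.
spec = requests.get(f"{HOST}/api/2.0/pipelines/{PIPELINE_ID}", headers=headers).json()["spec"]
spec["development"] = False
requests.put(f"{HOST}/api/2.0/pipelines/{PIPELINE_ID}", headers=headers, json=spec).raise_for_status()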


End-to-end example: a one-task Workflow that runs your DLT pipeline

  • Task type: Pipeline
  • Pipeline: select your DLT pipeline (the one with orders_bronze/orders_silver).
  • Trigger: add File arrival on the same folder your Auto Loader or ingest job watches (or a Schedule).
  • (Optional) Trigger a full refresh only when you really need to recompute everything; otherwise rely on normal incremental updates (see the API sketch below). (Databricks Documentation)
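
When you really do want everything recomputed, the same request can be made through the Pipelines API's start-update endpoint. A sketch; the endpoint path and full_refresh field are assumptions to verify, and tables flagged with pipelines.reset.allowed=false remain protected from the reset.

import requests

HOST = "https://<your-workspace-url>"      # placeholder
TOKEN = "<personal-access-token>"          # placeholder
PIPELINE_ID = "<your-dlt-pipeline-id>"     # placeholder

# Start an update that recomputes everything except tables with pipelines.reset.allowed = false.
# Send an empty body (or omit full_refresh) for a normal incremental update.
resp = requests.post(
    f"{HOST}/api/2.0/pipelines/{PIPELINE_ID}/updates",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={"full_refresh": True},
)
resp.raise_for_status()
print(resp.json())   # {"update_id": ...}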

Helpful SQL: identify full-refresh runs & basic observability

-- All events
SELECT * FROM event_log('<PIPELINE_ID>');

-- Was the most recent update a full refresh?
SELECT
  origin.update_id,
  MAX(CASE WHEN details:full_refresh::boolean THEN 1 ELSE 0 END) AS was_full_refresh,
  MAX(timestamp) AS last_event_time
FROM event_log('<PIPELINE_ID>')
GROUP BY origin.update_id
ORDER BY last_event_time DESC
LIMIT 1;

Tip: the event log also captures throughput metrics, expectation (data quality) failures, and update status, which makes it a solid basis for DLT health and SLA reporting. (community.databricks.com)


Tested patterns & gotchas

  • Truncate-Load sources: set .option("skipChangeCommits","true") on the streaming read of a Delta source that is not strictly append-only. This allows you to keep streaming even when the source truncates. (Databricks Documentation, Microsoft Learn)
  • Protect SCD2/history tables: add pipelines.reset.allowed=false at the table to prevent accidental full refresh. (Databricks Documentation)
  • File-arrival triggers require a UC volume/external location path; they poll on a cadence you control. Combine with Production mode so compute auto-shuts down. (Databricks Documentation)
  • Dashboards/alerts: put a simple query over event_log('<PIPELINE_ID>') and alert if failures spike or if no successful run lands within your SLO window; a starter query is sketched below. (Databricks Documentation)
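
As a starting point, a sketch of such a health query; the nested details paths are assumptions about the event log schema, so check them against your own event log before wiring up an alert.

# Recent update outcomes for SLO/alerting. <PIPELINE_ID> is a placeholder, and the
# details:update_progress.state path is an assumption to verify against your event log.
recent_updates = spark.sql("""
    SELECT
      timestamp,
      origin.update_id,
      details:update_progress.state AS state
    FROM event_log('<PIPELINE_ID>')
    WHERE event_type = 'update_progress'
      AND details:update_progress.state IN ('COMPLETED', 'FAILED', 'CANCELED')
    ORDER BY timestamp DESC
""")
recent_updates.show(20, truncate=False)
# Alert if the newest COMPLETED row is older than your SLO window, or if FAILED rows pile up.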

Quick copy-paste: minimal pipeline (Python)

import dlt
from pyspark.sql import functions as F

# ----- Bronze from truncate-load source -----
@dlt.table(
    name="orders_bronze",
    comment="Bronze from truncate-load source"
)
def orders_bronze():
    return (spark.readStream
                 .format("delta")
                 .option("skipChangeCommits", "true")   # key for truncate-load
                 .table("source.orders_raw"))

# ----- Silver protected from full refresh -----
@dlt.table(
    name="orders_silver",
    table_properties={"pipelines.reset.allowed": "false"},
    comment="Silver table protected from full refresh"
)
def orders_silver():
    df = dlt.read("orders_bronze")
    return (df
            .withColumn("order_price_usd", F.col("order_price").cast("double"))
            .filter(F.col("order_price_usd") > 0))

Then wire this pipeline into a Workflow (Job) with either a Schedule or File-arrival trigger pointing at your landing location.


References