Databricks COPY INTO Command – Idempotent & Exactly-Once Data Loading


1. 🔹 What is COPY INTO?

  • COPY INTO is a Databricks SQL command to load data files into Delta tables.
  • Supported formats: CSV, JSON, Avro, Parquet, ORC, text, binary.
  • Key benefits:
    • ✅ Idempotent → Once a file is loaded, re-running COPY INTO will not reload it.
    • ✅ Exactly-once semantics → Files are ingested only once, even across retries.
    • ✅ Handles schema inference & evolution.
    • ✅ Scalable to thousands of files.

👉 For millions of files or complex directory structures, use Auto Loader instead.


2. 🔹 Setup: Managed Volume & Input Files

-- Create managed volume
CREATE VOLUME dev.bronze.landing;

%python
# Create the input folder
dbutils.fs.mkdirs("dbfs:/Volumes/dev/bronze/landing/input")

# Copy sample invoice files into the landing folder
dbutils.fs.cp("dbfs:/databricks-datasets/retail-org/invoices/2021-01.csv",
              "dbfs:/Volumes/dev/bronze/landing/input/", recurse=True)

dbutils.fs.cp("dbfs:/databricks-datasets/retail-org/invoices/2021-02.csv",
              "dbfs:/Volumes/dev/bronze/landing/input/", recurse=True)

Now we have two invoice CSV files ready in /landing/input.
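
To sanity-check the staging step, you can list the files in the landing folder from SQL; this is a minimal check that assumes the LIST statement is available for volume paths in your runtime (otherwise dbutils.fs.ls from Python does the same job):

-- List the staged files (same location as dbfs:/Volumes/dev/bronze/landing/input/)
LIST '/Volumes/dev/bronze/landing/input/';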


3. 🔹 Placeholder Delta Table

We can create a table without a schema → COPY INTO will infer the columns automatically.

CREATE TABLE dev.bronze.invoice_cp;

This is an empty Delta table with no defined schema.
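
To confirm the placeholder was registered (it has no columns until the first load infers them), a quick check:

-- The placeholder table exists in the schema, with no columns defined yet
SHOW TABLES IN dev.bronze LIKE 'invoice_cp';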


4. 🔹 COPY INTO Command

COPY INTO dev.bronze.invoice_cp
FROM 'dbfs:/Volumes/dev/bronze/landing/input/'
FILEFORMAT = CSV
PATTERN = '*.csv'
FORMAT_OPTIONS ('mergeSchema' = 'true', 'header' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

✅ Loads all matching CSV files once.
✅ If rerun, already-loaded files are skipped (thanks to COPY INTO's metadata tracking).
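
A quick way to see the idempotency is to count rows, rerun the same statement, and count again; on the rerun both files are skipped, so the count stays the same (a minimal check, assuming the two sample files loaded successfully):

-- Row count after the first load
SELECT COUNT(*) AS row_count FROM dev.bronze.invoice_cp;

-- Now rerun the COPY INTO statement above, then run the count again:
-- it should be unchanged because both files were already ingested.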


5. 🔹 How Metadata is Tracked

  • COPY INTO maintains its load history in the Delta table's _delta_log/copy-into-log directory.
  • Each ingested file's path and checksum are recorded.
  • On rerun → already-processed files are skipped.

Check metadata:

DESCRIBE EXTENDED dev.bronze.invoice_cp;

The Location row from DESCRIBE EXTENDED shows where the table is stored; browse to _delta_log/copy-into-log under it.
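
Each load also shows up in the Delta transaction history, which you can inspect with a standard Delta command:

-- Every COPY INTO run that writes data appears as a commit in the table history
DESCRIBE HISTORY dev.bronze.invoice_cp;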


6. 🔹 Transforming Data While Loading

You can transform/select columns during ingestion.

Example: Create table with selected columns

CREATE TABLE dev.bronze.invoice_alt (
  invoice_number STRING,
  stock_code STRING,
  quantity DOUBLE,
  insert_date TIMESTAMP
);

COPY INTO with transformations

COPY INTO dev.bronze.invoice_alt
FROM (
  SELECT 
    InvoiceNo as invoice_number,
    StockCode as stock_code,
    CAST(Quantity AS DOUBLE) as quantity,
    current_timestamp() as insert_date
  FROM 'dbfs:/Volumes/dev/bronze/landing/input/'
)
FILEFORMAT = CSV
PATTERN = '*.csv'
FORMAT_OPTIONS ('header' = 'true');
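
To confirm the transformation was applied, peek at a few loaded rows (column names follow the invoice_alt table defined above):

SELECT invoice_number, stock_code, quantity, insert_date
FROM dev.bronze.invoice_alt
LIMIT 5;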

7. 🔹 Incremental Loads

If you add a new file (2021-03.csv), COPY INTO ingests only that file:

%python
dbutils.fs.cp("dbfs:/databricks-datasets/retail-org/invoices/2021-03.csv",
              "dbfs:/Volumes/dev/bronze/landing/input/", recurse=True)

Then rerun the same transformed COPY INTO from step 6 (the target table uses renamed columns, so the transformation is still required):

COPY INTO dev.bronze.invoice_alt
FROM (
  SELECT 
    InvoiceNo as invoice_number,
    StockCode as stock_code,
    CAST(Quantity AS DOUBLE) as quantity,
    current_timestamp() as insert_date
  FROM 'dbfs:/Volumes/dev/bronze/landing/input/'
)
FILEFORMAT = CSV
PATTERN = '*.csv'
FORMAT_OPTIONS ('header' = 'true');

👉 Only the new file's rows will be inserted.
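
Because each run stamps insert_date with current_timestamp(), you can verify that only one new batch arrived (a sketch against the invoice_alt table defined above):

-- One row per load batch; the latest timestamp corresponds to the newly added file
SELECT insert_date, COUNT(*) AS rows_loaded
FROM dev.bronze.invoice_alt
GROUP BY insert_date
ORDER BY insert_date;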


8. 🔹 Best Practices

  • Use a placeholder table + mergeSchema if the schema is unknown.
  • For production, prefer an explicit schema (better governance).
  • Keep landing zones clean → avoid re-copying old files.
  • For large-scale pipelines, switch to Auto Loader.

✅ Summary

  • COPY INTO = Reliable, idempotent, exactly-once ingestion.
  • Maintains metadata in Delta logs.
  • Supports schema inference, schema evolution, and transformations.
  • Ideal for batch pipelines with manageable file counts.
  • At very large scale, prefer Auto Loader.