Rajesh Kumar · August 23, 2025


1. 🔹 What is COPY INTO?

  • COPY INTO is a Databricks SQL command to load data files into Delta tables.
  • Supported formats: CSV, JSON, Avro, Parquet, ORC, text, binary.
  • Key benefits:
    • ✅ Idempotent → once a file is loaded, re-running COPY INTO will not reload it.
    • ✅ Exactly-once semantics → files are ingested only once, even across retries.
    • ✅ Handles schema inference & evolution.
    • ✅ Scales to thousands of files.

👉 For millions of files or complex directories, use Autoloader instead.
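
For orientation, the general shape of the command looks like this (a skeleton with placeholder names, not runnable as-is):

COPY INTO <target_delta_table>
FROM '<source_path>'                    -- or: FROM (SELECT ... FROM '<source_path>')
FILEFORMAT = CSV                        -- or JSON, AVRO, PARQUET, ORC, TEXT, BINARYFILE
PATTERN = '<glob_pattern>'              -- optional file filter
FORMAT_OPTIONS ('header' = 'true')      -- format-specific reader options
COPY_OPTIONS ('mergeSchema' = 'true');  -- ingestion options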


2. 🔹 Setup: Managed Volume & Input Files

-- Create managed volume (SQL)
CREATE VOLUME dev.bronze.landing;

%python
# Create input folder
dbutils.fs.mkdirs("dbfs:/Volumes/dev/bronze/landing/input")

# Copy sample files
dbutils.fs.cp("dbfs:/databricks-datasets/retail-org/invoices/2021-01.csv",
              "dbfs:/Volumes/dev/bronze/landing/input/", recurse=True)

dbutils.fs.cp("dbfs:/databricks-datasets/retail-org/invoices/2021-02.csv",
              "dbfs:/Volumes/dev/bronze/landing/input/", recurse=True)

Now we have two invoice CSV files ready in /landing/input.
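
To double-check the staging area, you can list the volume path from SQL (LIST is a Databricks SQL statement for Unity Catalog volume paths; adjust if your workspace differs):

-- Sanity check: list the staged files in the volume
LIST '/Volumes/dev/bronze/landing/input/';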


3. 🔹 Placeholder Delta Table

We can create a table without a schema; with mergeSchema enabled, COPY INTO infers the columns automatically.

CREATE TABLE dev.bronze.invoice_cp;

This is an empty Delta table with no defined schema.
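
To confirm, describe the table; at this point it should show no columns (Databricks permits schemaless Delta tables specifically as COPY INTO targets):

DESCRIBE TABLE dev.bronze.invoice_cp;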


4. 🔹 COPY INTO Command

COPY INTO dev.bronze.invoice_cp
FROM 'dbfs:/Volumes/dev/bronze/landing/input/'
FILEFORMAT = CSV
PATTERN = '*.csv'
FORMAT_OPTIONS ('mergeSchema' = 'true', 'header' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

✅ Loads all CSV files once.
✅ On rerun, already-loaded files are skipped (COPY INTO tracks ingested files in its metadata).
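
A quick sanity check after the first load (plain Delta SQL; exact counts depend on your files):

-- Row count and a peek at the inferred columns
SELECT COUNT(*) AS row_count FROM dev.bronze.invoice_cp;
SELECT * FROM dev.bronze.invoice_cp LIMIT 5;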


5. 🔹 How Metadata is Tracked

  • COPY INTO records its ingestion state alongside the Delta table’s transaction log, in the _delta_log/copy-into-log directory.
  • Each ingested file’s path (plus a checksum) is recorded there.
  • On rerun → files already in the log are skipped.

Check metadata:

DESCRIBE EXTENDED dev.bronze.invoice_cp;

The Location field in the DESCRIBE output points to the table’s storage; browse there to find _delta_log/copy-into-log.
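
Each COPY INTO run also appears as a commit in the table history (standard Delta SQL; the operation column reads COPY INTO):

DESCRIBE HISTORY dev.bronze.invoice_cp;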


6. 🔹 Transforming Data While Loading

You can transform or select specific columns during ingestion.

Example: Create table with selected columns

CREATE TABLE dev.bronze.invoice_alt (
  invoice_number STRING,
  stock_code STRING,
  quantity DOUBLE,
  insert_date TIMESTAMP
);

COPY INTO with transformations

COPY INTO dev.bronze.invoice_alt
FROM (
  SELECT 
    InvoiceNo as invoice_number,
    StockCode as stock_code,
    CAST(Quantity AS DOUBLE) as quantity,
    current_timestamp() as insert_date
  FROM 'dbfs:/Volumes/dev/bronze/landing/input/'
)
FILEFORMAT = CSV
PATTERN = '*.csv'
FORMAT_OPTIONS ('header' = 'true');
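
To verify the transformation, group by the ingestion timestamp we added (insert_date marks each COPY INTO run):

SELECT insert_date, COUNT(*) AS rows_loaded
FROM dev.bronze.invoice_alt
GROUP BY insert_date;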

7. 🔹 Incremental Loads

If you add a new file (2021-03.csv), COPY INTO ingests only that file:

dbutils.fs.cp("dbfs:/databricks-datasets/retail-org/invoices/2021-03.csv",
              "dbfs:/Volumes/dev/bronze/landing/input/", recurse=True)

Then rerun the same COPY INTO (keeping the transformations, since invoice_alt uses the renamed columns):

COPY INTO dev.bronze.invoice_alt
FROM (
  SELECT
    InvoiceNo as invoice_number,
    StockCode as stock_code,
    CAST(Quantity AS DOUBLE) as quantity,
    current_timestamp() as insert_date
  FROM 'dbfs:/Volumes/dev/bronze/landing/input/'
)
FILEFORMAT = CSV
PATTERN = '*.csv'
FORMAT_OPTIONS ('header' = 'true');

👉 Only the new file’s rows will be inserted.
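
If you ever need to re-ingest files that are already tracked (say, a source file was corrected upstream), the documented 'force' copy option bypasses the dedup metadata. Use it sparingly, since it can insert duplicate rows:

COPY INTO dev.bronze.invoice_alt
FROM (
  SELECT InvoiceNo as invoice_number, StockCode as stock_code,
         CAST(Quantity AS DOUBLE) as quantity, current_timestamp() as insert_date
  FROM 'dbfs:/Volumes/dev/bronze/landing/input/'
)
FILEFORMAT = CSV
PATTERN = '*.csv'
FORMAT_OPTIONS ('header' = 'true')
COPY_OPTIONS ('force' = 'true');  -- reload even already-tracked files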


8. 🔹 Best Practices

  • Use a placeholder table + mergeSchema if the schema is unknown.
  • For production, prefer an explicit schema (better governance); you can also dry-run a load with VALIDATE, as sketched after this list.
  • Keep landing zones clean → avoid re-copying old files.
  • For large-scale pipelines: switch to Autoloader.
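
Databricks COPY INTO also supports a VALIDATE clause (Databricks Runtime 10.3+) that checks files against the target without committing any data, which supports the governance point above. A minimal sketch:

-- Dry run: validate the first 10 rows per file, load nothing
COPY INTO dev.bronze.invoice_cp
FROM 'dbfs:/Volumes/dev/bronze/landing/input/'
FILEFORMAT = CSV
VALIDATE 10 ROWS
PATTERN = '*.csv'
FORMAT_OPTIONS ('header' = 'true');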

✅ Summary

  • COPY INTO = Reliable, idempotent, exactly-once ingestion.
  • Maintains metadata in Delta logs.
  • Supports schema inference, schema evolution, and transformations.
  • Ideal for batch pipelines with manageable file counts.
  • For huge scale, prefer Autoloader.
