DatabricksCopyIntoOperator allows mutation of COPY INTO statement via unvalidated table_name #62498

@SameerMesiah97

Description

Apache Airflow Provider(s)

databricks

Versions of Apache Airflow Providers

apache-airflow-providers-databricks >= 7.9.1

Apache Airflow version

main

Operating System

Debian GNU/Linux 12 (bookworm)

Deployment

Other

Deployment details

No response

What happened

DatabricksCopyIntoOperator directly interpolates table_name into the generated COPY INTO SQL statement without validating that it is a proper identifier.

When table_name is templated from dag_run.conf, it is possible to supply a value that mutates the intended COPY INTO statement into multiple SQL statements.

For example, passing a value containing ; DROP TABLE ... results in SQL that no longer represents a single structured COPY INTO statement.

What you think should happen instead

DatabricksCopyIntoOperator should validate table_name as a proper SQL identifier (e.g., table, schema.table, or catalog.schema.table).

If an invalid identifier or multi-statement fragment is supplied, the operator should raise a clear exception during SQL construction rather than generating mutated SQL.

This preserves the structured semantics of the operator. Users requiring arbitrary SQL execution should instead use DatabricksSqlOperator.
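One possible shape for such validation is a strict identifier check performed before the SQL is assembled. This is only a sketch: the function name, the exact pattern, and the quoting rules are hypothetical, not the provider's actual API.

```python
import re

# Matches table, schema.table, or catalog.schema.table, where each part is
# either a plain identifier or a backtick-quoted identifier (hypothetical
# pattern -- the provider may need to accept a wider character set).
_PART = r"(`[^`]+`|[A-Za-z_][A-Za-z0-9_]*)"
_TABLE_NAME_RE = re.compile(rf"^{_PART}(\.{_PART}){{0,2}}$")


def validate_table_name(table_name: str) -> str:
    """Raise ValueError unless table_name is a plain 1-3 part SQL identifier."""
    if not _TABLE_NAME_RE.match(table_name):
        raise ValueError(f"Invalid table_name for COPY INTO: {table_name!r}")
    return table_name
```

Because templated fields are rendered before execution, a check like this would run at SQL-construction time and reject the injected value from the repro below with a clear error instead of emitting mutated SQL.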

How to reproduce

  1. Ensure you have a valid databricks_default connection configured in Airflow.

  2. Create the following DAG:

from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks_sql import DatabricksCopyIntoOperator

with DAG(
    dag_id="databricks_copy_into_sql_injection_repro",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:

    copy_into = DatabricksCopyIntoOperator(
        task_id="copy_into_task",
        table_name="{{ dag_run.conf.get('table_name', 'workspace.default.safe_table') }}",
        file_location="s3://dummy/path",
        file_format="CSV",
        databricks_conn_id="databricks_default",
    )

  3. Trigger the DAG with the following config:

{"table_name": "workspace.default.safe_table; DROP TABLE workspace.default.other_table; --"}

Observed Behaviour

The operator constructs SQL containing the injected fragment as part of the statement. Depending on warehouse configuration, this may result in malformed SQL errors or unintended execution of additional statements.
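For illustration, naive string interpolation of the templated value produces a string that is no longer a single COPY INTO statement. This is a simplified sketch of the interpolation problem, not the operator's actual code path:

```python
# Simplified sketch -- not the operator's real SQL-construction code.
table_name = (
    "workspace.default.safe_table; "
    "DROP TABLE workspace.default.other_table; --"
)
sql = f"COPY INTO {table_name} FROM 's3://dummy/path' FILEFORMAT = CSV"
# The rendered string now carries a second statement and a trailing
# comment that swallows the rest of the intended COPY INTO clause.
```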

Anything else

While Airflow assumes trusted DAG authors, DatabricksCopyIntoOperator is intended to be a structured abstraction around COPY INTO, not a general-purpose SQL executor.

Validating identifier fields such as table_name would prevent mutation of the intended SQL structure and align the operator’s behavior with its purpose.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
