
MySqlHook bulk_load_custom() syntax errors due to escaped sql command #42061

@jjunkiee

Apache Airflow Provider(s)

mysql

Versions of Apache Airflow Providers

apache-airflow-providers-amazon[aiobotocore]==8.17.0
apache-airflow-providers-apprise==1.2.1
apache-airflow-providers-atlassian-jira==1.1.0
apache-airflow-providers-celery==3.5.2
apache-airflow-providers-common-sql==1.10.1
apache-airflow-providers-ftp==3.7.0
apache-airflow-providers-google==10.14.0
apache-airflow-providers-http==4.9.0
apache-airflow-providers-imap==3.5.0
apache-airflow-providers-microsoft-mssql==3.6.0
apache-airflow-providers-mysql==5.5.2
apache-airflow-providers-postgres==5.10.0
apache-airflow-providers-sftp==4.8.1
apache-airflow-providers-snowflake==5.3.0
apache-airflow-providers-sqlite==3.7.0
apache-airflow-providers-ssh==3.10.0
apache-airflow-providers-tableau==4.4.1

Apache Airflow version

2.7.2

Operating System

Amazon Linux 2023

Deployment

Amazon (AWS) MWAA

Deployment details

Python 3.11

What happened

The task below always fails with a MySQL syntax error because the "IGNORE" duplicate-key handling option and any mysql_extra_options values are escaped as string literals before execution.

load_data = S3ToMySqlOperator(
  task_id="load_data",
  s3_source_key=f"s3://{S3_BRONZE_BUCKET}/test.csv",
  mysql_table="test_db.test_table",
  mysql_conn_id=MYSQL_CONN_ID,
  mysql_extra_options="FIELDS TERMINATED BY ',' (col1, col3)",
  mysql_local_infile=True,
)

What you think should happen instead

The task should complete successfully and load the CSV data into the target table. Instead, the task instance fails with the following error:
MySQLdb.ProgrammingError: (1064, "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near ''IGNORE' INTO TABLE test_db.test_table 'FIELDS TERMINATED BY \\',\\' (col1,' at line 1")
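
For reference, below is a rough reconstruction, based only on the error message above, of the statement MySQL appears to receive versus what valid LOAD DATA syntax would look like for this task. The file path is illustrative, not the hook's actual temporary file name:

# What the 1064 error suggests is sent to the server: the IGNORE keyword and the
# mysql_extra_options value arrive quoted as string literals, which is invalid syntax.
broken = (
    "LOAD DATA LOCAL INFILE '/tmp/test.csv' 'IGNORE' "
    "INTO TABLE test_db.test_table 'FIELDS TERMINATED BY \\',\\' (col1, col3)'"
)

# What a valid statement would look like: the keyword and the extra options are part
# of the SQL text; only the file path is a quoted literal.
expected = (
    "LOAD DATA LOCAL INFILE '/tmp/test.csv' IGNORE "
    "INTO TABLE test_db.test_table FIELDS TERMINATED BY ',' (col1, col3)"
)

print(broken)
print(expected)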

How to reproduce

In a MySQL environment with local-infile enabled:

CREATE TABLE test_db.test_table (
	col1 INT NULL,
	col2 INT NULL,
	col3 INT NULL
);

test.csv (create and upload to "test_bucket"):

100,200
300,400
500,600

example_dag.py

"""An example dag."""

from __future__ import annotations

from pathlib import Path

import pendulum

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.mysql.transfers.s3_to_mysql import S3ToMySqlOperator

MYSQL_CONN_ID = "mysql_server"
S3_BUCKET = "test_bucket"

with DAG(
    dag_id=Path(__file__).stem,
    start_date=pendulum.datetime(2022, 1, 1, tz="Europe/London"),
    schedule=None,
):
    truncate_test_table = SQLExecuteQueryOperator(
        task_id="truncate_test_table",
        sql="TRUNCATE TABLE test_db.test_table",
        autocommit=True,
        conn_id=MYSQL_CONN_ID,
        database="test_db",
    )

    load_data = S3ToMySqlOperator(
        task_id="load_data",
        s3_source_key=f"s3://{S3_BUCKET}/test.csv",
        mysql_table="test_db.test_table",
        mysql_conn_id=MYSQL_CONN_ID,
        mysql_extra_options="FIELDS TERMINATED BY ',' (col1, col3)",
        mysql_local_infile=True,
    )

    truncate_test_table >> load_data

Running the DAG should result in the following error:
MySQLdb.ProgrammingError: (1064, "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near ''IGNORE' INTO TABLE test_db.test_table 'FIELDS TERMINATED BY \\',\\' (col1,' at line 1")

Anything else

This occurs every time, because IGNORE is passed as the default duplicate-key handling value and gets escaped incorrectly.

I believe this was caused by PR #33328, which added the escaping to prevent SQL injection but appears to have broken the bulk_load_custom() method.
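
As a possible direction for a fix (sketched here purely as an illustration, not the provider's actual code), the duplicate-key handling keyword could be validated against an allowlist and kept as SQL text, with only the local file path bound as a driver parameter. The table name and extra options would still need their own validation to preserve the injection protection that PR was aiming for:

# Illustrative sketch only - not the provider's implementation.
# Assumes a DB-API cursor with %s parameter substitution (e.g. MySQLdb).

ALLOWED_DUPLICATE_KEY_HANDLING = {"IGNORE", "REPLACE"}


def build_bulk_load_query(table: str, duplicate_key_handling: str, extra_options: str) -> str:
    """Build a LOAD DATA statement where the keyword and clauses stay as SQL text
    and only the local file path is left as a bound %s parameter."""
    if duplicate_key_handling.upper() not in ALLOWED_DUPLICATE_KEY_HANDLING:
        raise ValueError(f"Unsupported duplicate key handling: {duplicate_key_handling}")
    return (
        f"LOAD DATA LOCAL INFILE %s {duplicate_key_handling} "
        f"INTO TABLE {table} {extra_options}"
    )


# Produces valid syntax for the task in this report; execute with
# cursor.execute(sql, (local_file_path,)).
print(build_bulk_load_query("test_db.test_table", "IGNORE", "FIELDS TERMINATED BY ',' (col1, col3)"))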

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
