Apache Airflow Provider(s)
mysql
Versions of Apache Airflow Providers
apache-airflow-providers-amazon[aiobotocore]==8.17.0
apache-airflow-providers-apprise==1.2.1
apache-airflow-providers-atlassian-jira==1.1.0
apache-airflow-providers-celery==3.5.2
apache-airflow-providers-common-sql==1.10.1
apache-airflow-providers-ftp==3.7.0
apache-airflow-providers-google==10.14.0
apache-airflow-providers-http==4.9.0
apache-airflow-providers-imap==3.5.0
apache-airflow-providers-microsoft-mssql==3.6.0
apache-airflow-providers-mysql==5.5.2
apache-airflow-providers-postgres==5.10.0
apache-airflow-providers-sftp==4.8.1
apache-airflow-providers-snowflake==5.3.0
apache-airflow-providers-sqlite==3.7.0
apache-airflow-providers-ssh==3.10.0
apache-airflow-providers-tableau==4.4.1
Apache Airflow version
2.7.2
Operating System
Amazon Linux 2023
Deployment
Amazon (AWS) MWAA
Deployment details
Python 3.11
What happened
The task below always fails with a MySQL syntax error, because the "IGNORE" duplicate-key and error handling option and any mysql_extra_options value are escaped as string literals before execution:
load_data = S3ToMySqlOperator(
    task_id="load_data",
    s3_source_key=f"s3://{S3_BRONZE_BUCKET}/test.csv",
    mysql_table="test_db.test_table",
    mysql_conn_id=MYSQL_CONN_ID,
    mysql_extra_options="FIELDS TERMINATED BY ',' (col1, col3)",
    mysql_local_infile=True,
)
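For illustration, here is a minimal sketch of how the statement appears to get mangled, reconstructed from the reported error rather than from the provider's actual code (the file path and the parameter-binding mechanism are assumptions):
# Hypothetical reconstruction, not the provider's code: the clause strings
# appear to be passed as DB-API query parameters, and the driver quotes
# every parameter as a string literal.
query = "LOAD DATA LOCAL INFILE %s %s INTO TABLE test_db.test_table %s"
params = ("/tmp/test.csv", "IGNORE", "FIELDS TERMINATED BY ',' (col1, col3)")
# After binding, the statement becomes roughly:
#   LOAD DATA LOCAL INFILE '/tmp/test.csv' 'IGNORE'
#       INTO TABLE test_db.test_table 'FIELDS TERMINATED BY \',\' (col1, col3)'
# 'IGNORE' and the options clause are now quoted string literals, so MySQL
# rejects the statement with syntax error 1064.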
What you think should happen instead
The task should load the CSV file into the table successfully. Instead, the task instance fails with the following error:
MySQLdb.ProgrammingError: (1064, "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near ''IGNORE' INTO TABLE test_db.test_table 'FIELDS TERMINATED BY \\',\\' (col1,' at line 1")
How to reproduce
In a MySQL environment with local-infile enabled:
CREATE TABLE test_db.test_table (
    col1 INT NULL,
    col2 INT NULL,
    col3 INT NULL
);
test.csv (create and upload to "test_bucket"):
100,200
300,400
500,600
example_dag.py:
"""An example dag."""
from __future__ import annotations
from pathlib import Path
import pendulum
from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.mysql.transfers.s3_to_mysql import S3ToMySqlOperator
MYSQL_CONN_ID = "mysql_server"
S3_BUCKET = "test_bucket"
with DAG(
dag_id=Path(__file__).name.replace(".py", ""),
start_date=pendulum.datetime(2022, 1, 1, tz="Europe/London"),
schedule=None,
):
truncate_test_table = SQLExecuteQueryOperator(
task_id="truncate_test_table",
sql="TRUNCATE TABLE test_db.test_table",
autocommit=True,
conn_id=MYSQL_CONN_ID,
database="wallboard",
)
load_data= S3ToMySqlOperator(
task_id="load_data",
s3_source_key=f"s3://{S3_BUCKET}/test.csv",
mysql_table="test_db.test_table",
mysql_conn_id=MYSQL_CONN_ID,
mysql_extra_options="FIELDS TERMINATED BY ',' (col1, col3)",
mysql_local_infile=True,
)
truncate_test_table >> load_data
Running the DAG should result in the following error:
MySQLdb.ProgrammingError: (1064, "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near ''IGNORE' INTO TABLE test_db.test_table 'FIELDS TERMINATED BY \\',\\' (col1,' at line 1")
Anything else
This occurs every time, because "IGNORE" is passed as the default duplicate-key handling value and gets escaped incorrectly.
I believe this was caused by PR #33328, which added the escaping to prevent SQL injection but appears to have broken the bulk_load_custom() method. A possible direction for a fix is sketched below.
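A minimal sketch of that direction, assuming an allow-list for the keyword clause (names and signature here are illustrative, not the provider's API):
# Sketch only: SQL keywords and clauses cannot be bound as DB-API
# parameters, so validate them against an allow-list and interpolate,
# binding only real values.
ALLOWED_DUPLICATE_KEY_HANDLING = {"IGNORE", "REPLACE"}

def build_load_sql(
    tmp_file: str, table: str, duplicate_key_handling: str, extra_options: str
) -> tuple[str, tuple]:
    """Build a LOAD DATA statement, parameterizing only the file path."""
    if duplicate_key_handling not in ALLOWED_DUPLICATE_KEY_HANDLING:
        raise ValueError(f"Invalid duplicate_key_handling: {duplicate_key_handling!r}")
    # table and extra_options would still need identifier quoting/validation
    # of their own to keep the injection protection that #33328 was after.
    sql = (
        f"LOAD DATA LOCAL INFILE %s {duplicate_key_handling} "
        f"INTO TABLE {table} {extra_options}"
    )
    return sql, (tmp_file,)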
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct