-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Description
Apache Airflow Provider(s)
snowflake
Versions of Apache Airflow Providers
This started with apache-airflow-providers-snowflake==4.0.0 and is still an issue with 4.0.4
Apache Airflow version
2.5.1
Operating System
Debian GNU/Linux 11 (bullseye)
Deployment
Astronomer
Deployment details
This is affecting both local and hosted deployments
What happened
We are testing out several updated packages, and one thing that broke was the SnowflakeOperator when it was executing a stored procedure. The specific error points to autocommit being set to False:
Stored procedure execution error: Scoped transaction started in stored procedure is incomplete and it was rolled back.
Whereas this used to work in version 3.2.0:
copy_data_snowflake = SnowflakeOperator(
task_id=f'copy_{table_name}_snowflake',
sql=query,
)
In order for it to work now, we have to specify autocommit=True:
copy_data_snowflake = SnowflakeOperator(
task_id=f'copy_{table_name}_snowflake',
sql=query,
autocommit=True,
)
The code still indicates that the default is True, but I believe this commit broke it.
What you think should happen instead
The default for autocommit should revert to the previous behavior, matching the documentation.
How to reproduce
In Snowflake:
CREATE OR REPLACE TABLE PUBLIC.FOO (BAR VARCHAR);
CREATE OR REPLACE PROCEDURE PUBLIC.FOO()
RETURNS VARCHAR
LANGUAGE SQL
AS $$
INSERT INTO PUBLIC.FOO VALUES('bar');
$$
;
In Airflow, this fails:
copy_data_snowflake = SnowflakeOperator(
task_id='call_foo',
sql="call public.foo()",
)
But this succeeds:
copy_data_snowflake = SnowflakeOperator(
task_id='call_foo',
sql="call public.foo()",
autocommit=True,
)
Anything else
It looks like this may be an issue with stored procedures specifically. If I instead do this:
copy_data_snowflake = SnowflakeOperator(
task_id='call_foo',
sql="INSERT INTO PUBLIC.FOO VALUES('bar');",
)
The logs show that although autocommit is confusingly set to False, a COMMIT statement is executed:
[2023-03-09, 18:43:09 CST] {cursor.py:727} INFO - query: [ALTER SESSION SET autocommit=False]
[2023-03-09, 18:43:09 CST] {cursor.py:740} INFO - query execution done
[2023-03-09, 18:43:09 CST] {cursor.py:878} INFO - Number of results in first chunk: 1
[2023-03-09, 18:43:09 CST] {sql.py:375} INFO - Running statement: INSERT INTO PUBLIC.FOO VALUES('bar');, parameters: None
[2023-03-09, 18:43:09 CST] {cursor.py:727} INFO - query: [INSERT INTO PUBLIC.FOO VALUES('bar');]
[2023-03-09, 18:43:09 CST] {cursor.py:740} INFO - query execution done
[2023-03-09, 18:43:09 CST] {sql.py:384} INFO - Rows affected: 1
[2023-03-09, 18:43:09 CST] {snowflake.py:380} INFO - Rows affected: 1
[2023-03-09, 18:43:09 CST] {snowflake.py:381} INFO - Snowflake query id: 01aad76b-0606-feb5-0000-26b511d0ba02
[2023-03-09, 18:43:09 CST] {cursor.py:727} INFO - query: [COMMIT]
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct