SnowflakeOperator default autocommit flipped to False #30010

@joeknize-bc

Apache Airflow Provider(s)

snowflake

Versions of Apache Airflow Providers

This started with apache-airflow-providers-snowflake==4.0.0 and is still an issue with 4.0.4

Apache Airflow version

2.5.1

Operating System

Debian GNU/Linux 11 (bullseye)

Deployment

Astronomer

Deployment details

This is affecting both local and hosted deployments

What happened

We are testing several upgraded packages, and one thing that broke was a SnowflakeOperator task that executes a stored procedure. The error points to autocommit being set to False:

    Stored procedure execution error: Scoped transaction started in stored procedure is incomplete and it was rolled back.

This used to work in version 3.2.0:

    copy_data_snowflake = SnowflakeOperator(
        task_id=f'copy_{table_name}_snowflake',
        sql=query,
    )

In order for it to work now, we have to specify autocommit=True:

    copy_data_snowflake = SnowflakeOperator(
        task_id=f'copy_{table_name}_snowflake',
        sql=query,
        autocommit=True,
    )

The code still indicates that the default is True, but I believe this commit broke it.

What you think should happen instead

The default for autocommit should revert to the previous behavior, matching the documentation.

How to reproduce

In Snowflake:

    CREATE OR REPLACE TABLE PUBLIC.FOO (BAR VARCHAR);
    CREATE OR REPLACE PROCEDURE PUBLIC.FOO()
    RETURNS VARCHAR
    LANGUAGE SQL
    AS $$
        INSERT INTO PUBLIC.FOO VALUES('bar');
    $$
    ;

In Airflow, this fails:

    copy_data_snowflake = SnowflakeOperator(
        task_id='call_foo',
        sql="call public.foo()",
    )

But this succeeds:

    copy_data_snowflake = SnowflakeOperator(
        task_id='call_foo',
        sql="call public.foo()",
        autocommit=True,
    )
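
Until the default is sorted out, one way to avoid depending on it is to pin autocommit explicitly wherever the operator is built. The following is only a minimal sketch: `snowflake_task_kwargs` is a hypothetical helper of my own, not part of the Snowflake provider, and the Airflow import is left commented out so the snippet runs on its own.

```python
from typing import Any, Dict


def snowflake_task_kwargs(task_id: str, sql: str, **overrides: Any) -> Dict[str, Any]:
    """Build SnowflakeOperator keyword arguments with autocommit pinned to True.

    Hypothetical helper for illustration; it is not part of the provider.
    """
    kwargs: Dict[str, Any] = {"task_id": task_id, "sql": sql, "autocommit": True}
    # Caller-supplied values win, so an explicit autocommit=False is still possible.
    kwargs.update(overrides)
    return kwargs


call_foo_kwargs = snowflake_task_kwargs("call_foo", "call public.foo()")

# In a DAG file (requires apache-airflow-providers-snowflake):
# from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
# copy_data_snowflake = SnowflakeOperator(**call_foo_kwargs)
```

This keeps the task definition independent of whichever default the installed provider version uses.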

Anything else

It looks like this may be an issue with stored procedures specifically. Consider a plain INSERT instead:

    copy_data_snowflake = SnowflakeOperator(
        task_id='call_foo',
        sql="INSERT INTO PUBLIC.FOO VALUES('bar');",
    )

The logs show that although autocommit is, confusingly, set to False, a COMMIT statement is still executed:

[2023-03-09, 18:43:09 CST] {cursor.py:727} INFO - query: [ALTER SESSION SET autocommit=False]
[2023-03-09, 18:43:09 CST] {cursor.py:740} INFO - query execution done
[2023-03-09, 18:43:09 CST] {cursor.py:878} INFO - Number of results in first chunk: 1
[2023-03-09, 18:43:09 CST] {sql.py:375} INFO - Running statement: INSERT INTO PUBLIC.FOO VALUES('bar');, parameters: None
[2023-03-09, 18:43:09 CST] {cursor.py:727} INFO - query: [INSERT INTO PUBLIC.FOO VALUES('bar');]
[2023-03-09, 18:43:09 CST] {cursor.py:740} INFO - query execution done
[2023-03-09, 18:43:09 CST] {sql.py:384} INFO - Rows affected: 1
[2023-03-09, 18:43:09 CST] {snowflake.py:380} INFO - Rows affected: 1
[2023-03-09, 18:43:09 CST] {snowflake.py:381} INFO - Snowflake query id: 01aad76b-0606-feb5-0000-26b511d0ba02
[2023-03-09, 18:43:09 CST] {cursor.py:727} INFO - query: [COMMIT]
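
To check other task logs for the same pattern, something like the sketch below may help. It is a rough illustration only: `summarize_autocommit` is a hypothetical helper, the regular expressions are modelled on the log lines above, and I am assuming a session with autocommit enabled would log `autocommit=True` in the same format.

```python
import re
from typing import Iterable, Optional, Tuple

# Patterns modelled on the "query: [...]" lines in the task log above.
# The autocommit=True variant is an assumption; only False appears in my logs.
AUTOCOMMIT_RE = re.compile(
    r"query: \[ALTER SESSION SET autocommit=(True|False)\]", re.IGNORECASE
)
COMMIT_RE = re.compile(r"query: \[COMMIT\]")


def summarize_autocommit(log_lines: Iterable[str]) -> Tuple[Optional[bool], bool]:
    """Return (last autocommit value set in the session, whether a COMMIT was logged)."""
    autocommit: Optional[bool] = None
    saw_commit = False
    for line in log_lines:
        match = AUTOCOMMIT_RE.search(line)
        if match:
            autocommit = match.group(1).lower() == "true"
        elif COMMIT_RE.search(line):
            saw_commit = True
    return autocommit, saw_commit


sample_log = [
    "[2023-03-09, 18:43:09 CST] {cursor.py:727} INFO - query: [ALTER SESSION SET autocommit=False]",
    "[2023-03-09, 18:43:09 CST] {cursor.py:727} INFO - query: [INSERT INTO PUBLIC.FOO VALUES('bar');]",
    "[2023-03-09, 18:43:09 CST] {cursor.py:727} INFO - query: [COMMIT]",
]
result = summarize_autocommit(sample_log)
```

For the plain INSERT log above, this reports that autocommit was set to False and that a COMMIT was still issued.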

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
