Improvements for SnowflakeHook.get_sqlalchemy_engine
#20509
Conversation
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
        if conn_params['region'] == '':
            del conn_params['region']
        uri = (
            'snowflake://{user}:{password}@{account}/{database}/{schema}'
Suggested change:
-        if conn_params['region'] == '':
-            del conn_params['region']
-        uri = (
-            'snowflake://{user}:{password}@{account}/{database}/{schema}'
+        region = conn_params.get('region')
+        conn_params['region'] = f".{region}" if region else ''
+        uri = (
+            'snowflake://{user}:{password}@{account}{region}/{database}/{schema}'
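For context, here is a minimal standalone sketch of what the suggested construction amounts to. The helper name build_snowflake_uri and the query-string fields are illustrative, not the hook's actual method:

def build_snowflake_uri(conn_params: dict) -> str:
    # Prefix the region with a dot only when one is set, so the host part
    # becomes either 'account' or 'account.region' and never ends in a stray dot.
    region = conn_params.get('region')
    params = {**conn_params, 'region': f".{region}" if region else ''}
    return (
        'snowflake://{user}:{password}@{account}{region}/{database}/{schema}'
        '?warehouse={warehouse}&role={role}&authenticator={authenticator}'
    ).format(**params)

Fed with the placeholder test values used below, this yields 'snowflake://user:pw@airflow.af_region/db/public?...' when a region is set and 'snowflake://user:pw@airflow/db/public?...' when it is blank or absent.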
@@ -388,3 +388,38 @@ def test_connection_failure(self, mock_run):
         assert status is False
         assert msg == 'Connection Errors'
         mock_run.assert_called_once_with(sql='select 1')
+
+    @mock.patch('airflow.providers.snowflake.hooks.snowflake.SnowflakeHook.run')
+    def test_connection_without_region(self, mock_run):
Can we add this case to the existing parameterized method?
airflow/tests/providers/snowflake/hooks/test_snowflake.py
Lines 75 to 203 in e0552c7
@pytest.mark.parametrize(
    "connection_kwargs,expected_uri,expected_conn_params",
    [
        (
            BASE_CONNECTION_KWARGS,
            (
                'snowflake://user:pw@airflow.af_region/db/public?'
                'warehouse=af_wh&role=af_role&authenticator=snowflake'
            ),
            {
                'account': 'airflow',
                'application': 'AIRFLOW',
                'authenticator': 'snowflake',
                'database': 'db',
                'password': 'pw',
                'region': 'af_region',
                'role': 'af_role',
                'schema': 'public',
                'session_parameters': None,
                'user': 'user',
                'warehouse': 'af_wh',
            },
        ),
        (
            {
                **BASE_CONNECTION_KWARGS,
                **{
                    'extra': {
                        'extra__snowflake__database': 'db',
                        'extra__snowflake__account': 'airflow',
                        'extra__snowflake__warehouse': 'af_wh',
                        'extra__snowflake__region': 'af_region',
                        'extra__snowflake__role': 'af_role',
                    },
                },
            },
            (
                'snowflake://user:pw@airflow.af_region/db/public?'
                'warehouse=af_wh&role=af_role&authenticator=snowflake'
            ),
            {
                'account': 'airflow',
                'application': 'AIRFLOW',
                'authenticator': 'snowflake',
                'database': 'db',
                'password': 'pw',
                'region': 'af_region',
                'role': 'af_role',
                'schema': 'public',
                'session_parameters': None,
                'user': 'user',
                'warehouse': 'af_wh',
            },
        ),
        (
            {
                **BASE_CONNECTION_KWARGS,
                **{
                    'extra': {
                        'extra__snowflake__database': 'db',
                        'extra__snowflake__account': 'airflow',
                        'extra__snowflake__warehouse': 'af_wh',
                        'extra__snowflake__region': 'af_region',
                        'extra__snowflake__role': 'af_role',
                        'extra__snowflake__insecure_mode': 'True',
                    },
                },
            },
            (
                'snowflake://user:pw@airflow.af_region/db/public?'
                'warehouse=af_wh&role=af_role&authenticator=snowflake'
            ),
            {
                'account': 'airflow',
                'application': 'AIRFLOW',
                'authenticator': 'snowflake',
                'database': 'db',
                'password': 'pw',
                'region': 'af_region',
                'role': 'af_role',
                'schema': 'public',
                'session_parameters': None,
                'user': 'user',
                'warehouse': 'af_wh',
                'insecure_mode': True,
            },
        ),
        (
            {
                **BASE_CONNECTION_KWARGS,
                **{
                    'extra': {
                        'extra__snowflake__database': 'db',
                        'extra__snowflake__account': 'airflow',
                        'extra__snowflake__warehouse': 'af_wh',
                        'extra__snowflake__region': 'af_region',
                        'extra__snowflake__role': 'af_role',
                        'extra__snowflake__insecure_mode': 'False',
                    }
                },
            },
            (
                'snowflake://user:pw@airflow.af_region/db/public?'
                'warehouse=af_wh&role=af_role&authenticator=snowflake'
            ),
            {
                'account': 'airflow',
                'application': 'AIRFLOW',
                'authenticator': 'snowflake',
                'database': 'db',
                'password': 'pw',
                'region': 'af_region',
                'role': 'af_role',
                'schema': 'public',
                'session_parameters': None,
                'user': 'user',
                'warehouse': 'af_wh',
            },
        ),
    ],
)
def test_hook_should_support_prepare_basic_conn_params_and_uri(
    self, connection_kwargs, expected_uri, expected_conn_params
):
    with unittest.mock.patch.dict(
        'os.environ', AIRFLOW_CONN_TEST_CONN=Connection(**connection_kwargs).get_uri()
    ):
        assert SnowflakeHook(snowflake_conn_id='test_conn').get_uri() == expected_uri
        assert SnowflakeHook(snowflake_conn_id='test_conn')._get_conn_params() == expected_conn_params
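To address the reviewer's suggestion, the blank-region case could be folded into the parametrized list above with an entry along these lines. This is only a sketch: the expected URI and the empty 'region' value in expected_conn_params are assumptions inferred from the cases above, not taken from the merged test.

        (
            # Hypothetical extra case: blank region, so the URI host collapses to the account only.
            {
                **BASE_CONNECTION_KWARGS,
                **{
                    'extra': {
                        'extra__snowflake__database': 'db',
                        'extra__snowflake__account': 'airflow',
                        'extra__snowflake__warehouse': 'af_wh',
                        'extra__snowflake__region': '',
                        'extra__snowflake__role': 'af_role',
                    },
                },
            },
            (
                'snowflake://user:pw@airflow/db/public?'
                'warehouse=af_wh&role=af_role&authenticator=snowflake'
            ),
            {
                'account': 'airflow',
                'application': 'AIRFLOW',
                'authenticator': 'snowflake',
                'database': 'db',
                'password': 'pw',
                'region': '',
                'role': 'af_role',
                'schema': 'public',
                'session_parameters': None,
                'user': 'user',
                'warehouse': 'af_wh',
            },
        ),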
@@ -388,3 +388,38 @@ def test_connection_failure(self, mock_run):
         assert status is False
         assert msg == 'Connection Errors'
         mock_run.assert_called_once_with(sql='select 1')
+
+    @mock.patch('airflow.providers.snowflake.hooks.snowflake.SnowflakeHook.run')
Is it needed?
@mik-laj I just saw that you modified the pytest connection parameters to allow a blank value for region. I attempted to write the same but was unsuccessful, so I added a new test. It is not needed now that you have added the params.
@harishkrao I updated the code to use
@mik-laj
Agree with the changes made by @mik-laj
SnowflakeHook.get_sqlalchemy_engine
I will include that one in the December release too.
Awesome work, congrats on your first merged pull request!
The PR is likely OK to be merged with just a subset of tests for default Python and Database versions without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full tests matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest main or amend the last commit of the PR, and push it with --force-with-lease.
@potiuk @mik-laj @harishkrao FYI this is still broken in Airflow 2.2.3 |
I will take a look. Should I open this PR or create a new one to continue? @potiuk @mik-laj |
@harishkrao @NadimYounes What version of the Snowflake provider are you using? This change has been released in apache-airflow-providers-snowflake v2.4.0. To check the version of the package, you can run the
By default, the Docker image and constraints files for Apache Airflow v2.3.3 use snowflake-provider 2.2.3 (see line 120 in 8a96274).
Here is a guide that explains how to upgrade an Airflow provider in the Docker image: https://airflow.apache.org/docs/docker-stack/build.html#example-of-upgrading-airflow-provider-packages
@mik-laj Makes sense. Why are the latest Airflow constraints pointing to an older version of the providers?
@NadimYounes Because the constraints file is only created once, when a new version of Airflow is released. Only when we notice a critical problem do we sometimes make additional changes to it, e.g. #20245
Longer answer: New providers often introduce dependency changes that might or might not work with other versions of providers. The constraints are a "golden" set of dependencies that we know will work (can be installed and passes all the tests) - not only when we release Airflow, but also in the future when new dependencies are released. This is what we as a community commit to, and this is also what we maintain (every new release of Airflow will automatically have constraints with the latest released providers). Constraints are there to make sure "you can always install Airflow with those constraints and we guarantee they will work fine".

For newly released providers we do a lot of work to make sure they work with previously released Airflow versions - for example, we run tests to make sure they can still be used with Airflow 2.1.0. However, what we DO NOT guarantee is that you can install the providers together with other released providers on an older Airflow version using a single "pip install" operation. Due to the complexity of dependencies and their live nature, we cannot guarantee that all combinations of providers can be installed "together" in a "single" operation.

We can guarantee, however, that you should be able to upgrade providers on your own (as long as the version is compatible with your Airflow version), and we even give you the ways to do it: see the installation and upgrade scenarios. But in this case, depending on the set and versions of providers you already have installed, you are responsible for resolving all the dependency conflicts if and when they occur. We cannot guarantee that all combinations and versions of all Airflow versions and providers will work. That's probably millions of combinations, and I am pretty sure some of them won't work. But when you reduce it to the set of providers that you have and you use pip to resolve those installs (without using constraints), you should be able to get the right combination in the vast majority of cases. But it's your work and responsibility, not ours. We explicitly and deliberately do not put that responsibility on our shoulders; it's yours to bear. With great powers (the flexibility of installing newer versions of providers without upgrading Airflow) come great responsibilities.

The easiest way to keep up to date is simply to upgrade Airflow to the latest version as soon as it is released, or soon after. This is the option you always have (or the option of upgrading providers independently between the releases - but manually).
Closes: #20032
When using the Snowflake provider connector for Airflow, if a region is specified in the connection URI, the connection succeeds. If a region is not specified, Snowflake defaults to US-West-2; however, the generated URI is malformed and results in an erroneous connection. This fix builds the URI depending on whether the user specifies the region name or not.
For example, the URIs generated with and without a region differ only in the host part, as illustrated below.
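The example URIs below are illustrative, reconstructed from the placeholder values used in the tests in this PR (user, pw, airflow, af_region), not captured from a live connection:

With a region:
snowflake://user:pw@airflow.af_region/db/public?warehouse=af_wh&role=af_role&authenticator=snowflake

Without a region:
snowflake://user:pw@airflow/db/public?warehouse=af_wh&role=af_role&authenticator=snowflake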
Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.