Skip to content

Commit 887cbf9

Browse files
authored
fixes airflow provider init sequence (dlt-hub#569)
1 parent abad005 commit 887cbf9

File tree

3 files changed

+19
-8
lines changed

3 files changed

+19
-8
lines changed

dlt/common/configuration/providers/airflow.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
from airflow.models import Variable
2-
31
from .toml import VaultTomlProvider
42

53

@@ -13,6 +11,7 @@ def name(self) -> str:
1311

1412
def _look_vault(self, full_key: str, hint: type) -> str:
1513
"""Get Airflow Variable with given `full_key`, return None if not found"""
14+
from airflow.models import Variable
1615
return Variable.get(full_key, default_var=None) # type: ignore
1716

1817
@property

dlt/common/configuration/specs/config_providers_context.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,28 +105,36 @@ def _airflow_providers() -> List[ConfigProvider]:
105105
task context will not be available. Still we want the provider to function so
106106
we just test if Airflow can be imported.
107107
"""
108-
if not is_airflow_installed():
109-
return []
108+
109+
providers: List[ConfigProvider] = []
110110

111111
try:
112112
# hide stdio. airflow typically dumps tons of warnings and deprecations to stdout and stderr
113113
with contextlib.redirect_stdout(io.StringIO()), contextlib.redirect_stderr(io.StringIO()):
114-
from airflow.operators.python import get_current_context # noqa
115-
114+
# try to get dlt secrets variable. many broken Airflow installations break here. in that case do not create
116115
from airflow.models import Variable # noqa
117116
from dlt.common.configuration.providers.airflow import AirflowSecretsTomlProvider
118117
# probe if Airflow variable containing all secrets is present
119118
from dlt.common.configuration.providers.toml import SECRETS_TOML_KEY
120119
secrets_toml_var = Variable.get(SECRETS_TOML_KEY, default_var=None)
121120

121+
# providers can be returned - mind that AirflowSecretsTomlProvider() requests the variable above immediately
122+
providers = [AirflowSecretsTomlProvider()]
123+
124+
# check if we are in task context and provide more info
125+
from airflow.operators.python import get_current_context # noqa
126+
ti = get_current_context()["ti"]
127+
128+
# log outside of stderr/out redirect
122129
if secrets_toml_var is None:
123130
message = f"Airflow variable '{SECRETS_TOML_KEY}' was not found. " + \
124131
"This Airflow variable is a recommended place to hold the content of secrets.toml." + \
125132
"If you do not use Airflow variables to hold dlt configuration or use variables with other names you can ignore this warning."
126-
ti = get_current_context()["ti"]
127133
ti.log.warning(message)
134+
128135
except Exception:
129136
# do not probe variables when not in task context
130137
pass
131138

132-
return [AirflowSecretsTomlProvider()]
139+
# airflow not detected
140+
return providers

tests/helpers/airflow_tests/test_airflow_provider.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ def test_dag():
2727
from dlt.common.configuration.providers.airflow import AirflowSecretsTomlProvider
2828

2929
Variable.set(SECRETS_TOML_KEY, SECRETS_TOML_CONTENT)
30+
# make sure provider works while creating DAG
31+
provider = AirflowSecretsTomlProvider()
32+
assert provider.get_value("api_key", str, None, "sources")[0] == "test_value"
3033

3134
@task()
3235
def test_task():
@@ -78,6 +81,7 @@ def test_dag():
7881

7982
# this will initialize provider context
8083
api_key = secrets["sources.api_key"]
84+
assert api_key == "test_value"
8185

8286
@task()
8387
def test_task():

0 commit comments

Comments
 (0)