Skip to content

Commit 59e1750

Browse files
committed
do not probe Airflow variables on start
1 parent 67fc5b1 commit 59e1750

File tree

11 files changed

+115
-52
lines changed

11 files changed

+115
-52
lines changed

.github/workflows/test_airflow.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ jobs:
4141
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-airflow-runner
4242

4343
- name: Install dependencies
44-
run: poetry install --no-interaction --with airflow -E duckdb -E pyarrow
44+
run: poetry install --no-interaction --with airflow -E duckdb -E pyarrow -E cli
4545

4646
- run: |
4747
poetry run pytest tests/helpers/airflow_tests

.github/workflows/test_common.yml

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,6 @@ jobs:
5555
virtualenvs-in-project: true
5656
installer-parallel: true
5757

58-
# - name: Get pip cache dir
59-
# id: pip-cache
60-
# run: |
61-
# echo "::set-output name=dir::$(poetry env info -p)"
62-
# echo "$(poetry env info -p)"
63-
6458
- name: Load cached venv
6559
id: cached-poetry-dependencies
6660
uses: actions/cache@v3
@@ -69,28 +63,28 @@ jobs:
6963
path: .venv
7064
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}
7165

72-
- name: Install dependencies
73-
run: poetry install --no-interaction -E pyarrow
66+
- name: Install dependencies + sentry
67+
run: poetry install --no-interaction -E pyarrow && pip install sentry-sdk
7468

7569
- run: |
76-
poetry run pytest tests/common tests/normalize tests/reflection tests/sources tests/cli/common
70+
poetry run pytest tests/common tests/normalize tests/reflection tests/sources
7771
if: runner.os != 'Windows'
7872
name: Run tests Linux/MAC
7973
- run: |
80-
poetry run pytest tests/common tests/normalize tests/reflection tests/sources tests/cli/common -m "not forked"
74+
poetry run pytest tests/common tests/normalize tests/reflection tests/sources -m "not forked"
8175
if: runner.os == 'Windows'
8276
name: Run tests Windows
8377
shell: cmd
8478
8579
- name: Install extra dependencies
86-
run: poetry install --no-interaction -E duckdb -E pyarrow
80+
run: poetry install --no-interaction -E duckdb -E cli
8781

8882
- run: |
89-
poetry run pytest tests/extract tests/pipeline
83+
poetry run pytest tests/extract tests/pipeline tests/cli/common
9084
if: runner.os != 'Windows'
9185
name: Run extra tests Linux/MAC
9286
- run: |
93-
poetry run pytest tests/extract tests/pipeline -m "not forked"
87+
poetry run pytest tests/extract tests/pipeline tests/cli/common -m "not forked"
9488
if: runner.os == 'Windows'
9589
name: Run extra tests Windows
9690
shell: cmd

.github/workflows/test_destinations.yml

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,6 @@ jobs:
6363
virtualenvs-in-project: true
6464
installer-parallel: true
6565

66-
# - name: Get pip cache dir
67-
# id: pip-cache
68-
# run: |
69-
# echo "::set-output name=dir::$(poetry env info -p)"
70-
# echo "$(poetry env info -p)"
71-
7266
- name: Load cached venv
7367
id: cached-poetry-dependencies
7468
uses: actions/cache@v3
@@ -79,7 +73,7 @@ jobs:
7973

8074
- name: Install dependencies
8175
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
82-
run: poetry install --no-interaction -E redshift -E gs -E s3 -E pyarrow -E duckdb
76+
run: poetry install --no-interaction -E redshift -E gs -E s3 -E pyarrow -E duckdb -E cli
8377

8478
# - name: Install self
8579
# run: poetry install --no-interaction

.github/workflows/test_local_destinations.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ jobs:
7676
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-redshift
7777

7878
- name: Install dependencies
79-
run: poetry install --no-interaction -E postgres -E duckdb -E pyarrow -E filesystem
79+
run: poetry install --no-interaction -E postgres -E duckdb -E pyarrow -E filesystem -E cli
8080

8181
- run: poetry run pytest tests/load tests/cli
8282
name: Run tests Linux

dlt/cli/_dlt.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,6 @@ def main() -> int:
200200
# make sure the name is defined
201201
_ = deploy_command
202202
deploy_comm = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, add_help=False)
203-
deploy_comm.add_argument("--schedule", required=False, help="A schedule with which to run the pipeline, in cron format. Example: '*/30 * * * *' will run the pipeline every 30 minutes.")
204203
deploy_comm.add_argument("--location", help="Advanced. Uses a specific url or local path to pipelines repository.")
205204
deploy_comm.add_argument("--branch", help="Advanced. Uses specific branch of the deploy repository to fetch the template.")
206205

@@ -210,6 +209,7 @@ def main() -> int:
210209

211210
# deploy github actions
212211
deploy_github_cmd = deploy_sub_parsers.add_parser(DeploymentMethods.github_actions.value, help="Deploys the pipeline to Github Actions", parents=[deploy_comm])
212+
deploy_github_cmd.add_argument("--schedule", required=False, help="A schedule with which to run the pipeline, in cron format. Example: '*/30 * * * *' will run the pipeline every 30 minutes.")
213213
deploy_github_cmd.add_argument("--run-manually", default=True, action="store_true", help="Allows the pipeline to be run manually from Github Actions UI.")
214214
deploy_github_cmd.add_argument("--run-on-push", default=False, action="store_true", help="Runs the pipeline with every push to the repository.")
215215

dlt/common/configuration/specs/config_providers_context.py

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -102,34 +102,31 @@ def _airflow_providers() -> List[ConfigProvider]:
102102
returns a list containing the Airflow providers.
103103
104104
Depending on how DAG is defined this function may be called outside of task and
105-
task context will be not available. Still we want the provider to function so
105+
task context will not be available. Still we want the provider to function so
106106
we just test if Airflow can be imported.
107107
"""
108108
if not is_airflow_installed():
109109
return []
110-
from dlt.common.configuration.providers.toml import SECRETS_TOML_KEY
111110

112-
# hide stdio. airflow typically dumps tons of warnings and deprecations to stdout and stderr
113-
with contextlib.redirect_stdout(io.StringIO()), contextlib.redirect_stderr(io.StringIO()):
114-
from airflow.models import Variable # noqa
115-
from dlt.common.configuration.providers.airflow import AirflowSecretsTomlProvider
116-
117-
secrets_toml_var = Variable.get(SECRETS_TOML_KEY, default_var=None)
118-
119-
if secrets_toml_var is None:
120-
message = f"Airflow variable '{SECRETS_TOML_KEY}' not found. AirflowSecretsTomlProvider will not be used."
121-
try:
122-
# prefer logging to task logger
111+
try:
112+
# hide stdio. airflow typically dumps tons of warnings and deprecations to stdout and stderr
113+
with contextlib.redirect_stdout(io.StringIO()), contextlib.redirect_stderr(io.StringIO()):
123114
from airflow.operators.python import get_current_context # noqa
124115

116+
from airflow.models import Variable # noqa
117+
from dlt.common.configuration.providers.airflow import AirflowSecretsTomlProvider
118+
# probe if Airflow variable containing all secrets is present
119+
from dlt.common.configuration.providers.toml import SECRETS_TOML_KEY
120+
secrets_toml_var = Variable.get(SECRETS_TOML_KEY, default_var=None)
121+
122+
if secrets_toml_var is None:
123+
message = f"Airflow variable '{SECRETS_TOML_KEY}' was not found. " + \
124+
"This Airflow variable is a recommended place to hold the content of secrets.toml. " + \
125+
"If you do not use Airflow variables to hold dlt configuration or use variables with other names you can ignore this warning."
125126
ti = get_current_context()["ti"]
126127
ti.log.warning(message)
127-
except Exception:
128-
# otherwise log to dlt logger
129-
from dlt.common import logger
130-
if logger.is_logging():
131-
logger.warning(message)
132-
else:
133-
print(message)
128+
except Exception:
129+
# do not probe variables when not in task context
130+
pass
134131

135132
return [AirflowSecretsTomlProvider()]

dlt/common/runtime/exec_info.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ def is_airflow_installed() -> bool:
9090
with contextlib.redirect_stdout(io.StringIO()), contextlib.redirect_stderr(io.StringIO()):
9191
import airflow
9292
return True
93-
except ImportError:
93+
except Exception:
9494
return False
9595

9696

dlt/common/typing.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,12 @@
99

1010
if TYPE_CHECKING:
1111
from _typeshed import StrOrBytesPath
12-
# from typing_extensions import ParamSpec
1312
from typing import _TypedDict
1413
REPattern = _REPattern[str]
1514
else:
1615
StrOrBytesPath = Any
1716
from typing import _TypedDictMeta as _TypedDict
1817
REPattern = _REPattern
19-
# ParamSpec = lambda x: [x]
2018

2119
AnyType: TypeAlias = Any
2220
NoneType = type(None)

dlt/extract/decorators.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,13 @@ def __init__(self, schema: Schema = None) -> None:
3939
...
4040

4141
TSourceFunParams = ParamSpec("TSourceFunParams")
42+
"""Params of @source decorated function"""
4243
TResourceFunParams = ParamSpec("TResourceFunParams")
44+
"""Params of @resource decorated function"""
45+
TDeferredFunParams = ParamSpec("TDeferredFunParams")
46+
"""Params of @deferred function"""
47+
TTransformerFun = Callable[Concatenate[TDataItem, TResourceFunParams], Any]
48+
"""Params of @transformer decorated function where first argument is always data item"""
4349

4450

4551
@overload
@@ -383,12 +389,12 @@ def transformer(
383389
merge_key: TTableHintTemplate[TColumnKey] = None,
384390
selected: bool = True,
385391
spec: Type[BaseConfiguration] = None
386-
) -> Callable[[Callable[Concatenate[TDataItem, TResourceFunParams], Any]], Callable[TResourceFunParams, DltResource]]:
392+
) -> Callable[[TTransformerFun[TResourceFunParams]], Callable[TResourceFunParams, DltResource]]:
387393
...
388394

389395
@overload
390396
def transformer(
391-
f: Callable[Concatenate[TDataItem, TResourceFunParams], Any],
397+
f: TTransformerFun[TResourceFunParams],
392398
/,
393399
data_from: TUnboundDltResource = DltResource.Empty,
394400
name: str = None,
@@ -403,7 +409,7 @@ def transformer(
403409
...
404410

405411
def transformer( # type: ignore
406-
f: Optional[Callable[Concatenate[TDataItem, TResourceFunParams], Any]] = None,
412+
f: Optional[TTransformerFun[TResourceFunParams]] = None,
407413
/,
408414
data_from: TUnboundDltResource = DltResource.Empty,
409415
name: str = None,
@@ -414,7 +420,7 @@ def transformer( # type: ignore
414420
merge_key: TTableHintTemplate[TColumnKey] = None,
415421
selected: bool = True,
416422
spec: Type[BaseConfiguration] = None
417-
) -> Callable[[Callable[Concatenate[TDataItem, TResourceFunParams], Any]], Callable[TResourceFunParams, DltResource]]:
423+
) -> Callable[[TTransformerFun[TResourceFunParams]], Callable[TResourceFunParams, DltResource]]:
418424
"""A form of `dlt resource` that takes input from other resources via `data_from` argument in order to enrich or transform the data.
419425
420426
The decorated function `f` must take at least one argument of type TDataItems (a single item or list of items depending on the resource `data_from`). `dlt` will pass
@@ -515,7 +521,7 @@ def get_source_schema() -> Schema:
515521

516522
TBoundItems = TypeVar("TBoundItems", bound=TDataItems)
517523
TDeferred = Callable[[], TBoundItems]
518-
TDeferredFunParams = ParamSpec("TDeferredFunParams")
524+
519525

520526

521527
def defer(f: Callable[TDeferredFunParams, TBoundItems]) -> Callable[TDeferredFunParams, TDeferred[TBoundItems]]:

tests/common/runtime/test_telemetry.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
@configspec
2020
class SentryLoggerConfiguration(RunConfiguration):
2121
pipeline_name: str = "logger"
22-
# sentry_dsn: str = "https://6f6f7b6f8e0f458a89be4187603b55fe@o1061158.ingest.sentry.io/4504819859914752"
22+
sentry_dsn: str = "https://6f6f7b6f8e0f458a89be4187603b55fe@o1061158.ingest.sentry.io/4504819859914752"
2323
dlthub_telemetry_segment_write_key: str = "TLJiyRkGVZGCi2TtjClamXpFcxAA1rSB"
2424

2525

0 commit comments

Comments
 (0)