Skip to content

Commit 00a1cb4

Browse files
committed
Introduce BundleDagBag for ephemeral pre-loaded bundle path usage.
- usefor CLI and other one-off DagBag instances 'polluting' sys.path
1 parent 83c884e commit 00a1cb4

File tree

12 files changed

+86
-49
lines changed

12 files changed

+86
-49
lines changed

airflow-core/src/airflow/cli/commands/dag_command.py

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
from airflow.cli.simple_table import AirflowConsole
3838
from airflow.cli.utils import fetch_dag_run_from_run_id_or_logical_date_string
3939
from airflow.dag_processing.bundles.manager import DagBundlesManager
40-
from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db
40+
from airflow.dag_processing.dagbag import BundleDagBag, DagBag, sync_bag_to_db
4141
from airflow.exceptions import AirflowConfigException, AirflowException
4242
from airflow.jobs.job import Job
4343
from airflow.models import DagModel, DagRun, TaskInstance
@@ -378,10 +378,12 @@ def dag_list_dags(args, session: Session = NEW_SESSION) -> None:
378378

379379
for bundle in all_bundles:
380380
if bundle.name in bundles_to_search:
381-
dagbag = DagBag(bundle.path, bundle_path=bundle.path, bundle_name=bundle.name)
382-
dagbag.collect_dags()
383-
dags_list.extend(list(dagbag.dags.values()))
384-
dagbag_import_errors += len(dagbag.import_errors)
381+
bundle_dagbag = BundleDagBag(
382+
bundle.path, bundle_path=bundle.path, bundle_name=bundle.name
383+
)
384+
bundle_dagbag.collect_dags()
385+
dags_list.extend(list(bundle_dagbag.dags.values()))
386+
dagbag_import_errors += len(bundle_dagbag.import_errors)
385387
else:
386388
dagbag = DagBag()
387389
dagbag.collect_dags()
@@ -474,8 +476,10 @@ def dag_list_import_errors(args, session: Session = NEW_SESSION) -> None:
474476

475477
for bundle in all_bundles:
476478
if bundle.name in bundles_to_search:
477-
dagbag = DagBag(bundle.path, bundle_path=bundle.path, bundle_name=bundle.name)
478-
for filename, errors in dagbag.import_errors.items():
479+
bundle_dagbag = BundleDagBag(
480+
bundle.path, bundle_path=bundle.path, bundle_name=bundle.name
481+
)
482+
for filename, errors in bundle_dagbag.import_errors.items():
479483
data.append({"bundle_name": bundle.name, "filepath": filename, "error": errors})
480484
else:
481485
dagbag = DagBag()
@@ -526,7 +530,9 @@ def dag_report(args) -> None:
526530
if bundle.name not in bundles_to_reserialize:
527531
continue
528532
bundle.initialize()
529-
dagbag = DagBag(bundle.path, bundle_path=bundle.path, bundle_name=bundle.name, include_examples=False)
533+
dagbag = BundleDagBag(
534+
bundle.path, bundle_path=bundle.path, bundle_name=bundle.name, include_examples=False
535+
)
530536
all_dagbag_stats.extend(dagbag.dagbag_stats)
531537

532538
AirflowConsole().print_as(
@@ -690,7 +696,7 @@ def dag_reserialize(args, session: Session = NEW_SESSION) -> None:
690696
if bundle.name not in bundles_to_reserialize:
691697
continue
692698
bundle.initialize()
693-
dag_bag = DagBag(
699+
dag_bag = BundleDagBag(
694700
bundle.path, bundle_path=bundle.path, bundle_name=bundle.name, include_examples=False
695701
)
696702
sync_bag_to_db(dag_bag, bundle.name, bundle_version=bundle.get_current_version(), session=session)

airflow-core/src/airflow/dag_processing/dagbag.py

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -235,13 +235,6 @@ def __init__(
235235
self.bundle_path = bundle_path
236236
self.bundle_name = bundle_name
237237

238-
# Add bundle path to sys.path if provided.
239-
# This allows DAG files to import modules from their bundle directory.
240-
# No cleanup is performed - this is intentional for ephemeral processes
241-
# (dag processor, task runner, CLI) where the process exits after use.
242-
if bundle_path and str(bundle_path) not in sys.path:
243-
sys.path.append(str(bundle_path))
244-
245238
dag_folder = dag_folder or settings.DAGS_FOLDER
246239
self.dag_folder = dag_folder
247240
self.dags: dict[str, DAG] = {}
@@ -702,6 +695,30 @@ def dagbag_report(self):
702695
return report
703696

704697

698+
class BundleDagBag(DagBag):
699+
"""
700+
Bundle-aware DagBag that permanently modifies sys.path.
701+
702+
This class adds the bundle_path to sys.path permanently to allow DAG files
703+
to import modules from their bundle directory. No cleanup is performed.
704+
705+
WARNING: Only use for one-off usages like CLI commands. Using this in long-running
706+
processes will cause sys.path to accumulate entries.
707+
708+
Same parameters as DagBag, but bundle_path is required.
709+
"""
710+
711+
def __init__(self, *args, bundle_path: Path | None = None, **kwargs):
712+
if not bundle_path:
713+
raise ValueError("bundle_path is required for BundleDagBag")
714+
715+
if str(bundle_path) not in sys.path:
716+
sys.path.append(str(bundle_path))
717+
718+
kwargs["bundle_path"] = bundle_path
719+
super().__init__(*args, **kwargs)
720+
721+
705722
@provide_session
706723
def sync_bag_to_db(
707724
dagbag: DagBag,

airflow-core/src/airflow/dag_processing/processor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
TaskCallbackRequest,
3535
)
3636
from airflow.configuration import conf
37-
from airflow.dag_processing.dagbag import DagBag
37+
from airflow.dag_processing.dagbag import BundleDagBag, DagBag
3838
from airflow.observability.stats import Stats
3939
from airflow.sdk.exceptions import TaskNotFound
4040
from airflow.sdk.execution_time.comms import (
@@ -205,7 +205,7 @@ def _parse_file_entrypoint():
205205
def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> DagFileParsingResult | None:
206206
# TODO: Set known_pool names on DagBag!
207207

208-
bag = DagBag(
208+
bag = BundleDagBag(
209209
dag_folder=msg.file,
210210
bundle_path=msg.bundle_path,
211211
bundle_name=msg.bundle_name,

airflow-core/src/airflow/models/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
"BaseOperator",
3030
"BaseOperatorLink",
3131
"BaseXCom",
32+
"BundleDagBag",
3233
"Connection",
3334
"DagBag",
3435
"DagWarning",
@@ -98,6 +99,7 @@ def __getattr__(name):
9899
"BaseOperator": "airflow.sdk",
99100
"BaseOperatorLink": "airflow.sdk",
100101
"BaseXCom": "airflow.sdk.bases.xcom",
102+
"BundleDagBag": "airflow.dag_processing.dagbag",
101103
"Callback": "airflow.models.callback",
102104
"Connection": "airflow.models.connection",
103105
"DagBag": "airflow.dag_processing.dagbag",
@@ -126,7 +128,7 @@ def __getattr__(name):
126128
if TYPE_CHECKING:
127129
# I was unable to get mypy to respect a airflow/models/__init__.pyi, so
128130
# having to resort back to this hacky method
129-
from airflow.dag_processing.dagbag import DagBag
131+
from airflow.dag_processing.dagbag import BundleDagBag, DagBag
130132
from airflow.models.base import ID_LEN, Base
131133
from airflow.models.callback import Callback
132134
from airflow.models.connection import Connection

airflow-core/src/airflow/utils/cli.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -272,14 +272,14 @@ def get_bagged_dag(bundle_names: list | None, dag_id: str, dagfile_path: str | N
272272
find the correct path (assuming it's a file) and failing that, use the configured
273273
dags folder.
274274
"""
275-
from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db
275+
from airflow.dag_processing.dagbag import BundleDagBag, sync_bag_to_db
276276
from airflow.sdk.definitions._internal.dag_parsing_context import _airflow_parsing_context_manager
277277

278278
manager = DagBundlesManager()
279279
for bundle_name in bundle_names or ():
280280
bundle = manager.get_bundle(bundle_name)
281281
with _airflow_parsing_context_manager(dag_id=dag_id):
282-
dagbag = DagBag(
282+
dagbag = BundleDagBag(
283283
dag_folder=dagfile_path or bundle.path,
284284
bundle_path=bundle.path,
285285
bundle_name=bundle.name,
@@ -292,7 +292,7 @@ def get_bagged_dag(bundle_names: list | None, dag_id: str, dagfile_path: str | N
292292
for bundle in manager.get_all_dag_bundles():
293293
bundle.initialize()
294294
with _airflow_parsing_context_manager(dag_id=dag_id):
295-
dagbag = DagBag(
295+
dagbag = BundleDagBag(
296296
dag_folder=dagfile_path or bundle.path,
297297
bundle_path=bundle.path,
298298
bundle_name=bundle.name,
@@ -323,7 +323,7 @@ def get_db_dag(bundle_names: list | None, dag_id: str, dagfile_path: str | None
323323

324324
def get_dags(bundle_names: list | None, dag_id: str, use_regex: bool = False, from_db: bool = False):
325325
"""Return DAG(s) matching a given regex or dag_id."""
326-
from airflow.dag_processing.dagbag import DagBag
326+
from airflow.dag_processing.dagbag import BundleDagBag
327327

328328
bundle_names = bundle_names or []
329329

@@ -333,7 +333,7 @@ def get_dags(bundle_names: list | None, dag_id: str, use_regex: bool = False, fr
333333
return [get_bagged_dag(bundle_names=bundle_names, dag_id=dag_id)]
334334

335335
def _find_dag(bundle):
336-
dagbag = DagBag(dag_folder=bundle.path, bundle_path=bundle.path, bundle_name=bundle.name)
336+
dagbag = BundleDagBag(dag_folder=bundle.path, bundle_path=bundle.path, bundle_name=bundle.name)
337337
matched_dags = [dag for dag in dagbag.dags.values() if re.search(dag_id, dag.dag_id)]
338338
return matched_dags
339339

airflow-core/tests/unit/cli/commands/test_dag_command.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -760,7 +760,7 @@ def test_dag_test_show_dag(self, mock_get_dag, mock_render_dag, stdout_capture):
760760
mock_render_dag.assert_has_calls([mock.call(mock_get_dag.return_value, tis=[])])
761761
assert "SOURCE" in output
762762

763-
@mock.patch("airflow.dag_processing.dagbag.DagBag")
763+
@mock.patch("airflow.dag_processing.dagbag.BundleDagBag")
764764
def test_dag_test_with_bundle_name(self, mock_dagbag, configure_dag_bundles):
765765
"""Test that DAG can be tested using bundle name."""
766766
mock_dagbag.return_value.get_dag.return_value.test.return_value = DagRun(
@@ -788,7 +788,7 @@ def test_dag_test_with_bundle_name(self, mock_dagbag, configure_dag_bundles):
788788
include_examples=False,
789789
)
790790

791-
@mock.patch("airflow.dag_processing.dagbag.DagBag")
791+
@mock.patch("airflow.dag_processing.dagbag.BundleDagBag")
792792
def test_dag_test_with_dagfile_path(self, mock_dagbag, configure_dag_bundles):
793793
"""Test that DAG can be tested using dagfile path."""
794794
mock_dagbag.return_value.get_dag.return_value.test.return_value = DagRun(
@@ -810,7 +810,7 @@ def test_dag_test_with_dagfile_path(self, mock_dagbag, configure_dag_bundles):
810810
include_examples=False,
811811
)
812812

813-
@mock.patch("airflow.dag_processing.dagbag.DagBag")
813+
@mock.patch("airflow.dag_processing.dagbag.BundleDagBag")
814814
def test_dag_test_with_both_bundle_and_dagfile_path(self, mock_dagbag, configure_dag_bundles):
815815
"""Test that DAG can be tested using both bundle name and dagfile path."""
816816
mock_dagbag.return_value.get_dag.return_value.test.return_value = DagRun(

airflow-core/tests/unit/dag_processing/test_dagbag.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,12 @@
3535
from sqlalchemy import select
3636

3737
from airflow import settings
38-
from airflow.dag_processing.dagbag import DagBag, _capture_with_reraise, _validate_executor_fields
38+
from airflow.dag_processing.dagbag import (
39+
BundleDagBag,
40+
DagBag,
41+
_capture_with_reraise,
42+
_validate_executor_fields,
43+
)
3944
from airflow.exceptions import UnknownExecutorException
4045
from airflow.executors.executor_loader import ExecutorLoader
4146
from airflow.models.dag import DagModel
@@ -1195,10 +1200,10 @@ def test_capture_warnings_with_error_filters(self):
11951200

11961201

11971202
class TestBundlePathSysPath:
1198-
"""Tests for bundle_path sys.path handling in DagBag."""
1203+
"""Tests for bundle_path sys.path handling in BundleDagBag."""
11991204

12001205
def test_bundle_path_added_to_syspath(self, tmp_path):
1201-
"""Test that DagBag adds bundle_path to sys.path when provided."""
1206+
"""Test that BundleDagBag adds bundle_path to sys.path when provided."""
12021207
util_file = tmp_path / "bundle_util.py"
12031208
util_file.write_text('def get_message(): return "Hello from bundle!"')
12041209

@@ -1220,7 +1225,9 @@ def test_bundle_path_added_to_syspath(self, tmp_path):
12201225

12211226
assert str(tmp_path) not in sys.path
12221227

1223-
dagbag = DagBag(dag_folder=str(dag_file), bundle_path=tmp_path, include_examples=False)
1228+
dagbag = BundleDagBag(
1229+
dag_folder=str(dag_file), bundle_path=tmp_path, bundle_name="test-bundle", include_examples=False
1230+
)
12241231

12251232
# Check import was successful
12261233
assert len(dagbag.dags) == 1
@@ -1255,16 +1262,18 @@ def test_bundle_path_not_duplicated(self, tmp_path):
12551262
sys.path.append(str(tmp_path))
12561263
count_before = sys.path.count(str(tmp_path))
12571264

1258-
DagBag(dag_folder=str(dag_file), bundle_path=tmp_path, include_examples=False)
1265+
BundleDagBag(
1266+
dag_folder=str(dag_file), bundle_path=tmp_path, bundle_name="test-bundle", include_examples=False
1267+
)
12591268

12601269
# Should not add duplicate
12611270
assert sys.path.count(str(tmp_path)) == count_before
12621271

12631272
# Cleanup for other tests
12641273
sys.path.remove(str(tmp_path))
12651274

1266-
def test_bundle_path_none_no_syspath_modification(self, tmp_path):
1267-
"""Test that no sys.path modification occurs when bundle_path is None."""
1275+
def test_dagbag_no_bundle_path_no_syspath_modification(self, tmp_path):
1276+
"""Test that no sys.path modification occurs when DagBag is used without bundle_path."""
12681277
dag_file = tmp_path / "simple_dag.py"
12691278
dag_file.write_text(
12701279
textwrap.dedent(
@@ -1280,7 +1289,7 @@ def test_bundle_path_none_no_syspath_modification(self, tmp_path):
12801289
)
12811290
)
12821291
syspath_before = deepcopy(sys.path)
1283-
dagbag = DagBag(dag_folder=str(dag_file), bundle_path=None, include_examples=False)
1292+
dagbag = DagBag(dag_folder=str(dag_file), include_examples=False)
12841293
dag = dagbag.get_dag("simple_dag")
12851294

12861295
assert str(tmp_path) not in dag.description

airflow-core/tests/unit/dag_processing/test_processor.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1795,9 +1795,9 @@ def fake_collect_dags(self, *args, **kwargs):
17951795
_execute_email_callbacks(dagbag, request, log)
17961796

17971797
def test_parse_file_passes_bundle_name_to_dagbag(self):
1798-
"""Test that _parse_file() creates DagBag with correct bundle_name parameter"""
1799-
# Mock the DagBag constructor to capture its arguments
1800-
with patch("airflow.dag_processing.processor.DagBag") as mock_dagbag_class:
1798+
"""Test that _parse_file() creates BundleDagBag with correct bundle_name parameter"""
1799+
# Mock the BundleDagBag constructor to capture its arguments
1800+
with patch("airflow.dag_processing.processor.BundleDagBag") as mock_dagbag_class:
18011801
# Create a mock instance with proper attributes for Pydantic validation
18021802
mock_dagbag_instance = MagicMock()
18031803
mock_dagbag_instance.dags = {}
@@ -1813,7 +1813,7 @@ def test_parse_file_passes_bundle_name_to_dagbag(self):
18131813

18141814
_parse_file(request, log=structlog.get_logger())
18151815

1816-
# Verify DagBag was called with correct bundle_name
1816+
# Verify BundleDagBag was called with correct bundle_name
18171817
mock_dagbag_class.assert_called_once()
18181818
call_kwargs = mock_dagbag_class.call_args.kwargs
18191819
assert call_kwargs["bundle_name"] == "test_bundle"

airflow-core/tests/unit/models/test_dag.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
from airflow._shared.timezones import timezone
4141
from airflow._shared.timezones.timezone import datetime as datetime_tz
4242
from airflow.configuration import conf
43-
from airflow.dag_processing.dagbag import DagBag
43+
from airflow.dag_processing.dagbag import BundleDagBag, DagBag
4444
from airflow.exceptions import AirflowException
4545
from airflow.models.asset import (
4646
AssetAliasModel,
@@ -2194,7 +2194,7 @@ def test_relative_fileloc(self, session, testing_dag_bundle):
21942194
rel_path = "test_assets.py"
21952195
bundle_path = TEST_DAGS_FOLDER
21962196
file_path = bundle_path / rel_path
2197-
bag = DagBag(dag_folder=file_path, bundle_path=bundle_path)
2197+
bag = BundleDagBag(dag_folder=file_path, bundle_path=bundle_path, bundle_name="testing")
21982198

21992199
dag = bag.get_dag("dag_with_skip_task")
22002200

task-sdk/src/airflow/sdk/definitions/dag.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1241,7 +1241,7 @@ def test(
12411241
version = DagVersion.get_version(self.dag_id)
12421242
if not version:
12431243
from airflow.dag_processing.bundles.manager import DagBundlesManager
1244-
from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db
1244+
from airflow.dag_processing.dagbag import BundleDagBag, sync_bag_to_db
12451245
from airflow.sdk.definitions._internal.dag_parsing_context import (
12461246
_airflow_parsing_context_manager,
12471247
)
@@ -1255,8 +1255,11 @@ def test(
12551255
if not bundle.is_initialized:
12561256
bundle.initialize()
12571257
with _airflow_parsing_context_manager(dag_id=self.dag_id):
1258-
dagbag = DagBag(
1259-
dag_folder=bundle.path, bundle_path=bundle.path, include_examples=False
1258+
dagbag = BundleDagBag(
1259+
dag_folder=bundle.path,
1260+
bundle_path=bundle.path,
1261+
bundle_name=bundle.name,
1262+
include_examples=False,
12601263
)
12611264
sync_bag_to_db(dagbag, bundle.name, bundle.version)
12621265
version = DagVersion.get_version(self.dag_id)

0 commit comments

Comments
 (0)