Commit 75f8aa9

Fix Jinja2 Template deep copy error with dag.test (#51670)
The `dag.test` method currently (in 3.0.2) clears DAG runs before running the DAG ([code](https://github.com/apache/airflow/blob/3.0.2/task-sdk/src/airflow/sdk/definitions/dag.py#L1092-L1097)). To do that it calls `SchedulerDAG.clear_dags`, which creates a `SchedulerDAG` from the Task SDK DAG using [`copy.deepcopy`](https://github.com/apache/airflow/blob/3.0.2/airflow-core/src/airflow/models/dag.py#L1797).

That works in most cases, but not when a `jinja2.Template` object is passed as a Task/Operator argument, because of [this issue](pallets/jinja#758).

```python
In [1]: from copy import deepcopy
   ...:
   ...: from jinja2 import Template
   ...:
   ...: deepcopy(Template(''))
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[1], line 5
      1 from copy import deepcopy
      3 from jinja2 import Template
----> 5 deepcopy(Template(''))

File ~/.local/share/uv/python/cpython-3.12.8-macos-aarch64-none/lib/python3.12/copy.py:162, in deepcopy(x, memo, _nil)
    160                 y = x
    161             else:
--> 162                 y = _reconstruct(x, memo, *rv)
    164 # If is its own copy, don't memoize.
    165 if y is not x:

File ~/.local/share/uv/python/cpython-3.12.8-macos-aarch64-none/lib/python3.12/copy.py:253, in _reconstruct(x, memo, func, args, state, listiter, dictiter, deepcopy)
    251 if deep and args:
    252     args = (deepcopy(arg, memo) for arg in args)
--> 253 y = func(*args)
    254 if deep:
    255     memo[id(x)] = y

File ~/.local/share/uv/python/cpython-3.12.8-macos-aarch64-none/lib/python3.12/copyreg.py:99, in __newobj__(cls, *args)
     98 def __newobj__(cls, *args):
---> 99     return cls.__new__(cls, *args)

TypeError: Template.__new__() missing 1 required positional argument: 'source'
```

This is a general issue that can affect any DAG using custom arguments that store or cache Jinja2 Template objects or similar.
Clearly, the longer-term solution for `dag.test` is to stop using `SchedulerDAG` this way and instead call some Task Execution endpoint to clear DAG runs, but this change works for 3.0.x.
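To make the failure mode concrete without depending on Jinja2 or Airflow, here is a minimal sketch using only the standard library. `FakeTemplate` and `FakeTask` are hypothetical stand-ins, not real Jinja2 or Airflow classes; the only property they borrow is that the template's `__new__` requires a `source` argument, which is what the traceback above complains about.

```python
import copy


class FakeTemplate:
    """Hypothetical stand-in for jinja2.Template: __new__ requires `source`."""

    def __new__(cls, source):
        self = super().__new__(cls)
        self.source = source
        return self


class FakeTask:
    """Hypothetical stand-in for an operator holding a template argument."""

    def __init__(self, task_id, template):
        self.task_id = task_id
        self.template = template


task = FakeTask("render", FakeTemplate("hello {{ name }}"))

# Deep copy recurses into task.template and tries to rebuild it by calling
# FakeTemplate.__new__(FakeTemplate) with no arguments, which raises TypeError.
try:
    copy.deepcopy(task)
    deepcopy_error = None
except TypeError as exc:
    deepcopy_error = exc

# Shallow copy builds a new FakeTask but reuses the same template object,
# so the template's __new__ is never called again.
shallow = copy.copy(task)

print("deepcopy failed:", deepcopy_error)
print("template shared by shallow copy:", shallow.template is task.template)
```

The shallow copy succeeds here only because the template is an attribute of the copied object; calling `copy.copy` directly on the template stand-in would hit the same `TypeError`.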
1 parent de9470e commit 75f8aa9

airflow-core/src/airflow/models/dag.py

Lines changed: 1 addition & 1 deletion
@@ -1808,7 +1808,7 @@ def create_tasks(task):
     if isinstance(task, TaskGroup):
         return task_group_map[task.group_id]

-    new_task = copy.deepcopy(task)
+    new_task = copy.copy(task)

     # Only overwrite the specific attributes we want to change
     new_task.task_id = task.task_id
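As a side note on what switching to `copy.copy` implies in general Python terms, the sketch below shows the difference between rebinding an attribute on a shallow copy and mutating an object the copy shares with the original. `FakeTask` and its `params` attribute are hypothetical and exist only for this illustration; the commit does not say whether any shared mutable state matters in the actual Airflow code path.

```python
import copy


class FakeTask:
    """Hypothetical task-like object with one plain and one mutable attribute."""

    def __init__(self, task_id, params):
        self.task_id = task_id
        self.params = params


original = FakeTask("extract", {"retries": 1})
new_task = copy.copy(original)

# Rebinding an attribute on the copy leaves the original untouched.
new_task.task_id = "extract_copy"

# Mutating a shared object in place is visible through both references,
# because a shallow copy does not duplicate nested objects.
new_task.params["retries"] = 5

print(original.task_id)
print(original.params["retries"])
```

Overwriting specific attributes, as the diff's comment describes, falls into the first category: the assignment replaces the reference held by the copy and does not touch the source object.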
