Skip to content

Commit b9937fb

Browse files
authored
Revert "Support templated extra in outlets assets (#54885)" (#55199)
This reverts commit 438b76c.
1 parent 1d73c01 commit b9937fb

File tree

5 files changed

+6
-136
lines changed

5 files changed

+6
-136
lines changed

airflow-core/docs/authoring-and-scheduling/assets.rst

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -104,27 +104,6 @@ If needed, you can include an extra dictionary in an asset:
104104
105105
This can be used to supply custom description to the asset, such as who has ownership to the target file, or what the file is for. The extra information does not affect an asset's identity.
106106

107-
You can also use Jinja templating in the extra dictionary to enrich the asset with runtime information, such as the execution date of the task that emits events of the asset:
108-
109-
.. code-block::
110-
111-
BashOperator(
112-
task_id="write_example_asset",
113-
bash_command="echo 'writing...'",
114-
outlets=Asset(
115-
"asset_example",
116-
extra={
117-
"static_extra": "value",
118-
"dag_id": "{{ dag.dag_id }}",
119-
"nested_extra": {
120-
"run_id": "{{ run_id }}",
121-
"logical_date": "{{ ds }}",
122-
}
123-
}
124-
),
125-
)
126-
127-
128107
.. note:: **Security Note:** Asset URI and extra fields are not encrypted, they are stored in cleartext in Airflow's metadata database. Do NOT store any sensitive values, especially credentials, in either asset URIs or extra key values!
129108

130109
Creating a task to emit asset events

task-sdk/src/airflow/sdk/definitions/_internal/templater.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ def prepare_template(self) -> None:
8989

9090
def resolve_template_files(self) -> None:
9191
"""Get the content of files for template_field / template_ext."""
92-
if getattr(self, "template_ext", None):
92+
if self.template_ext:
9393
for field in self.template_fields:
9494
content = getattr(self, field, None)
9595
if isinstance(content, str) and content.endswith(tuple(self.template_ext)):
@@ -170,7 +170,7 @@ def render_template(
170170
jinja_env = self.get_template_env()
171171

172172
if isinstance(value, str):
173-
if hasattr(self, "template_ext") and value.endswith(tuple(self.template_ext)): # A filepath.
173+
if value.endswith(tuple(self.template_ext)): # A filepath.
174174
template = jinja_env.get_template(value)
175175
else:
176176
template = jinja_env.from_string(value)

task-sdk/src/airflow/sdk/definitions/asset/__init__.py

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,13 @@
2929
import attrs
3030

3131
from airflow.sdk.api.datamodels._generated import AssetProfile
32-
from airflow.sdk.definitions._internal.templater import Templater
3332
from airflow.serialization.dag_dependency import DagDependency
3433

3534
if TYPE_CHECKING:
3635
from collections.abc import Iterable, Iterator
3736
from urllib.parse import SplitResult
3837

39-
import jinja2
40-
4138
from airflow.models.asset import AssetModel
42-
from airflow.sdk import Context
4339
from airflow.sdk.io.path import ObjectStoragePath
4440
from airflow.serialization.serialized_objects import SerializedAssetWatcher
4541
from airflow.triggers.base import BaseEventTrigger
@@ -309,7 +305,7 @@ def __init__(
309305

310306

311307
@attrs.define(init=False, unsafe_hash=False)
312-
class Asset(os.PathLike, BaseAsset, Templater):
308+
class Asset(os.PathLike, BaseAsset):
313309
"""A representation of data asset dependencies between workflows."""
314310

315311
name: str = attrs.field(
@@ -493,22 +489,6 @@ def asprofile(self) -> AssetProfile:
493489
"""
494490
return AssetProfile(name=self.name or None, uri=self.uri or None, type=Asset.__name__)
495491

496-
def render_extra_field(
497-
self,
498-
context: Context,
499-
jinja_env: jinja2.Environment | None = None,
500-
) -> None:
501-
"""
502-
Template extra attribute.
503-
504-
:param context: Context dict with values to apply on content.
505-
:param jinja_env: Jinja environment to use for rendering.
506-
"""
507-
dag = context["dag"]
508-
if not jinja_env:
509-
jinja_env = self.get_template_env(dag=dag)
510-
self._do_render_template_fields(self, ("extra",), context, jinja_env, set())
511-
512492

513493
class AssetRef(BaseAsset, AttrsInstance):
514494
"""

task-sdk/src/airflow/sdk/execution_time/task_runner.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1287,11 +1287,6 @@ def _execute_task(context: Context, ti: RuntimeTaskInstance, log: Logger):
12871287

12881288
outlet_events = context_get_outlet_events(context)
12891289

1290-
for outlet in task.outlets or ():
1291-
if isinstance(outlet, Asset):
1292-
outlet.render_extra_field(context, jinja_env=task.dag.get_template_env())
1293-
outlet_events[outlet].extra.update(outlet.extra)
1294-
12951290
if (pre_execute_hook := task._pre_execute_hook) is not None:
12961291
create_executable_runner(pre_execute_hook, outlet_events, logger=log).run(context)
12971292
if getattr(pre_execute_hook := task.pre_execute, "__func__", None) is not BaseOperator.pre_execute:

task-sdk/tests/task_sdk/execution_time/test_task_runner.py

Lines changed: 3 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -942,38 +942,10 @@ def test_dag_parsing_context(make_ti_context, mock_supervisor_comms, monkeypatch
942942
task_outlets=[
943943
AssetProfile(name="s3://bucket/my-task", uri="s3://bucket/my-task", type="Asset")
944944
],
945-
outlet_events=[
946-
{
947-
"dest_asset_key": {"name": "s3://bucket/my-task", "uri": "s3://bucket/my-task"},
948-
"extra": {},
949-
}
950-
],
945+
outlet_events=[],
951946
),
952947
id="asset",
953948
),
954-
pytest.param(
955-
[
956-
Asset(
957-
name="s3://bucket/my-task",
958-
uri="s3://bucket/my-task",
959-
extra={"task_id": "{{ task.task_id }}"},
960-
)
961-
],
962-
SucceedTask(
963-
state="success",
964-
end_date=timezone.datetime(2024, 12, 3, 10, 0),
965-
task_outlets=[
966-
AssetProfile(name="s3://bucket/my-task", uri="s3://bucket/my-task", type="Asset")
967-
],
968-
outlet_events=[
969-
{
970-
"dest_asset_key": {"name": "s3://bucket/my-task", "uri": "s3://bucket/my-task"},
971-
"extra": {"task_id": "asset-outlet-task"},
972-
}
973-
],
974-
),
975-
id="asset_with_template_extra",
976-
),
977949
pytest.param(
978950
[Dataset(name="s3://bucket/my-task", uri="s3://bucket/my-task")],
979951
SucceedTask(
@@ -982,38 +954,10 @@ def test_dag_parsing_context(make_ti_context, mock_supervisor_comms, monkeypatch
982954
task_outlets=[
983955
AssetProfile(name="s3://bucket/my-task", uri="s3://bucket/my-task", type="Asset")
984956
],
985-
outlet_events=[
986-
{
987-
"dest_asset_key": {"name": "s3://bucket/my-task", "uri": "s3://bucket/my-task"},
988-
"extra": {},
989-
}
990-
],
957+
outlet_events=[],
991958
),
992959
id="dataset",
993960
),
994-
pytest.param(
995-
[
996-
Dataset(
997-
name="s3://bucket/my-task",
998-
uri="s3://bucket/my-task",
999-
extra={"task_id": "{{ task.task_id }}"},
1000-
)
1001-
],
1002-
SucceedTask(
1003-
state="success",
1004-
end_date=timezone.datetime(2024, 12, 3, 10, 0),
1005-
task_outlets=[
1006-
AssetProfile(name="s3://bucket/my-task", uri="s3://bucket/my-task", type="Asset")
1007-
],
1008-
outlet_events=[
1009-
{
1010-
"dest_asset_key": {"name": "s3://bucket/my-task", "uri": "s3://bucket/my-task"},
1011-
"extra": {"task_id": "asset-outlet-task"},
1012-
}
1013-
],
1014-
),
1015-
id="dataset_with_template_extra",
1016-
),
1017961
pytest.param(
1018962
[Model(name="s3://bucket/my-task", uri="s3://bucket/my-task")],
1019963
SucceedTask(
@@ -1022,38 +966,10 @@ def test_dag_parsing_context(make_ti_context, mock_supervisor_comms, monkeypatch
1022966
task_outlets=[
1023967
AssetProfile(name="s3://bucket/my-task", uri="s3://bucket/my-task", type="Asset")
1024968
],
1025-
outlet_events=[
1026-
{
1027-
"dest_asset_key": {"name": "s3://bucket/my-task", "uri": "s3://bucket/my-task"},
1028-
"extra": {},
1029-
}
1030-
],
969+
outlet_events=[],
1031970
),
1032971
id="model",
1033972
),
1034-
pytest.param(
1035-
[
1036-
Model(
1037-
name="s3://bucket/my-task",
1038-
uri="s3://bucket/my-task",
1039-
extra={"task_id": "{{ task.task_id }}"},
1040-
)
1041-
],
1042-
SucceedTask(
1043-
state="success",
1044-
end_date=timezone.datetime(2024, 12, 3, 10, 0),
1045-
task_outlets=[
1046-
AssetProfile(name="s3://bucket/my-task", uri="s3://bucket/my-task", type="Asset")
1047-
],
1048-
outlet_events=[
1049-
{
1050-
"dest_asset_key": {"name": "s3://bucket/my-task", "uri": "s3://bucket/my-task"},
1051-
"extra": {"task_id": "asset-outlet-task"},
1052-
}
1053-
],
1054-
),
1055-
id="model_with_template_extra",
1056-
),
1057973
pytest.param(
1058974
[Asset.ref(name="s3://bucket/my-task")],
1059975
SucceedTask(

0 commit comments

Comments
 (0)