Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions airflow/models/baseoperatorlink.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
from airflow.utils.log.logging_mixin import LoggingMixin

if TYPE_CHECKING:
from airflow.models.baseoperator import BaseOperator
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.sdk.definitions.baseoperator import BaseOperator


@attrs.define()
Expand Down Expand Up @@ -99,9 +99,6 @@ def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str:
"""
Link to external system.

Note: The old signature of this function was ``(self, operator, dttm: datetime)``. That is still
supported at runtime but is deprecated.

:param operator: The Airflow operator object this link is associated to.
:param ti_key: TaskInstance ID to return link for.
:return: link to external system
Expand Down
8 changes: 3 additions & 5 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -1200,7 +1200,7 @@ def global_operator_extra_link_dict(self) -> dict[str, Any]:
def extra_links(self) -> list[str]:
return sorted(set(self.operator_extra_link_dict).union(self.global_operator_extra_link_dict))

def get_extra_links(self, ti: TaskInstance, link_name: str) -> str | None:
def get_extra_links(self, ti: TaskInstance, name: str) -> str | None:
"""
For an operator, gets the URLs that the ``extra_links`` entry points to.

Expand All @@ -1212,11 +1212,9 @@ def get_extra_links(self, ti: TaskInstance, link_name: str) -> str | None:
:param link_name: The name of the link we're looking for the URL for. Should be
one of the options specified in ``extra_links``.
"""
link = self.operator_extra_link_dict.get(link_name)
link = self.operator_extra_link_dict.get(name) or self.global_operator_extra_link_dict.get(name)
if not link:
link = self.global_operator_extra_link_dict.get(link_name)
if not link:
return None
return None
return link.get_link(self.unmap(None), ti_key=ti.key)

@property
Expand Down
25 changes: 25 additions & 0 deletions newsfragments/46415.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
Legacy signature for operator link is removed.

``BaseOperatorLink.get_link`` used to accept execution date as an argument. This
has been changed to accept ``ti_key`` to identify a task instance instead. The
old signature, supported at runtime for compatibility, has been removed.

* Types of change

* [x] Dag changes
* [ ] Config changes
* [ ] API changes
* [ ] CLI changes
* [ ] Behaviour changes
* [ ] Plugin changes
* [ ] Dependency changes
* [x] Code interface changes

* Migration rules needed

* ruff

* AIR302

* [ ] Signature of ``airflow.models.baseoperatorlink.BaseOperatorLink.get_link`` changed
.. detailed in https://github.com/apache/airflow/pull/46415#issuecomment-2636186625
11 changes: 4 additions & 7 deletions tests/api_connexion/endpoints/test_extra_link_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,17 +209,17 @@ def test_should_respond_200_support_plugins(self):
class GoogleLink(BaseOperatorLink):
name = "Google"

def get_link(self, operator, dttm):
def get_link(self, operator, *, ti_key):
return "https://www.google.com"

class S3LogLink(BaseOperatorLink):
name = "S3"
operators = [CustomOperator]

def get_link(self, operator, dttm):
def get_link(self, operator, *, ti_key):
return (
f"https://s3.amazonaws.com/airflow-logs/{operator.dag_id}/"
f"{operator.task_id}/{quote_plus(dttm.isoformat())}"
f"{operator.task_id}/{quote_plus(ti_key.run_id)}"
)

class AirflowTestPlugin(AirflowPlugin):
Expand All @@ -241,8 +241,5 @@ class AirflowTestPlugin(AirflowPlugin):
assert response.json == {
"Google Custom": None,
"Google": "https://www.google.com",
"S3": (
"https://s3.amazonaws.com/airflow-logs/"
"TEST_DAG_ID/TEST_SINGLE_LINK/2020-01-01T00%3A00%3A00%2B00%3A00"
),
"S3": "https://s3.amazonaws.com/airflow-logs/TEST_DAG_ID/TEST_SINGLE_LINK/TEST_DAG_RUN_ID",
}
Loading