Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ def _create_dag(self, dag_maker):
CustomOperator(
task_id=self.task_multiple_links, bash_command=["TEST_LINK_VALUE_1", "TEST_LINK_VALUE_2"]
)
_ = CustomOperator.partial(task_id=self.task_mapped).expand(
bash_command=["TEST_LINK_VALUE_1", "TEST_LINK_VALUE_2"]
)
return dag

@pytest.mark.parametrize(
Expand Down Expand Up @@ -261,29 +264,37 @@ def test_should_respond_200_support_plugins(self, test_client):
).model_dump()
)

@pytest.mark.xfail(reason="TODO: TaskSDK need to fix this, Extra links should work for mapped operator")
def test_should_respond_200_mapped_task_instance(self, test_client):
map_index = 0
XCom.set(
key="search_query",
value="TEST_LINK_VALUE_1",
task_id=self.task_mapped,
dag_id=self.dag.dag_id,
run_id=self.dag_run_id,
map_index=map_index,
)
response = test_client.get(
f"/dags/{self.dag_id}/dagRuns/{self.dag_run_id}/taskInstances/{self.task_mapped}/links",
params={"map_index": map_index},
)
assert response.status_code == 200
assert (
response.json()
== ExtraLinkCollectionResponse(
extra_links={"Google Custom": "http://google.com/custom_base_link?search=TEST_LINK_VALUE_1"},
total_entries=1,
).model_dump()
)
def test_should_respond_200_mapped_task_instance(self, test_client, session):
for map_index, value in enumerate(["TEST_LINK_VALUE_1", "TEST_LINK_VALUE_2"]):
XCom.set(
key="search_query",
value=value,
task_id=self.task_mapped,
dag_id=self.dag_id,
run_id=self.dag_run_id,
map_index=map_index,
)
XCom.set(
key="_link_CustomOpLink",
value=f"http://google.com/custom_base_link?search={value}",
task_id=self.task_mapped,
dag_id=self.dag_id,
run_id=self.dag_run_id,
map_index=map_index,
)
session.commit()
response = test_client.get(
f"/dags/{self.dag_id}/dagRuns/{self.dag_run_id}/taskInstances/{self.task_mapped}/links",
params={"map_index": map_index},
)
assert response.status_code == 200
assert (
response.json()
== ExtraLinkCollectionResponse(
extra_links={"Google Custom": f"http://google.com/custom_base_link?search={value}"},
total_entries=1,
).model_dump()
)

def test_should_respond_401_unauthenticated(self, unauthenticated_test_client):
response = unauthenticated_test_client.get(
Expand All @@ -305,4 +316,4 @@ def test_should_respond_404_invalid_map_index(self, test_client):
params={"map_index": 4},
)
assert response.status_code == 404
assert response.json() == {"detail": "Task with ID = TEST_MAPPED_TASK not found"}
assert response.json() == {"detail": "TaskInstance not found"}
7 changes: 7 additions & 0 deletions devel-common/src/tests_common/test_utils/mock_operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,15 @@ class CustomOperator(BaseOperator):
@property
def operator_extra_links(self):
"""Return operator extra links."""
# For mapped operators
if not hasattr(self, "bash_command"):
# For mapped operators, we return CustomOpLink since each mapped instance
# will get its own link during runtime
return (CustomOpLink(),)
# For non-mapped operators
if isinstance(self.bash_command, str) or self.bash_command is None:
return (CustomOpLink(),)
# For operators with multiple commands
return (CustomBaseIndexOpLink(i) for i, _ in enumerate(self.bash_command))

def __init__(self, bash_command=None, **kwargs):
Expand Down