@@ -441,8 +441,11 @@ def test_trigger_dagrun_pushes_dag_id_to_xcom(self, dag_maker, session):
441441 run_id_xcom = triggering_ti .xcom_pull (key = XCOM_RUN_ID )
442442 assert run_id_xcom == "test_run_id"
443443
444- def test_extra_operator_link (self , dag_maker , session ):
444+ @mock .patch ("airflow.providers.standard.operators.trigger_dagrun.XCom.get_value" )
445+ def test_extra_operator_link (self , mock_xcom_get_value , dag_maker , session ):
445446 """Asserts whether the correct extra links url will be created."""
447+ from airflow .providers .standard .operators .trigger_dagrun import XCOM_RUN_ID
448+
446449 with dag_maker (TEST_DAG_ID , default_args = {"start_date" : DEFAULT_DATE }, serialized = True ):
447450 task = TriggerDagRunOperator (
448451 task_id = "test_task" , trigger_dag_id = TRIGGERED_DAG_ID , trigger_run_id = "test_run_id"
@@ -452,6 +455,14 @@ def test_extra_operator_link(self, dag_maker, session):
452455
453456 triggering_ti = session .query (TaskInstance ).filter_by (task_id = task .task_id , dag_id = task .dag_id ).one ()
454457
458+ # Mock XCom.get_value to return None for dag_id but return run_id for XCOM_RUN_ID
459+ def mock_get_value (ti_key , key ):
460+ if key == XCOM_RUN_ID :
461+ return "test_run_id"
462+ return None
463+
464+ mock_xcom_get_value .side_effect = mock_get_value
465+
455466 with mock .patch ("airflow.utils.helpers.build_airflow_url_with_query" ) as mock_build_url :
456467 # This is equivalent of a task run calling this and pushing to xcom
457468 task .operator_extra_links [0 ].get_link (operator = task , ti_key = triggering_ti .key )
@@ -463,7 +474,8 @@ def test_extra_operator_link(self, dag_maker, session):
463474 }
464475 assert expected_args in args
465476
466- def test_extra_operator_link_with_dynamic_dag_id (self , dag_maker , session ):
477+ @mock .patch ("airflow.providers.standard.operators.trigger_dagrun.XCom.get_value" )
478+ def test_extra_operator_link_with_dynamic_dag_id (self , mock_xcom_get_value , dag_maker , session ):
467479 """Test that operator link works correctly when dag_id is dynamically resolved from XCom."""
468480 from airflow .providers .standard .operators .trigger_dagrun import XCOM_DAG_ID , XCOM_RUN_ID
469481
@@ -479,8 +491,15 @@ def test_extra_operator_link_with_dynamic_dag_id(self, dag_maker, session):
479491
480492 triggering_ti = session .query (TaskInstance ).filter_by (task_id = task .task_id , dag_id = task .dag_id ).one ()
481493
482- # Simulate that the operator has pushed a dynamic dag_id to XCom
483- triggering_ti .xcom_push (key = XCOM_DAG_ID , value = "dynamic_dag_id" )
494+ # Mock XCom.get_value to return our test values
495+ def mock_get_value (ti_key , key ):
496+ if key == XCOM_DAG_ID :
497+ return "dynamic_dag_id"
498+ elif key == XCOM_RUN_ID :
499+ return "test_run_id"
500+ return None
501+
502+ mock_xcom_get_value .side_effect = mock_get_value
484503
485504 with mock .patch ("airflow.utils.helpers.build_airflow_url_with_query" ) as mock_build_url :
486505 task .operator_extra_links [0 ].get_link (operator = task , ti_key = triggering_ti .key )
0 commit comments