From daa3f3cb59f8f7d19184e474c26bd21611806c63 Mon Sep 17 00:00:00 2001 From: Xiaodong DENG Date: Tue, 30 Mar 2021 09:36:21 +0200 Subject: [PATCH] Add test to guard how DAG/Operator params work together (#15075) --- tests/core/test_core.py | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/tests/core/test_core.py b/tests/core/test_core.py index f6dd6ee579291..061a1a0dbf272 100644 --- a/tests/core/test_core.py +++ b/tests/core/test_core.py @@ -421,3 +421,40 @@ def test_externally_triggered_dagrun(self): assert context['prev_ds'] == execution_ds assert context['prev_ds_nodash'] == execution_ds_nodash + + def test_dag_params_and_task_params(self): + # This test case guards how params of DAG and Operator work together. + # - If any key exists in either DAG's or Operator's params, + # it is guaranteed to be available eventually. + # - If any key exists in both DAG's params and Operator's params, + # the latter has precedence. + TI = TaskInstance + + dag = DAG( + TEST_DAG_ID, + default_args=self.args, + schedule_interval=timedelta(weeks=1), + start_date=DEFAULT_DATE, + params={'key_1': 'value_1', 'key_2': 'value_2_old'}, + ) + task1 = DummyOperator( + task_id='task1', + dag=dag, + params={'key_2': 'value_2_new', 'key_3': 'value_3'}, + ) + task2 = DummyOperator(task_id='task2', dag=dag) + dag.create_dagrun( + run_type=DagRunType.SCHEDULED, + execution_date=DEFAULT_DATE, + state=State.RUNNING, + external_trigger=True, + ) + task1.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + task2.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + ti1 = TI(task=task1, execution_date=DEFAULT_DATE) + ti2 = TI(task=task2, execution_date=DEFAULT_DATE) + context1 = ti1.get_template_context() + context2 = ti2.get_template_context() + + assert context1['params'] == {'key_1': 'value_1', 'key_2': 'value_2_new', 'key_3': 'value_3'} + assert context2['params'] == {'key_1': 'value_1', 'key_2': 'value_2_old'}