File tree Expand file tree Collapse file tree 2 files changed +18
-3
lines changed
dbt/cloud/src/airflow/providers/dbt/cloud/utils
snowflake/src/airflow/providers/snowflake/utils Expand file tree Collapse file tree 2 files changed +18
-3
lines changed Original file line number Diff line number Diff line change @@ -48,6 +48,14 @@ def _get_logical_date(task_instance):
4848 return date
4949
5050
51+ def _get_dag_run_clear_number (task_instance ):
52+ # todo: remove when min airflow version >= 3.0
53+ if AIRFLOW_V_3_0_PLUS :
54+ dagrun = task_instance .get_template_context ()["dag_run" ]
55+ return dagrun .clear_number
56+ return task_instance .dag_run .clear_number
57+
58+
5159@require_openlineage_version (provider_min_version = "2.3.0" )
5260def generate_openlineage_events_from_dbt_cloud_run (
5361 operator : DbtCloudRunJobOperator | DbtCloudJobRunSensor , task_instance : TaskInstance
@@ -144,7 +152,7 @@ async def get_artifacts_for_steps(steps, artifacts):
144152 root_parent_run_id = OpenLineageAdapter .build_dag_run_id (
145153 dag_id = task_instance .dag_id ,
146154 logical_date = _get_logical_date (task_instance ),
147- clear_number = task_instance . dag_run . clear_number ,
155+ clear_number = _get_dag_run_clear_number ( task_instance ) ,
148156 )
149157
150158 parent_job = ParentRunMetadata (
Original file line number Diff line number Diff line change @@ -110,9 +110,16 @@ def _get_logical_date(task_instance):
110110
111111 return date
112112
113- # todo: move this run_id logic into OpenLineage's listener to avoid differences
114113
114+ def _get_dag_run_clear_number (task_instance ):
115+ # todo: remove when min airflow version >= 3.0
116+ if AIRFLOW_V_3_0_PLUS :
117+ dagrun = task_instance .get_template_context ()["dag_run" ]
118+ return dagrun .clear_number
119+ return task_instance .dag_run .clear_number
115120
121+
122+ # todo: move this run_id logic into OpenLineage's listener to avoid differences
116123def _get_ol_run_id (task_instance ) -> str :
117124 """
118125 Get OpenLineage run_id from TaskInstance.
@@ -140,7 +147,7 @@ def _get_ol_dag_run_id(task_instance) -> str:
140147 return OpenLineageAdapter .build_dag_run_id (
141148 dag_id = task_instance .dag_id ,
142149 logical_date = _get_logical_date (task_instance ),
143- clear_number = task_instance . dag_run . clear_number ,
150+ clear_number = _get_dag_run_clear_number ( task_instance ) ,
144151 )
145152
146153
You can’t perform that action at this time.
0 commit comments