Description
Apache Airflow Provider(s)
google
Versions of Apache Airflow Providers
apache-airflow-providers-google==17.1.0
Apache Airflow version
3.0.5
Operating System
Mac
Deployment
Astronomer
Deployment details
Local
What happened
I have two DAGs. The second DAG is scheduled on an asset event produced by the first. The second DAG contains a BigQueryInsertJobOperator; however, due to changes in Airflow 3, logical_date is no longer passed through the context for asset-triggered DAG runs.
Within BigQueryInsertJobOperator's execute method there is a block of code that auto-generates the job_id from the logical_date. Because logical_date is no longer present in the context, the operator fails.
```python
def execute(self, context: Any):
    hook = BigQueryHook(
        gcp_conn_id=self.gcp_conn_id,
        impersonation_chain=self.impersonation_chain,
    )
    self.hook = hook
    if self.project_id is None:
        self.project_id = hook.project_id
    self.job_id = hook.generate_job_id(
        job_id=self.job_id,
        dag_id=self.dag_id,
        task_id=self.task_id,
        logical_date=context["logical_date"],
        configuration=self.configuration,
        force_rerun=self.force_rerun,
    )
    ...
```
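As a stopgap in user code, a subclass can backfill a logical_date before delegating to the parent execute. This is only a sketch: the fallback timestamp is an assumption, and it sacrifices the deterministic job_id that generate_job_id normally derives from logical_date.

```python
from datetime import datetime, timezone

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator


class AssetSafeBigQueryInsertJobOperator(BigQueryInsertJobOperator):
    """Workaround sketch: tolerate asset-triggered runs with no logical_date."""

    def execute(self, context):
        if context.get("logical_date") is None:
            # Assumption: any timezone-aware datetime unblocks generate_job_id.
            # Using "now" means retries will no longer reuse the same job_id.
            context["logical_date"] = datetime.now(timezone.utc)
        return super().execute(context)
```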
What you think should happen instead
BigQueryInsertJobOperator should be able to generate a job_id regardless of whether the DAG run is scheduled, manually triggered, or triggered by an asset event.
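One possible direction (a sketch only, not the provider's actual patch; generate_fallback_job_id is a hypothetical stand-in for what generate_job_id could do) is to read the date defensively and fall back to run_id, which is always present in the context:

```python
import re
import uuid


def generate_fallback_job_id(dag_id: str, task_id: str, logical_date, run_id: str) -> str:
    # Substitute run_id when logical_date is absent (asset-triggered runs),
    # then sanitize: BigQuery job IDs allow only letters, digits, '-' and '_'.
    date_part = logical_date.isoformat() if logical_date else run_id
    safe = re.sub(r"[^0-9a-zA-Z_-]", "_", f"{dag_id}_{task_id}_{date_part}")
    return f"airflow_{safe}_{uuid.uuid4().hex}"
```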
How to reproduce
Create two DAGs, the second scheduled on an asset produced by the first. In the asset-triggered DAG, add a task that accesses logical_date=context["logical_date"]; it fails. When using logical_date=context.get("logical_date"), there is no error, but the value returned is None. Interestingly, this is not the case in Airflow 2, where logical_date is returned as part of the context and is reflected in the run_id of the DAG run. A minimal reproduction is sketched below.
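A minimal sketch of the reproduction (dag_ids, the asset name, and the start date are placeholders):

```python
import pendulum

from airflow.sdk import DAG, Asset, task

demo_asset = Asset("demo_asset")

with DAG(
    dag_id="producer",
    schedule="@daily",
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
):

    @task(outlets=[demo_asset])
    def produce():
        pass

    produce()

with DAG(dag_id="consumer", schedule=[demo_asset]):

    @task
    def check_logical_date(**context):
        # .get() avoids a hard failure but returns None on asset-triggered runs:
        print(context.get("logical_date"))
        # Direct access is what BigQueryInsertJobOperator.execute() does, and it fails:
        print(context["logical_date"])

    check_logical_date()
```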
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct