Apache Airflow version
2.6.1
What happened
When doing `.partial()` and `.expand()` on any operator which has an instance property (e.g. `@property`) of `operator_extra_links`, the `MappedOperator` does not handle it properly, causing the DAG to fail to import.
The `BatchOperator` from `airflow.providers.amazon.aws.operators.batch` is one example of an operator which defines `operator_extra_links` on a per-instance basis.
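My reading of the traceback below (an assumption on my part, I haven't traced the Airflow source) is that serialization reads `operator_extra_links` off the class rather than off an instance, so a `@property` hands back the descriptor object itself instead of the tuple its getter would return. A standalone snippet showing that mechanic:

```python
# Minimal standalone illustration (not Airflow code): accessing a @property
# through the class returns the property descriptor, not the getter's value.
class BadOperator:
    @property
    def operator_extra_links(self):
        return tuple()

links = BadOperator.operator_extra_links
print(type(links))  # <class 'property'>
iter(links)         # TypeError: 'property' object is not iterable
```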
What you think should happen instead
The DAG should not fail to import (especially when using the AWS `BatchOperator`!). Either:
- If per-instance `operator_extra_links` is deemed disallowed behaviour:
  - `MappedOperator` should detect it's a property and give a more helpful error message (a sketch of what that detection might look like follows this list)
  - `BatchOperator` from the AWS provider should be changed. If I need to open another ticket elsewhere for that please let me know
- If per-instance `operator_extra_links` is allowed, `MappedOperator` needs to be adjusted to account for that
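For the first option, here is a rough sketch of detecting the property early (entirely hypothetical; the function name and where it would hook into `MappedOperator` are my guesses, not actual Airflow code):

```python
import inspect


def validate_operator_extra_links(op_class: type) -> None:
    # Hypothetical pre-serialization check: look the attribute up on the
    # class without triggering the descriptor protocol, so a @property is
    # seen as a property object rather than being invoked.
    attr = inspect.getattr_static(op_class, "operator_extra_links", None)
    if isinstance(attr, property):
        raise TypeError(
            f"{op_class.__name__}.operator_extra_links is an instance "
            "property, but mapped operators need a class-level sequence "
            "of BaseOperatorLink instances."
        )
```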
How to reproduce
```python
import pendulum

from airflow.decorators import dag, task
from airflow.models.baseoperator import BaseOperator


class BadOperator(BaseOperator):
    def __init__(self, *args, some_argument: str, **kwargs):
        # some_argument must not be forwarded positionally, since
        # BaseOperator.__init__ rejects positional arguments
        super().__init__(*args, **kwargs)
        self.some_argument = some_argument

    @property
    def operator_extra_links(self):
        # !PROBLEMATIC FUNCTION!
        # ... Some code to create a collection of `BaseOperatorLink`s dynamically
        return tuple()


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def airflow_is_bad():
    """
    Example to demonstrate issue with airflow API
    """

    @task
    def create_arguments():
        return [1, 2, 3, 4]

    bad_operator_test_group = BadOperator.partial(
        task_id="bad_operator_test_group",
    ).expand(some_argument=create_arguments())


dag = airflow_is_bad()
```
Put this in your dags folder; Airflow will fail to import the DAG with the following error:
```
Broken DAG: [<USER>/airflow/dags/airflow_is_bad_minimal_example.py] Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 825, in _serialize_node
    serialize_op["_operator_extra_links"] = cls._serialize_operator_extra_links(
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 1165, in _serialize_operator_extra_links
    for operator_extra_link in operator_extra_links:
TypeError: 'property' object is not iterable
```
Commenting out the `operator_extra_links` property on `BadOperator` in the example allows the DAG to be imported fine.
Operating System
macOS Ventura 13.4
Versions of Apache Airflow Providers
apache-airflow-providers-amazon==8.0.0
apache-airflow-providers-common-sql==1.4.0
apache-airflow-providers-ftp==3.3.1
apache-airflow-providers-google==10.0.0
apache-airflow-providers-http==4.3.0
apache-airflow-providers-imap==3.1.1
apache-airflow-providers-postgres==5.4.0
apache-airflow-providers-sqlite==3.3.2
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
Anything else
I found that changing `operator_extra_links` on `BatchOperator` to a class-level attribute, `operator_extra_links = (BatchJobDetailsLink(), BatchJobDefinitionLink(), BatchJobQueueLink(), CloudWatchEventsLink())`, solved my issue and made it run fine. However, I've no idea if that's safe or generalises, because I'm not sure what `operator_extra_links` is actually used for internally.
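For anyone else hitting this in the meantime, the same workaround can be applied without editing the provider by subclassing (a sketch based on `apache-airflow-providers-amazon==8.0.0`; I believe these import paths for the link classes are right, but treat them as assumptions):

```python
from airflow.providers.amazon.aws.links.batch import (
    BatchJobDefinitionLink,
    BatchJobDetailsLink,
    BatchJobQueueLink,
)
from airflow.providers.amazon.aws.links.logs import CloudWatchEventsLink
from airflow.providers.amazon.aws.operators.batch import BatchOperator


class StaticLinksBatchOperator(BatchOperator):
    # Shadow the inherited per-instance property with a class-level tuple
    # so that serialization of the mapped operator sees an iterable.
    operator_extra_links = (
        BatchJobDetailsLink(),
        BatchJobDefinitionLink(),
        BatchJobQueueLink(),
        CloudWatchEventsLink(),
    )
```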
Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!
Code of Conduct
- [x] I agree to follow this project's Code of Conduct