MappedOperator doesn't allow operator_extra_links instance property #31902

@harrisonharris-di

Description

Apache Airflow version

2.6.1

What happened

When calling .partial() and .expand() on an operator that defines operator_extra_links as an instance property (i.e. with @property), MappedOperator does not handle it properly, causing the DAG to fail to import.

The BatchOperator from airflow.providers.amazon.aws.operators.batch is one example of an operator that defines operator_extra_links on a per-instance basis.
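
For illustration, this is roughly the shape of that pattern (a sketch only, not the provider's actual code; the operator, link classes, and condition here are made up):

from airflow.models.baseoperator import BaseOperator, BaseOperatorLink

class SomeJobLink(BaseOperatorLink):
    name = "Job"

    def get_link(self, operator, *, ti_key):
        return "https://example.com/job"

class SomeLogsLink(BaseOperatorLink):
    name = "Logs"

    def get_link(self, operator, *, ti_key):
        return "https://example.com/logs"

class SomeProviderOperator(BaseOperator):
    def __init__(self, *, wait_for_completion: bool = False, **kwargs):
        super().__init__(**kwargs)
        self.wait_for_completion = wait_for_completion

    @property
    def operator_extra_links(self):
        # The links depend on instance configuration, so they cannot be
        # declared as a class-level tuple at class-definition time.
        links = [SomeJobLink()]
        if self.wait_for_completion:
            links.append(SomeLogsLink())
        return tuple(links)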

What you think should happen instead

The DAG should not fail to import (especially when using the AWS BatchOperator!). Either:

  • If per-instance operator_extra_links is considered disallowed behaviour
    • MappedOperator should detect that it is a property and give a more helpful error message (see the sketch after this list)
    • BatchOperator from the AWS provider should be changed. If I need to open another ticket elsewhere for that, please let me know
  • If per-instance operator_extra_links is allowed
    • MappedOperator needs to be adjusted to account for it
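
For the first option, a minimal sketch of what such a check could look like (hypothetical: validate_operator_extra_links is a made-up helper, and the real guard would live somewhere in MappedOperator or the serialization code):

import inspect

def validate_operator_extra_links(op_class):
    # inspect.getattr_static retrieves the attribute without triggering the
    # descriptor protocol, so a @property shows up as a `property` object
    # rather than its computed value.
    attr = inspect.getattr_static(op_class, "operator_extra_links", None)
    if isinstance(attr, property):
        raise TypeError(
            f"{op_class.__name__} defines operator_extra_links as a @property, "
            "which is not supported for mapped operators; define it as a "
            "class-level tuple of BaseOperatorLink instances instead."
        )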

How to reproduce

import pendulum
from airflow.models.baseoperator import BaseOperator
from airflow.decorators import dag, task
class BadOperator(BaseOperator):
    def __init__(self, *, some_argument: str, **kwargs):
        # Pass only the remaining kwargs to BaseOperator; keep
        # some_argument as an instance attribute so it can be mapped.
        super().__init__(**kwargs)
        self.some_argument = some_argument

    @property
    def operator_extra_links(self):
        # !PROBLEMATIC FUNCTION!
        # ... Some code to create a collection of `BaseOperatorLink`s dynamically
        return tuple()

@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"]
)
def airflow_is_bad():
    """
    Example to demonstrate issue with airflow API
    """

    @task
    def create_arguments():
        return [1, 2, 3, 4]

    bad_operator_test_group = BadOperator.partial(
        task_id="bad_operator_test_group",
    ).expand(some_argument=create_arguments())

dag = airflow_is_bad()

Put this in your dags folder and Airflow will fail to import the DAG with the following error:

Broken DAG: [<USER>/airflow/dags/airflow_is_bad_minimal_example.py] Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 825, in _serialize_node
    serialize_op["_operator_extra_links"] = cls._serialize_operator_extra_links(
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 1165, in _serialize_operator_extra_links
    for operator_extra_link in operator_extra_links:
TypeError: 'property' object is not iterable

Commenting out the operator_extra_links property on BadOperator in the example allows the DAG to be imported fine.
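
The root cause appears to be that for a mapped operator the serializer reads operator_extra_links from the class rather than from an instance, and class-level access returns the property descriptor itself instead of the tuple it would compute. A quick demonstration with the BadOperator above:

# Instance access goes through the property and yields the tuple:
op = BadOperator(task_id="t", some_argument="x")
print(op.operator_extra_links)           # ()

# Class access yields the descriptor itself, which is what the serializer
# ends up iterating over for a mapped task:
print(BadOperator.operator_extra_links)  # <property object at 0x...>
for link in BadOperator.operator_extra_links:
    pass                                 # TypeError: 'property' object is not iterable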

Operating System

macOS Ventura 13.4

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==8.0.0
apache-airflow-providers-common-sql==1.4.0
apache-airflow-providers-ftp==3.3.1
apache-airflow-providers-google==10.0.0
apache-airflow-providers-http==4.3.0
apache-airflow-providers-imap==3.1.1
apache-airflow-providers-postgres==5.4.0
apache-airflow-providers-sqlite==3.3.2

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else

I found that changing operator_extra_links on BatchOperator to a class-level tuple, operator_extra_links = (BatchJobDetailsLink(), BatchJobDefinitionLink(), BatchJobQueueLink(), CloudWatchEventsLink()), solved my issue and made it run fine. However, I have no idea whether that's safe or generalises, because I'm not sure what operator_extra_links is actually used for internally.
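
Concretely, the workaround looks like the sketch below. PatchedBatchOperator is my own name, and I'm assuming the link classes import from airflow.providers.amazon.aws.links.batch and airflow.providers.amazon.aws.links.logs as in provider 8.0.0; check your provider version:

from airflow.providers.amazon.aws.links.batch import (
    BatchJobDefinitionLink,
    BatchJobDetailsLink,
    BatchJobQueueLink,
)
from airflow.providers.amazon.aws.links.logs import CloudWatchEventsLink
from airflow.providers.amazon.aws.operators.batch import BatchOperator

class PatchedBatchOperator(BatchOperator):
    # A class-level tuple shadows the parent's per-instance property in the
    # MRO, so MappedOperator/serialization can read it off the class.
    operator_extra_links = (
        BatchJobDetailsLink(),
        BatchJobDefinitionLink(),
        BatchJobQueueLink(),
        CloudWatchEventsLink(),
    )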

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
