Description
Apache Airflow version
3.1.0
If "Other Airflow 2/3 version" selected, which one?
No response
What happened?
Airflow 3.1 changed quite a bit on the SerDe side. There is now a dedicated SerDe implementation for pydantic models.
Unfortunately, the detection of pydantic models also matches pydantic dataclasses. Airflow then tries to serialize them with model_dump(), which does not exist on pydantic dataclasses.
The error then is:
AttributeError: 'BrokenPydanticDataClass' object has no attribute 'model_dump'
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 931, in run
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 1369, in _push_xcom_if_needed
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 579, in _xcom_push
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/bases/xcom.py", line 77, in set
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/common/io/xcom/backend.py", line 122, in serialize_value
File "/usr/python/lib/python3.12/json/__init__.py", line 238, in dumps
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/json.py", line 44, in encode
File "/usr/python/lib/python3.12/json/encoder.py", line 200, in encode
File "/usr/python/lib/python3.12/json/encoder.py", line 258, in iterencode
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/json.py", line 31, in default
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/serialization/serde.py", line 151, in serialize
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/serialization/serializers/pydantic.py", line 49, in serialize
The detection is here:
https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/serialization/typing.py#L23
Which is used here:
https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/serialization/serde.py#L143
I'd be willing to help fix it, but I don't know which approach is preferred. I can imagine:
a) Changing the detection to check whether the class is a subclass of BaseModel
b) Changing the order in https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/serialization/serde.py so that the dataclass check runs before the _serializer lookup
c) Changing the detection to something like return hasattr(cls, "__pydantic_fields__") and hasattr(cls, "__pydantic_validator__") and not pydantic.dataclasses.is_pydantic_dataclass(cls) (see https://docs.pydantic.dev/latest/api/dataclasses/#pydantic.dataclasses.is_pydantic_dataclass)
What you think should happen instead?
Airflow should be able to serialize pydantic dataclasses, as it could before 3.1.0.
How to reproduce
Add this Dag to your Airflow instance
from airflow.sdk import dag, task
from datetime import datetime, timedelta
from pydantic.dataclasses import dataclass
from pipelines.dags.shared.dag_args import local_tz


@dataclass
class BrokenPydanticDataClass:
    foo: str
    bar: int


@dag(
    dag_id="pydantic_serde_dataclass",
    dag_display_name="Pydantic Dataclass Serialization Errors",
    schedule=timedelta(seconds=30),
    start_date=datetime(2025, 6, 27, 0, 0, 0, tzinfo=local_tz),
    catchup=False,
    description="1.0.0",
)
def pydantic_serde_dataclass() -> None:
    @task
    def return_pydantic_dataclass():
        return BrokenPydanticDataClass(foo="hello", bar=1)

    @task
    def consume_pydantic_dataclass(model: BrokenPydanticDataClass):
        print(model)

    consume_pydantic_dataclass(return_pydantic_dataclass())


pydantic_serde_dataclass()
Operating System
debian
Versions of Apache Airflow Providers
No response
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct