Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion airflow/serialization/pydantic/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,5 @@ class DagRunPydantic(BaseModelPydantic):
class Config:
"""Make sure it deals automatically with SQLAlchemy ORM classes."""

orm_mode = True
from_attributes = True
orm_mode = True # Pydantic 1.x compatibility.
12 changes: 8 additions & 4 deletions airflow/serialization/pydantic/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ class DagScheduleDatasetReferencePydantic(BaseModelPydantic):
class Config:
"""Make sure it deals automatically with SQLAlchemy ORM classes."""

orm_mode = True
from_attributes = True
orm_mode = True # Pydantic 1.x compatibility.


class TaskOutletDatasetReferencePydantic(BaseModelPydantic):
Expand All @@ -46,7 +47,8 @@ class TaskOutletDatasetReferencePydantic(BaseModelPydantic):
class Config:
"""Make sure it deals automatically with SQLAlchemy ORM classes."""

orm_mode = True
from_attributes = True
orm_mode = True # Pydantic 1.x compatibility.


class DatasetPydantic(BaseModelPydantic):
Expand All @@ -65,7 +67,8 @@ class DatasetPydantic(BaseModelPydantic):
class Config:
"""Make sure it deals automatically with SQLAlchemy ORM classes."""

orm_mode = True
from_attributes = True
orm_mode = True # Pydantic 1.x compatibility.


class DatasetEventPydantic(BaseModelPydantic):
Expand All @@ -83,4 +86,5 @@ class DatasetEventPydantic(BaseModelPydantic):
class Config:
"""Make sure it deals automatically with SQLAlchemy ORM classes."""

orm_mode = True
from_attributes = True
orm_mode = True # Pydantic 1.x compatibility.
3 changes: 2 additions & 1 deletion airflow/serialization/pydantic/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,5 @@ class JobPydantic(BaseModelPydantic):
class Config:
"""Make sure it deals automatically with SQLAlchemy ORM classes."""

orm_mode = True
from_attributes = True
orm_mode = True # Pydantic 1.x compatibility.
3 changes: 2 additions & 1 deletion airflow/serialization/pydantic/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ class TaskInstancePydantic(BaseModelPydantic):
class Config:
"""Make sure it deals automatically with SQLAlchemy ORM classes."""

orm_mode = True
from_attributes = True
orm_mode = True # Pydantic 1.x compatibility.

def xcom_pull(
self,
Expand Down
17 changes: 13 additions & 4 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@
from airflow.utils.task_group import MappedTaskGroup, TaskGroup

if TYPE_CHECKING:
from pydantic import BaseModel

from airflow.ti_deps.deps.base_ti_dep import BaseTIDep

HAS_KUBERNETES: bool
Expand Down Expand Up @@ -479,14 +481,21 @@ def serialize(
type_=DAT.SIMPLE_TASK_INSTANCE,
)
elif use_pydantic_models and _ENABLE_AIP_44:

def _pydantic_model_dump(model_cls: type[BaseModel], var: Any) -> dict[str, Any]:
try:
return model_cls.model_validate(var).model_dump() # type: ignore[attr-defined]
except AttributeError: # Pydantic 1.x compatibility.
return model_cls.from_orm(var).dict() # type: ignore[attr-defined]

if isinstance(var, Job):
return cls._encode(JobPydantic.from_orm(var).dict(), type_=DAT.BASE_JOB)
return cls._encode(_pydantic_model_dump(JobPydantic, var), type_=DAT.BASE_JOB)
elif isinstance(var, TaskInstance):
return cls._encode(TaskInstancePydantic.from_orm(var).dict(), type_=DAT.TASK_INSTANCE)
return cls._encode(_pydantic_model_dump(TaskInstancePydantic, var), type_=DAT.TASK_INSTANCE)
elif isinstance(var, DagRun):
return cls._encode(DagRunPydantic.from_orm(var).dict(), type_=DAT.DAG_RUN)
return cls._encode(_pydantic_model_dump(DagRunPydantic, var), type_=DAT.DAG_RUN)
elif isinstance(var, Dataset):
return cls._encode(DatasetPydantic.from_orm(var).dict(), type_=DAT.DATA_SET)
return cls._encode(_pydantic_model_dump(DatasetPydantic, var), type_=DAT.DATA_SET)
else:
return cls.default_serialization(strict, var)
else:
Expand Down