Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class TIDeferredStatePayload(StrictBaseModel):
trigger_timeout: timedelta | None = None
next_method: str
"""The name of the method on the operator to call in the worker after the trigger has fired."""
next_kwargs: Annotated[dict[str, Any] | str, Field(default_factory=dict)]
next_kwargs: Annotated[dict[str, Any], Field(default_factory=dict)]
"""
Kwargs to pass to the above method, either a plain dict or an encrypted string.
Expand Down
3 changes: 2 additions & 1 deletion airflow-core/src/airflow/models/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,9 @@ def password(cls):
"""Password. The value is decrypted/encrypted when reading/setting the value."""
return synonym("_password", descriptor=property(cls.get_password, cls.set_password))

def get_extra(self) -> str:
def get_extra(self) -> str | None:
"""Return encrypted extra-data."""
extra_val: str | None
if self._extra and self.is_extra_encrypted:
fernet = get_fernet()
if not fernet.is_encrypted:
Expand Down
7 changes: 4 additions & 3 deletions airflow-core/src/airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ def get_run_data_interval(timetable: Timetable, run: DagRun) -> DataInterval:

# Compatibility: runs created before AIP-39 implementation don't have an
# explicit data interval. Try to infer from the logical date.
if TYPE_CHECKING:
assert run.logical_date is not None
return infer_automated_data_interval(timetable, run.logical_date)


Expand Down Expand Up @@ -521,14 +523,13 @@ def get_paused_dag_ids(dag_ids: list[str], session: Session = NEW_SESSION) -> se
:param session: ORM Session
:return: Paused Dag_ids
"""
paused_dag_ids = session.execute(
paused_dag_ids = session.scalars(
select(DagModel.dag_id)
.where(DagModel.is_paused == expression.true())
.where(DagModel.dag_id.in_(dag_ids))
)

paused_dag_ids = {paused_dag_id for (paused_dag_id,) in paused_dag_ids}
return paused_dag_ids
return set(paused_dag_ids)

@property
def safe_dag_id(self):
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ def __init__(self, dag: LazyDeserializedDAG) -> None:

# serve as cache so no need to decompress and load, when accessing data field
# when COMPRESS_SERIALIZED_DAGS is True
self.__data_cache = dag_data
self.__data_cache: dict[Any, Any] | None = dag_data

def __repr__(self) -> str:
return f"<SerializedDag: {self.dag_id}>"
Expand Down
4 changes: 1 addition & 3 deletions airflow-core/src/airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,9 +438,7 @@ class TaskInstance(Base, LoggingMixin):
# The method to call next, and any extra arguments to pass to it.
# Usually used when resuming from DEFERRED.
next_method: Mapped[str | None] = mapped_column(String(1000), nullable=True)
next_kwargs: Mapped[dict | str | None] = mapped_column(
MutableDict.as_mutable(ExtendedJSON), nullable=True
)
next_kwargs: Mapped[dict | None] = mapped_column(MutableDict.as_mutable(ExtendedJSON), nullable=True)

_task_display_property_value: Mapped[str | None] = mapped_column(
"task_display_name", String(2000), nullable=True
Expand Down
2 changes: 1 addition & 1 deletion task-sdk/src/airflow/sdk/api/datamodels/_generated.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ class TIDeferredStatePayload(BaseModel):
trigger_kwargs: Annotated[dict[str, Any] | str | None, Field(title="Trigger Kwargs")] = None
trigger_timeout: Annotated[timedelta | None, Field(title="Trigger Timeout")] = None
next_method: Annotated[str, Field(title="Next Method")]
next_kwargs: Annotated[dict[str, Any] | str | None, Field(title="Next Kwargs")] = None
next_kwargs: Annotated[dict[str, Any] | None, Field(title="Next Kwargs")] = None
rendered_map_index: Annotated[str | None, Field(title="Rendered Map Index")] = None


Expand Down
Loading