Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from typing import Annotated
from uuid import UUID

from fastapi import HTTPException, Query, status
from fastapi import Query, status
from sqlalchemy import select

from airflow.api_fastapi.common.db.common import SessionDep
Expand All @@ -39,7 +39,8 @@
@router.get("/{task_instance_id}/start_date")
def get_start_date(
task_instance_id: UUID, session: SessionDep, try_number: Annotated[int, Query()] = 1
) -> UtcDateTime:
) -> UtcDateTime | None:
"""Get the first reschedule date if found, None if no records exist."""
start_date = session.scalar(
select(TaskReschedule)
.where(
Expand All @@ -50,7 +51,5 @@ def get_start_date(
.with_only_columns(TaskReschedule.start_date)
.limit(1)
)
if start_date is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

return start_date
10 changes: 0 additions & 10 deletions airflow-core/src/airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,16 +386,6 @@ def pre_execute(self, context: Any):
logger=self.log,
).run(context)

def execute(self, context: Context) -> Any:
"""
Derive when creating an operator.

Context is the same dictionary used as when rendering jinja templates.

Refer to get_template_context for more context.
"""
raise NotImplementedError()

@apply_lineage
def post_execute(self, context: Any, result: Any = None):
"""
Expand Down
10 changes: 10 additions & 0 deletions airflow-core/src/airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -2324,6 +2324,16 @@ def get_failed_dep_statuses(self, dep_context: DepContext | None = None, session
if TYPE_CHECKING:
assert isinstance(self.task, BaseOperator)

if not hasattr(self.task, "deps"):
# These deps are not on BaseOperator since they are only needed and evaluated
# in the scheduler and not needed at the Runtime.
from airflow.serialization.serialized_objects import SerializedBaseOperator

serialized_op = SerializedBaseOperator.deserialize_operator(
SerializedBaseOperator.serialize_operator(self.task)
)
setattr(self.task, "deps", serialized_op.deps) # type: ignore[union-attr]

dep_context = dep_context or DepContext()
for dep in dep_context.deps | self.task.deps:
for dep_status in dep.get_dep_statuses(self, session, dep_context):
Expand Down
24 changes: 0 additions & 24 deletions airflow-core/src/airflow/sensors/README.md

This file was deleted.

1 change: 1 addition & 0 deletions airflow-core/src/airflow/sensors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

from airflow.utils.deprecation_tools import add_deprecated_classes

# TODO: Add definition from Task SDK here and remove `base.py` file
__deprecated_classes = {
"python":{
"PythonSensor": "airflow.providers.standard.sensors.python.PythonSensor",
Expand Down
Loading