Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 1 addition & 23 deletions airflow-core/src/airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import os
import uuid
from collections import defaultdict
from collections.abc import Collection, Generator, Iterable
from collections.abc import Collection, Iterable
from datetime import timedelta
from functools import cache
from typing import TYPE_CHECKING, Any
Expand Down Expand Up @@ -156,28 +156,6 @@ def _add_log(
)


@contextlib.contextmanager
def set_current_context(context: Context) -> Generator[Context, None, None]:
"""
Set the current execution context to the provided context object.

This method should be called once per Task execution, before calling operator.execute.
"""
from airflow.sdk.definitions._internal.contextmanager import _CURRENT_CONTEXT

_CURRENT_CONTEXT.append(context)
try:
yield context
finally:
expected_state = _CURRENT_CONTEXT.pop()
if expected_state != context:
log.warning(
"Current context is not equal to the state at context stack. Expected=%s, got=%s",
context,
expected_state,
)


def _stop_remaining_tasks(*, task_instance: TaskInstance, task_teardown_map=None, session: Session):
"""
Stop non-teardown tasks in dag.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
DeserializingResultError,
)
from airflow.models.connection import Connection
from airflow.models.taskinstance import TaskInstance, clear_task_instances, set_current_context
from airflow.models.taskinstance import TaskInstance, clear_task_instances
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import (
BranchExternalPythonOperator,
Expand All @@ -74,8 +74,11 @@

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk import BaseOperator
from airflow.sdk.execution_time.context import set_current_context
else:
from airflow.models.baseoperator import BaseOperator # type: ignore[no-redef]
from airflow.models.taskinstance import set_current_context # type: ignore[attr-defined,no-redef]


if TYPE_CHECKING:
from airflow.models.dag import DAG
Expand Down