Merged

45 commits
786a7da  replace ti.key with ti.try_id in active_spans (xBis7, Apr 6, 2025)
e234bbf  add otel to integrations (xBis7, Apr 6, 2025)
92682be  modify pytest_plugin.py (xBis7, Apr 6, 2025)
33c28f7  add redis integration to global_constants.py (xBis7, Apr 7, 2025)
a3ef753  fix ci failures (xBis7, Apr 7, 2025)
4fbda4b  fix test_edge_command.py (xBis7, Apr 8, 2025)
9e71f2f  fix test_edge_executor.py (xBis7, Apr 8, 2025)
d21af67  increase test timeout (xBis7, Apr 8, 2025)
5a4c246  increase test timeout (xBis7, Apr 9, 2025)
d65196f  mark flaky tests with xfail (xBis7, Apr 9, 2025)
a287246  add config for disabling spans from internal operations + disable dec… (xBis7, Apr 9, 2025)
9b5d168  disable all debugging traces (xBis7, Apr 11, 2025)
81d9d85  keep redis env variables (xBis7, Apr 13, 2025)
cadeb6b  merge with main and resolve conflicts (xBis7, Apr 13, 2025)
53d3de4  replace ti.try_id with ti.id (xBis7, Apr 13, 2025)
cc8d1fc  fix _end_active_spans logic bug causing tests to fail (xBis7, Apr 14, 2025)
6901de8  set a timeout for each test in test_otel.py (xBis7, Apr 15, 2025)
472d416  Merge remote-tracking branch 'origin/main' into otel_cleanup (xBis7, Apr 15, 2025)
b3bcae7  add 'timeout' configuration option in pytest 'markers' (xBis7, Apr 15, 2025)
5b355e0  Merge remote-tracking branch 'origin/main' into otel_cleanup (xBis7, Apr 15, 2025)
221ab47  Merge remote-tracking branch 'origin/main' into otel_cleanup (xBis7, Apr 15, 2025)
f5ec1f4  Merge remote-tracking branch 'origin/main' into otel_cleanup (xBis7, Apr 16, 2025)
a78cf91  Merge remote-tracking branch 'origin/main' into otel_cleanup (xBis7, Apr 17, 2025)
e08543b  merge with main and resolve conflicts (xBis7, Apr 24, 2025)
6b672d6  Merge remote-tracking branch 'origin/main' into otel_cleanup (xBis7, Apr 28, 2025)
35f93bc  update config target version (xBis7, Apr 28, 2025)
248af6b  convert leftover ti.id to str (xBis7, Apr 28, 2025)
d90d29e  simplify TaskInstance query (xBis7, Apr 28, 2025)
74ebb42  replace dag_run.run_id with dag_run.id (xBis7, Apr 28, 2025)
3fee0b0  remove conf.add_section from test_otel_tracer.py (xBis7, Apr 28, 2025)
39abf69  increase timeout for each test in test_otel.py (xBis7, Apr 28, 2025)
e511c16  replace class annotation with method annotation for a timeout in test… (xBis7, Apr 29, 2025)
4769ac9  rename add_span to add_debug_span (xBis7, Apr 29, 2025)
afcc997  initialize db only once at the start, in test_otel.py (xBis7, Apr 30, 2025)
71280ce  Merge remote-tracking branch 'origin/main' into otel_cleanup (xBis7, May 1, 2025)
e5e91d0  cleanup control_file in case of failure (xBis7, May 1, 2025)
e27feb1  merge with main and resolve conflicts (xBis7, May 7, 2025)
2c86006  convert log info to debug (xBis7, May 8, 2025)
1a9d0d7  merge with main and resolve conflicts (xBis7, May 24, 2025)
ce04e86  merge with main and resolve conflicts (xBis7, Jun 19, 2025)
6f0fcfa  improvements in tracer.py (xBis7, Jun 19, 2025)
ec93794  merge with main and resolve conflicts (xBis7, Jul 2, 2025)
ae56c92  trigger CI (xBis7, Jul 3, 2025)
f831209  trigger CI again (xBis7, Jul 3, 2025)
d594f1c  trigger CI again (xBis7, Jul 3, 2025)
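Several of these commits (d21af67, 6901de8, 39abf69, e511c16) wrestle with per-test timeouts in test_otel.py. Assuming the pytest-timeout plugin, moving from a class-level to a method-level annotation looks roughly like this; the test name and value are illustrative, not taken from the PR:

```python
import pytest


@pytest.mark.timeout(300)  # per-test limit instead of one class-wide annotation
def test_scheduler_exports_task_spans():
    ...  # body elided; the real tests live in test_otel.py
```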
7 changes: 7 additions & 0 deletions airflow-core/src/airflow/config_templates/config.yml
@@ -1233,6 +1233,13 @@ traces:
type: boolean
example: ~
default: "False"
otel_debug_traces_on:
description: |
If True, then traces from Airflow internal methods are exported. Defaults to False.
version_added: 3.1.0
type: string
example: ~
default: "False"
secrets:
description: ~
options:
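The new `[traces]` option follows Airflow's standard `AIRFLOW__{SECTION}__{KEY}` env-var convention, and although it is declared as `type: string`, the `"False"` default parses cleanly as a boolean. A minimal sketch of setting and reading the flag; the `fallback` argument and this exact call site are assumptions, not taken from the diff:

```python
# Enable debug traces via the env-var form of the [traces] section:
#   export AIRFLOW__TRACES__OTEL_DEBUG_TRACES_ON=True
from airflow.configuration import conf

# Hypothetical call site; the real gate presumably lives in airflow.traces.tracer.
if conf.getboolean("traces", "otel_debug_traces_on", fallback=False):
    print("spans from Airflow-internal methods will be exported")
```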
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/dag_processing/manager.py
@@ -64,7 +64,7 @@
from airflow.sdk import SecretCache
from airflow.sdk.log import init_log_file, logging_processors
from airflow.stats import Stats
from airflow.traces.tracer import Trace
from airflow.traces.tracer import DebugTrace
from airflow.utils import timezone
from airflow.utils.file import list_py_file_paths, might_contain_dag
from airflow.utils.log.logging_mixin import LoggingMixin
@@ -1080,7 +1080,7 @@ def emit_metrics(self):
This is called once every time around the parsing "loop" - i.e. after
all files have been parsed.
"""
with Trace.start_span(span_name="emit_metrics", component="DagFileProcessorManager") as span:
with DebugTrace.start_span(span_name="emit_metrics", component="DagFileProcessorManager") as span:
parse_time = time.perf_counter() - self._parsing_start_time
Stats.gauge("dag_processing.total_parse_time", parse_time)
Stats.gauge("dagbag_size", sum(stat.num_dags for stat in self._file_stats.values()))
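Every call site in this file, and in the files below, swaps `Trace.start_span` for `DebugTrace.start_span` on spans that only instrument Airflow internals. A plausible reading of the pattern, not the verbatim `tracer.py` implementation: `DebugTrace` yields a real span only when `otel_debug_traces_on` is enabled, and a cheap no-op otherwise. The standalone `debug_span` helper below exists only to show that gating idea:

```python
from __future__ import annotations

from contextlib import contextmanager

from airflow.configuration import conf


class _NoOpSpan:
    """Accepts the same calls as a real span but records nothing."""

    def set_attribute(self, key, value):
        pass

    def set_attributes(self, attributes):
        pass


@contextmanager
def debug_span(span_name: str, component: str | None = None):
    # Only emit internal-instrumentation spans when explicitly enabled.
    if conf.getboolean("traces", "otel_debug_traces_on", fallback=False):
        from airflow.traces.tracer import Trace

        with Trace.start_span(span_name=span_name, component=component) as span:
            yield span
    else:
        yield _NoOpSpan()
```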
16 changes: 8 additions & 8 deletions airflow-core/src/airflow/executors/base_executor.py
@@ -34,7 +34,7 @@
from airflow.models import Log
from airflow.stats import Stats
from airflow.traces import NO_TRACE_ID
from airflow.traces.tracer import Trace, add_span, gen_context
from airflow.traces.tracer import DebugTrace, Trace, add_debug_span, gen_context
from airflow.traces.utils import gen_span_id_from_ti_key
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import TaskInstanceState
@@ -218,7 +218,7 @@ def sync(self) -> None:
Executors should override this to perform gather statuses.
"""

@add_span
@add_debug_span
def heartbeat(self) -> None:
"""Heartbeat sent to trigger new jobs."""
open_slots = self.parallelism - len(self.running)
@@ -311,7 +311,7 @@ def order_queued_tasks_by_priority(self) -> list[tuple[TaskInstanceKey, workload
reverse=True,
)

@add_span
@add_debug_span
def trigger_tasks(self, open_slots: int) -> None:
"""
Initiate async execution of the queued tasks, up to the number of available slots.
@@ -340,8 +340,8 @@ def trigger_tasks(self, open_slots: int) -> None:
if isinstance(item, workloads.ExecuteTask) and hasattr(item, "ti"):
ti = item.ti

# If it's None, then the span for the current TaskInstanceKey hasn't been started.
if self.active_spans is not None and self.active_spans.get(key) is None:
# If it's None, then the span for the current id hasn't been started.
if self.active_spans is not None and self.active_spans.get("ti:" + str(ti.id)) is None:
if isinstance(ti, workloads.TaskInstance):
parent_context = Trace.extract(ti.parent_context_carrier)
else:
@@ -356,7 +356,7 @@ def trigger_tasks(self, open_slots: int) -> None:
component="task",
start_as_current=False,
)
self.active_spans.set(key, span)
self.active_spans.set("ti:" + str(ti.id), span)
# Inject the current context into the carrier.
carrier = Trace.inject()
ti.context_carrier = carrier
@@ -397,7 +397,7 @@ def fail(self, key: TaskInstanceKey, info=None) -> None:
trace_id = Trace.get_current_span().get_span_context().trace_id
if trace_id != NO_TRACE_ID:
span_id = int(gen_span_id_from_ti_key(key, as_int=True))
with Trace.start_span(
with DebugTrace.start_span(
span_name="fail",
component="BaseExecutor",
parent_sc=gen_context(trace_id=trace_id, span_id=span_id),
@@ -424,7 +424,7 @@ def success(self, key: TaskInstanceKey, info=None) -> None:
trace_id = Trace.get_current_span().get_span_context().trace_id
if trace_id != NO_TRACE_ID:
span_id = int(gen_span_id_from_ti_key(key, as_int=True))
with Trace.start_span(
with DebugTrace.start_span(
span_name="success",
component="BaseExecutor",
parent_sc=gen_context(trace_id=trace_id, span_id=span_id),
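`active_spans` entries are now keyed by a type-prefixed string, `"ti:" + str(ti.id)` or `"dr:" + str(dag_run.id)`, so one dict can hold both task-instance and dag-run spans without a `TaskInstanceKey`. A self-contained sketch of the scheme; this `ThreadSafeDict` is a stand-in whose API is inferred from the calls visible in this diff, not Airflow's actual class:

```python
from __future__ import annotations

import threading
from typing import Any


class ThreadSafeDict:
    """Stand-in for Airflow's ThreadSafeDict: a plain dict guarded by a lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._data: dict[str, Any] = {}

    def set(self, key: str, value: Any) -> None:
        with self._lock:
            self._data[key] = value

    def get(self, key: str) -> Any | None:
        with self._lock:
            return self._data.get(key)

    def delete(self, key: str) -> None:
        with self._lock:
            self._data.pop(key, None)

    def get_all(self) -> dict[str, Any]:
        with self._lock:
            return dict(self._data)  # snapshot, safe to iterate outside the lock


active_spans = ThreadSafeDict()
active_spans.set("ti:" + "0195e3f1-aaaa-bbbb-cccc-111122223333", "ti span")  # made-up UUID
active_spans.set("dr:" + str(42), "dag run span")  # dag_run.id is the integer primary key
assert active_spans.get("dr:42") == "dag run span"
```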
1 change: 0 additions & 1 deletion airflow-core/src/airflow/executors/workloads.py
@@ -69,7 +69,6 @@ class TaskInstance(BaseModel):

parent_context_carrier: dict | None = None
context_carrier: dict | None = None
queued_dttm: datetime | None = None

# TODO: Task-SDK: Can we replace TaskInstanceKey with just the uuid across the codebase?
@property
6 changes: 3 additions & 3 deletions airflow-core/src/airflow/jobs/job.py
@@ -33,7 +33,7 @@
from airflow.listeners.listener import get_listener_manager
from airflow.models.base import ID_LEN, Base
from airflow.stats import Stats
from airflow.traces.tracer import Trace, add_span
from airflow.traces.tracer import DebugTrace, add_debug_span
from airflow.utils import timezone
from airflow.utils.helpers import convert_camel_to_snake
from airflow.utils.log.logging_mixin import LoggingMixin
@@ -208,7 +208,7 @@ def heartbeat(
:param session to use for saving the job
"""
previous_heartbeat = self.latest_heartbeat
with Trace.start_span(span_name="heartbeat", component="Job") as span:
with DebugTrace.start_span(span_name="heartbeat", component="Job") as span:
try:
span.set_attribute("heartbeat", str(self.latest_heartbeat))
# This will cause it to load from the db
@@ -393,7 +393,7 @@ def execute_job(job: Job, execute_callable: Callable[[], int | None]) -> int | None:
return ret


@add_span
@add_debug_span
def perform_heartbeat(
job: Job, heartbeat_callback: Callable[[Session], None], only_if_necessary: bool
) -> None:
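`@add_span` becomes `@add_debug_span`, keeping the decorator pattern: wrap the call in a span named after the function. A sketch of what such a decorator can look like; the span-naming convention here is assumed, not copied from `tracer.py`:

```python
import functools

from airflow.traces.tracer import DebugTrace


def add_debug_span(func):
    """Wrap a function call in a debug span named 'module.qualname'."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        span_name = f"{func.__module__}.{func.__qualname__}"
        with DebugTrace.start_span(span_name=span_name):
            return func(*args, **kwargs)

    return wrapper
```

Renaming the decorator rather than adding a flag at every call site keeps the diff mechanical: each `@add_span` on an internal method becomes `@add_debug_span`, and the gating decision lives in one place.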
80 changes: 41 additions & 39 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -67,7 +67,7 @@
from airflow.ti_deps.dependencies_states import EXECUTION_STATES
from airflow.timetables.simple import AssetTriggeredTimetable
from airflow.traces import utils as trace_utils
from airflow.traces.tracer import Trace, add_span
from airflow.traces.tracer import DebugTrace, Trace, add_debug_span
from airflow.utils import timezone
from airflow.utils.dates import datetime_to_nano
from airflow.utils.event_scheduler import EventScheduler
@@ -212,8 +212,10 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):

# For a dagrun span
# - key: dag_run.run_id | value: span
# - dagrun keys will be prefixed with 'dr:'.
# For a ti span
# - key: ti.key | value: span
# - key: ti.id | value: span
# - taskinstance keys will be prefixed with 'ti:'.
active_spans = ThreadSafeDict()

def __init__(
@@ -853,11 +855,11 @@ def process_executor_events(
ti.pid,
)

if (active_ti_span := cls.active_spans.get(ti.key)) is not None:
if (active_ti_span := cls.active_spans.get("ti:" + str(ti.id))) is not None:
cls.set_ti_span_attrs(span=active_ti_span, state=state, ti=ti)
# End the span and remove it from the active_spans dict.
active_ti_span.end(end_time=datetime_to_nano(ti.end_date))
cls.active_spans.delete(ti.key)
cls.active_spans.delete("ti:" + str(ti.id))
ti.span_status = SpanStatus.ENDED
else:
if ti.span_status == SpanStatus.ACTIVE:
@@ -1036,27 +1038,23 @@ def _update_dag_run_state_for_paused_dags(self, session: Session = NEW_SESSION)
@provide_session
def _end_active_spans(self, session: Session = NEW_SESSION):
# No need to do a commit for every update. The annotation will commit all of them once at the end.
for key, span in self.active_spans.get_all().items():
from airflow.models.taskinstance import TaskInstanceKey

if isinstance(key, TaskInstanceKey): # ti span.
# Can't compare the key directly because the try_number or the map_index might not be the same.
ti: TaskInstance = session.scalars(
select(TaskInstance).where(
TaskInstance.dag_id == key.dag_id,
TaskInstance.task_id == key.task_id,
TaskInstance.run_id == key.run_id,
)
).one()
if ti.state in State.finished:
self.set_ti_span_attrs(span=span, state=ti.state, ti=ti)
span.end(end_time=datetime_to_nano(ti.end_date))
ti.span_status = SpanStatus.ENDED
else:
span.end()
ti.span_status = SpanStatus.NEEDS_CONTINUANCE
else:
dag_run: DagRun = session.scalars(select(DagRun).where(DagRun.run_id == key)).one()
for prefixed_key, span in self.active_spans.get_all().items():
# Use partition to split on the first occurrence of ':'.
prefix, sep, key = prefixed_key.partition(":")

if prefix == "ti":
ti: TaskInstance | None = session.get(TaskInstance, key)

if ti is not None:
if ti.state in State.finished:
self.set_ti_span_attrs(span=span, state=ti.state, ti=ti)
span.end(end_time=datetime_to_nano(ti.end_date))
ti.span_status = SpanStatus.ENDED
else:
span.end()
ti.span_status = SpanStatus.NEEDS_CONTINUANCE
elif prefix == "dr":
dag_run: DagRun = session.scalars(select(DagRun).where(DagRun.id == int(key))).one()
if dag_run.state in State.finished_dr_states:
dag_run.set_dagrun_span_attrs(span=span)

@@ -1070,7 +1068,11 @@ def _end_active_spans(self, session: Session = NEW_SESSION):
span_name="current_scheduler_exited", parent_context=initial_dag_run_context
) as s:
s.set_attribute("trace_status", "needs continuance")
else:
self.log.error("Found key with unknown prefix: '%s'", prefixed_key)

# Even if there is a key with an unknown prefix, clear the dict.
# If this method has been called, the scheduler is exiting.
self.active_spans.clear()

def _end_spans_of_externally_ended_ops(self, session: Session):
@@ -1092,26 +1094,26 @@ def _end_spans_of_externally_ended_ops(self, session: Session):
).all()

for dag_run in dag_runs_should_end:
active_dagrun_span = self.active_spans.get(dag_run.run_id)
active_dagrun_span = self.active_spans.get("dr:" + str(dag_run.id))
if active_dagrun_span is not None:
if dag_run.state in State.finished_dr_states:
dag_run.set_dagrun_span_attrs(span=active_dagrun_span)

active_dagrun_span.end(end_time=datetime_to_nano(dag_run.end_date))
else:
active_dagrun_span.end()
self.active_spans.delete(dag_run.run_id)
self.active_spans.delete("dr:" + str(dag_run.id))
dag_run.span_status = SpanStatus.ENDED

for ti in tis_should_end:
active_ti_span = self.active_spans.get(ti.key)
active_ti_span = self.active_spans.get("ti:" + ti.id)
if active_ti_span is not None:
if ti.state in State.finished:
self.set_ti_span_attrs(span=active_ti_span, state=ti.state, ti=ti)
active_ti_span.end(end_time=datetime_to_nano(ti.end_date))
else:
active_ti_span.end()
self.active_spans.delete(ti.key)
self.active_spans.delete("ti:" + ti.id)
ti.span_status = SpanStatus.ENDED

def _recreate_unhealthy_scheduler_spans_if_needed(self, dag_run: DagRun, session: Session):
@@ -1146,7 +1148,7 @@ def _recreate_unhealthy_scheduler_spans_if_needed(self, dag_run: DagRun, session: Session):
carrier = Trace.inject()
# Update the context_carrier and leave the SpanStatus as ACTIVE.
dag_run.context_carrier = carrier
self.active_spans.set(dag_run.run_id, dr_span)
self.active_spans.set("dr:" + str(dag_run.id), dr_span)

tis = dag_run.get_task_instances(session=session)

@@ -1159,7 +1161,7 @@
for ti in tis
# If it has started and there is a reference on the active_spans dict,
# then it was started by the current scheduler.
if ti.start_date is not None and self.active_spans.get(ti.key) is None
if ti.start_date is not None and self.active_spans.get("ti:" + ti.id) is None
]

dr_context = Trace.extract(dag_run.context_carrier)
@@ -1179,7 +1181,7 @@
ti.span_status = SpanStatus.ENDED
else:
ti.span_status = SpanStatus.ACTIVE
self.active_spans.set(ti.key, ti_span)
self.active_spans.set("ti:" + ti.id, ti_span)

def _run_scheduler_loop(self) -> None:
"""
@@ -1257,7 +1259,7 @@ def _run_scheduler_loop(self) -> None:

for loop_count in itertools.count(start=1):
with (
Trace.start_span(span_name="scheduler_job_loop", component="SchedulerJobRunner") as span,
DebugTrace.start_span(span_name="scheduler_job_loop", component="SchedulerJobRunner") as span,
Stats.timer("scheduler.scheduler_loop_duration") as timer,
):
span.set_attributes(
@@ -1465,7 +1467,7 @@ def _mark_backfills_complete(self, session: Session = NEW_SESSION) -> None:
for b in backfills:
b.completed_at = now

@add_span
@add_debug_span
def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -> None:
"""Create a DAG run and update the dag_model to control if/when the next DAGRun should be created."""
# Bulk Fetch DagRuns with dag_id and logical_date same
@@ -1657,7 +1659,7 @@ def _should_update_dag_next_dagruns(
return False
return True

@add_span
@add_debug_span
def _start_queued_dagruns(self, session: Session) -> None:
"""Find DagRuns in queued state and decide moving them to running state."""
# added all() to save runtime, otherwise query is executed more than once
@@ -1674,7 +1676,7 @@ def _start_queued_dagruns(self, session: Session) -> None:
)
active_runs_of_dags = Counter({(dag_id, br_id): num for dag_id, br_id, num in session.execute(query)})

@add_span
@add_debug_span
def _update_state(dag: DAG, dag_run: DagRun):
span = Trace.get_current_span()
span.set_attributes(
@@ -1796,7 +1798,7 @@ def _schedule_dag_run(
span_id = int(trace_utils.gen_dag_span_id(dag_run=dag_run, as_int=True))
links = [{"trace_id": trace_id, "span_id": span_id}]

with Trace.start_span(
with DebugTrace.start_span(
span_name="_schedule_dag_run", component="SchedulerJobRunner", links=links
) as span:
span.set_attributes(
Expand Down Expand Up @@ -1878,7 +1880,7 @@ def _schedule_dag_run(
if (
dag_run.scheduled_by_job_id is not None
and dag_run.scheduled_by_job_id != self.job.id
and self.active_spans.get(dag_run.run_id) is None
and self.active_spans.get("dr:" + str(dag_run.id)) is None
):
# If the dag_run has been previously scheduled by another job and there is no active span,
# then check if the job is still healthy.
@@ -2091,7 +2093,7 @@ def _emit_running_ti_metrics(self, session: Session = NEW_SESSION) -> None:
def _emit_pool_metrics(self, session: Session = NEW_SESSION) -> None:
from airflow.models.pool import Pool

with Trace.start_span(span_name="emit_pool_metrics", component="SchedulerJobRunner") as span:
with DebugTrace.start_span(span_name="emit_pool_metrics", component="SchedulerJobRunner") as span:
pools = Pool.slots_stats(session=session)
for pool_name, slot_stats in pools.items():
Stats.gauge(f"pool.open_slots.{pool_name}", slot_stats["open"])
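`_end_active_spans` now dispatches on the key prefix with `str.partition`, which splits only on the first `:`, so the remainder of the key survives intact whatever it contains. A small demonstration (the UUID is made up):

```python
prefixed_key = "ti:0195e3f1-aaaa-bbbb-cccc-111122223333"

prefix, _, key = prefixed_key.partition(":")
assert prefix == "ti"
assert key == "0195e3f1-aaaa-bbbb-cccc-111122223333"

# Dag-run keys carry the integer primary key and are cast back for the query.
dr_prefix, _, dr_key = "dr:42".partition(":")
assert dr_prefix == "dr" and int(dr_key) == 42
```

The `session.get(TaskInstance, key)` call in the same hunk is SQLAlchemy's primary-key lookup; since the bare `key` is the ti UUID, it replaces the old three-column `dag_id`/`task_id`/`run_id` filter in one call.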
12 changes: 6 additions & 6 deletions airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -64,7 +64,7 @@
)
from airflow.sdk.execution_time.supervisor import WatchedSubprocess, make_buffered_socket_reader
from airflow.stats import Stats
from airflow.traces.tracer import Trace, add_span
from airflow.traces.tracer import DebugTrace, Trace, add_debug_span
from airflow.triggers import base as events
from airflow.utils import timezone
from airflow.utils.helpers import log_filename_template_renderer
@@ -463,7 +463,7 @@ def run(self) -> None:
if not self.is_alive():
log.error("Trigger runner process has died! Exiting.")
break
with Trace.start_span(span_name="triggerer_job_loop", component="TriggererJobRunner"):
with DebugTrace.start_span(span_name="triggerer_job_loop", component="TriggererJobRunner"):
self.load_triggers()

# Wait for up to 1 second for activity
@@ -482,14 +482,14 @@ def heartbeat(self):
def heartbeat_callback(self, session: Session | None = None) -> None:
Stats.incr("triggerer_heartbeat", 1, 1)

@add_span
@add_debug_span
def load_triggers(self):
"""Query the database for the triggers we're supposed to be running and update the runner."""
Trigger.assign_unassigned(self.job.id, self.capacity, self.health_check_threshold)
ids = Trigger.ids_for_triggerer(self.job.id)
self.update_triggers(set(ids))

@add_span
@add_debug_span
def handle_events(self):
"""Dispatch outbound events to the Trigger model which pushes them to the relevant task instances."""
while self.events:
@@ -500,12 +500,12 @@ def handle_events(self):
# Emit stat event
Stats.incr("triggers.succeeded")

@add_span
@add_debug_span
def clean_unused(self):
"""Clean out unused or finished triggers."""
Trigger.clean_unused()

@add_span
@add_debug_span
def handle_failed_triggers(self):
"""
Handle "failed" triggers. - ones that errored or exited before they sent an event.