Lead the OTEL cleanup project #43789

@dstandish

OTEL code has made the Airflow codebase harder to read.

It has made non-OTEL code (call it "operative" code) harder to follow. And the sheer volume of it has also made the OTEL code itself harder to evaluate.

There are two high-level things we need to do.

  1. We need to reduce the obfuscation caused by the OTEL code. That is, reduce the OTEL-related noise in the scheduler and in every other area of Airflow code where it is used.
    • Here's one small example of how the situation can be improved: Extract OTEL span set attrs logic in one place in the scheduler #43787
    • We need to increase the separation between operative and non-operative code. It should be easy to identify which parts of a function are material to the operation of the function, and which are "just OTEL" or "just logging and OTEL". Example below.
  2. We need to do a thorough review of all the OTEL code to identify and root out bad behavior.

The first part of this should help with the second. And progress on 2 can be made while working on 1.

Another example:

Observe here, in the middle of a critical scheduler function, we have 60 lines that do nothing but log and do OTEL stuff:

    if self._state == DagRunState.FAILED or self._state == DagRunState.SUCCESS:
        msg = (
            "DagRun Finished: dag_id=%s, execution_date=%s, run_id=%s, "
            "run_start_date=%s, run_end_date=%s, run_duration=%s, "
            "state=%s, external_trigger=%s, run_type=%s, "
            "data_interval_start=%s, data_interval_end=%s, dag_hash=%s"
        )
        self.log.info(
            msg,
            self.dag_id,
            self.execution_date,
            self.run_id,
            self.start_date,
            self.end_date,
            (
                (self.end_date - self.start_date).total_seconds()
                if self.start_date and self.end_date
                else None
            ),
            self._state,
            self.external_trigger,
            self.run_type,
            self.data_interval_start,
            self.data_interval_end,
            self.dag_hash,
        )
        with Trace.start_span_from_dagrun(dagrun=self) as span:
            if self._state is DagRunState.FAILED:
                span.set_attribute("error", True)
            attributes = {
                "category": "DAG runs",
                "dag_id": str(self.dag_id),
                "execution_date": str(self.execution_date),
                "run_id": str(self.run_id),
                "queued_at": str(self.queued_at),
                "run_start_date": str(self.start_date),
                "run_end_date": str(self.end_date),
                "run_duration": str(
                    (self.end_date - self.start_date).total_seconds()
                    if self.start_date and self.end_date
                    else 0
                ),
                "state": str(self._state),
                "external_trigger": str(self.external_trigger),
                "run_type": str(self.run_type),
                "data_interval_start": str(self.data_interval_start),
                "data_interval_end": str(self.data_interval_end),
                "dag_hash": str(self.dag_hash),
                "conf": str(self.conf),
            }
            if span.is_recording():
                span.add_event(name="queued", timestamp=datetime_to_nano(self.queued_at))
                span.add_event(name="started", timestamp=datetime_to_nano(self.start_date))
                span.add_event(name="ended", timestamp=datetime_to_nano(self.end_date))
            span.set_attributes(attributes)
    session.flush()

It's highly obfuscatory. And, it turns out, while the if block looks like it must be a meaningful and consequential part of the function, it actually does nothing operative (though that is very hard to see).
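
As a rough sketch of what pulling this out could look like: the attribute dict and the duplicated duration expression can live in small pure functions that are easy to review and test on their own. Everything named here (`FakeDagRun`, `run_duration_seconds`, `finished_span_attributes`) is hypothetical and only covers a subset of the fields; it is not the change made in #43787.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class FakeDagRun:
    """Hypothetical stand-in carrying a few of the fields used in the block above."""

    dag_id: str
    run_id: str
    state: str
    start_date: datetime | None
    end_date: datetime | None


def run_duration_seconds(dr) -> float | None:
    """Duration in seconds, or None if either date is missing (computed once)."""
    if dr.start_date and dr.end_date:
        return (dr.end_date - dr.start_date).total_seconds()
    return None


def finished_span_attributes(dr) -> dict[str, str]:
    """Build the span attributes; pure function, no span or session needed."""
    duration = run_duration_seconds(dr)
    return {
        "category": "DAG runs",
        "dag_id": str(dr.dag_id),
        "run_id": str(dr.run_id),
        "run_start_date": str(dr.start_date),
        "run_end_date": str(dr.end_date),
        # Mirrors the block above, which reports 0 when a date is missing.
        "run_duration": str(duration if duration is not None else 0),
        "state": str(dr.state),
    }


start = datetime(2024, 1, 1)
dr = FakeDagRun("my_dag", "run_1", "success", start, start + timedelta(seconds=90))
attrs = finished_span_attributes(dr)
```

The scheduler function would then only need a single call along the lines of `span.set_attributes(finished_span_attributes(self))`, which makes it much more obvious that the branch is non-operative.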

Committer

  • I acknowledge that I am a maintainer/committer of the Apache Airflow project.

Labels

kind:meta (High-level information important to the community), telemetry (Telemetry-related issues)
