Description
OTEL code has increased obfuscation in the Airflow codebase.
It has made the non-OTEL code (call it "operative" code) harder to follow. And the sheer volume of it has also made the OTEL code itself harder to evaluate.
There are two high level things we need to do.
- We need to reduce the obfuscation from the OTEL code. That is, reduce the OTEL-related noise in the scheduler and all other areas of Airflow code where it is used.
  - Here's one small example of how the situation can be improved: Extract OTEL span set attrs logic in one place in the scheduler #43787
  - We need to increase the separation between what should be operative code and non-operative code. It should be easy to identify which parts of a function are material to the operation of the function, and what is "just OTEL" or "just logging and OTEL". An example is below.
- We need to do a thorough review of all the OTEL code to identify and root out bad behavior.
The first part of this should help with the second. And progress on 2 can be made while working on 1.
Another example:
Observe that here, in the middle of a critical scheduler function, we have about 60 lines that do nothing but log and emit OTEL data:
airflow/airflow/models/dagrun.py
Lines 992 to 1049 in b757bd8
```python
if self._state == DagRunState.FAILED or self._state == DagRunState.SUCCESS:
    msg = (
        "DagRun Finished: dag_id=%s, execution_date=%s, run_id=%s, "
        "run_start_date=%s, run_end_date=%s, run_duration=%s, "
        "state=%s, external_trigger=%s, run_type=%s, "
        "data_interval_start=%s, data_interval_end=%s, dag_hash=%s"
    )
    self.log.info(
        msg,
        self.dag_id,
        self.execution_date,
        self.run_id,
        self.start_date,
        self.end_date,
        (
            (self.end_date - self.start_date).total_seconds()
            if self.start_date and self.end_date
            else None
        ),
        self._state,
        self.external_trigger,
        self.run_type,
        self.data_interval_start,
        self.data_interval_end,
        self.dag_hash,
    )

    with Trace.start_span_from_dagrun(dagrun=self) as span:
        if self._state is DagRunState.FAILED:
            span.set_attribute("error", True)
        attributes = {
            "category": "DAG runs",
            "dag_id": str(self.dag_id),
            "execution_date": str(self.execution_date),
            "run_id": str(self.run_id),
            "queued_at": str(self.queued_at),
            "run_start_date": str(self.start_date),
            "run_end_date": str(self.end_date),
            "run_duration": str(
                (self.end_date - self.start_date).total_seconds()
                if self.start_date and self.end_date
                else 0
            ),
            "state": str(self._state),
            "external_trigger": str(self.external_trigger),
            "run_type": str(self.run_type),
            "data_interval_start": str(self.data_interval_start),
            "data_interval_end": str(self.data_interval_end),
            "dag_hash": str(self.dag_hash),
            "conf": str(self.conf),
        }
        if span.is_recording():
            span.add_event(name="queued", timestamp=datetime_to_nano(self.queued_at))
            span.add_event(name="started", timestamp=datetime_to_nano(self.start_date))
            span.add_event(name="ended", timestamp=datetime_to_nano(self.end_date))
        span.set_attributes(attributes)

session.flush()
```
It's highly obfuscatory. And it turns out that, while the `if` block looks like it must be a meaningful and consequential part of the function, it actually does nothing operative (though that is very hard to see).
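To make the kind of separation being asked for concrete, here is a rough sketch of how the logging and span bookkeeping could be pulled out of the critical function into clearly non-operative helpers. This is only an illustration, not a proposed patch: `DagRunStub`, `RecordingSpan`, `run_duration_seconds`, `span_attributes`, and `report_finished` are all hypothetical names, the stub carries only a few of the real fields, and the span is duck-typed so the snippet runs without Airflow or OpenTelemetry installed.

```python
from __future__ import annotations

import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any

log = logging.getLogger(__name__)


@dataclass
class DagRunStub:
    """Hypothetical stand-in for DagRun; only the fields used below."""

    dag_id: str
    run_id: str
    state: str
    start_date: datetime | None = None
    end_date: datetime | None = None


class RecordingSpan:
    """Hypothetical duck-typed span that just stores attributes in a dict."""

    def __init__(self) -> None:
        self.attributes: dict[str, Any] = {}

    def set_attribute(self, key: str, value: Any) -> None:
        self.attributes[key] = value

    def set_attributes(self, attributes: dict[str, Any]) -> None:
        self.attributes.update(attributes)


def run_duration_seconds(run: DagRunStub) -> float | None:
    """Duration in seconds, or None when either timestamp is missing."""
    if run.start_date and run.end_date:
        return (run.end_date - run.start_date).total_seconds()
    return None


def span_attributes(run: DagRunStub) -> dict[str, str]:
    """Build the span attributes in one place, stringified as in the original."""
    duration = run_duration_seconds(run)
    return {
        "category": "DAG runs",
        "dag_id": str(run.dag_id),
        "run_id": str(run.run_id),
        "run_duration": str(0 if duration is None else duration),
        "state": str(run.state),
    }


def report_finished(run: DagRunStub, span: Any) -> None:
    """Non-operative: logs and annotates the span, never mutates the run."""
    log.info(
        "DagRun Finished: dag_id=%s, run_id=%s, run_duration=%s, state=%s",
        run.dag_id,
        run.run_id,
        run_duration_seconds(run),
        run.state,
    )
    if run.state == "failed":
        span.set_attribute("error", True)
    span.set_attributes(span_attributes(run))


if __name__ == "__main__":
    start = datetime(2024, 1, 1)
    demo = DagRunStub("my_dag", "run_1", "failed", start, start + timedelta(seconds=90))
    demo_span = RecordingSpan()
    report_finished(demo, demo_span)
    print(demo_span.attributes)
```

With something along these lines, the call site in the scheduler function would shrink to a single clearly named call, and a reader could tell at a glance that nothing in it affects the outcome of the function.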
Committer
- I acknowledge that I am a maintainer/committer of the Apache Airflow project.