-
Notifications
You must be signed in to change notification settings - Fork 16.4k
AIP-51 - Executor Coupling in Logging #28161
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
6a67d03
ac8b149
d02287f
ec66e41
baf3361
4d2ba5e
0eeb2a0
e1a6851
828ce16
20ac872
78032ca
0efc192
3007b13
48240fa
7eb6613
c2b19ff
674831b
52a714a
52f08c2
72c855e
b5ed7f7
300badf
367faec
02b0170
9641385
a7e267b
9ffb169
b2f0aef
178b56f
52172fa
2d8148c
f400098
6366087
3937816
a8e2bef
1be69b1
d5b7196
07dd5e4
31ae849
9119b9d
71ea32f
89a514b
f31b2b3
1e385ac
5d669f5
8c6f26f
0c8943e
7d317e5
2ae5144
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -28,6 +28,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||
| import multiprocessing | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| import time | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| from collections import defaultdict | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| from contextlib import suppress | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| from datetime import timedelta | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| from queue import Empty, Queue | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence, Tuple | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -37,6 +38,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||
| from kubernetes.client.rest import ApiException | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| from urllib3.exceptions import ReadTimeoutError | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| from airflow.configuration import conf | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| from airflow.exceptions import AirflowException, PodMutationHookException, PodReconciliationError | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| from airflow.executors.base_executor import BaseExecutor, CommandType | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| from airflow.kubernetes import pod_generator | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -773,6 +775,57 @@ def _change_state(self, key: TaskInstanceKey, state: str | None, pod_id: str, na | |||||||||||||||||||||||||||||||||||||||||||||||||||
| # do this once, so only do it when we remove the task from running | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.event_buffer[key] = state, None | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| @staticmethod | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| def _get_pod_namespace(ti: TaskInstance): | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| pod_override = ti.executor_config.get("pod_override") | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| namespace = None | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| with suppress(Exception): | ||||||||||||||||||||||||||||||||||||||||||||||||||||
o-nikolas marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| namespace = pod_override.metadata.namespace | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| return namespace or conf.get("kubernetes_executor", "namespace", fallback="default") | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| def get_task_log(self, ti: TaskInstance, log: str = "") -> str | tuple[str, dict[str, bool]]: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| from airflow.kubernetes.pod_generator import PodGenerator | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| client = get_kube_client() | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n" | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| selector = PodGenerator.build_selector_for_k8s_executor_pod( | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| dag_id=ti.dag_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| task_id=ti.task_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| try_number=ti.try_number, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| map_index=ti.map_index, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| run_id=ti.run_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| airflow_worker=ti.queued_by_job_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| namespace = self._get_pod_namespace(ti) | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| pod_list = client.list_namespaced_pod( | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| namespace=namespace, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| label_selector=selector, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| ).items | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| if not pod_list: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| raise RuntimeError("Cannot find pod for ti %s", ti) | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| elif len(pod_list) > 1: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| raise RuntimeError("Found multiple pods for ti %s: %s", ti, pod_list) | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+807
to
+810
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not really part of this PR but feels like the right place to ask. Why do we raise these exceptions and not write the issue to the log and return it? (Like lines 285-287) I wonder if this is the reason users sometimes don't see the task log and it makes them harder to find the root cause like in #29025 ?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @eladkal i think, #29025 is more about the error that we log around these part. airflow/airflow/executors/kubernetes_executor.py Lines 690 to 714 in 1e385ac
These logs i believe are part of the scheduler logs and won't be visible as part of the task's log since we only fetch the logs from task's k8s pod in regarding the exceptions, am not sure if i understand you correctly, but i think, those exceptions are caught by the enclosing try/except and returned to the user. |
||||||||||||||||||||||||||||||||||||||||||||||||||||
| res = client.read_namespaced_pod_log( | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| name=pod_list[0].metadata.name, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Checking this part of code: why do we need to do the works above to get the pod name? The
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not code that is new to this PR. It was just moved to a different location. If you see the |
||||||||||||||||||||||||||||||||||||||||||||||||||||
| namespace=namespace, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| container="base", | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| follow=False, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| tail_lines=100, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| _preload_content=False, | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| for line in res: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| log += line.decode() | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| return log | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| except Exception as f: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| log += f"*** Unable to fetch logs from worker pod {ti.hostname} ***\n{str(f)}\n\n" | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| return log, {"end_of_log": True} | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id] | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| scheduler_job_ids = {ti.queued_by_job_id for ti in tis} | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
Uh oh!
There was an error while loading. Please reload this page.