Merged
Changes from all commits (49 commits)
6a67d03
initial refactoring
Dec 6, 2022
ac8b149
reverted refactoring
Dec 9, 2022
d02287f
add get_task_log in local and celery kubernetes executor
Dec 9, 2022
ec66e41
add type hint
Dec 9, 2022
baf3361
fixing unittest
Dec 9, 2022
4d2ba5e
Refactored file_task_handler
Dec 10, 2022
0eeb2a0
Merge branch 'apache:main' into bugfix/27931-AIP-51-Executor-Coupling…
snjypl Dec 10, 2022
e1a6851
Merge branch 'main' into bugfix/27931-AIP-51-Executor-Coupling-in-Log…
snjypl Dec 12, 2022
828ce16
Merge branch 'main' into bugfix/27931-AIP-51-Executor-Coupling-in-Log…
snjypl Dec 13, 2022
20ac872
Merge branch 'apache:main' into bugfix/27931-AIP-51-Executor-Coupling…
snjypl Dec 13, 2022
78032ca
adding unittest
Dec 13, 2022
0efc192
add unittest for file task handler
Dec 13, 2022
3007b13
Merge branch 'bugfix/27931-AIP-51-Executor-Coupling-in-Logging' of gi…
Dec 13, 2022
48240fa
Merge branch 'main' into bugfix/27931-AIP-51-Executor-Coupling-in-Log…
snjypl Dec 13, 2022
7eb6613
Merge branch 'bugfix/27931-AIP-51-Executor-Coupling-in-Logging' of gi…
Dec 13, 2022
c2b19ff
fix failing unittest
Dec 14, 2022
674831b
Merge branch 'main' into bugfix/27931-AIP-51-Executor-Coupling-in-Log…
snjypl Dec 15, 2022
52a714a
Merge branch 'apache:main' into bugfix/27931-AIP-51-Executor-Coupling…
snjypl Dec 15, 2022
52f08c2
merge main
Dec 20, 2022
72c855e
Merge branch 'bugfix/27931-AIP-51-Executor-Coupling-in-Logging' of gi…
Dec 20, 2022
b5ed7f7
added doc string to get_task_log
Dec 20, 2022
300badf
fix unittest
Dec 20, 2022
367faec
Merge branch 'main' into bugfix/27931-AIP-51-Executor-Coupling-in-Log…
snjypl Dec 20, 2022
02b0170
Merge branch 'main' into bugfix/27931-AIP-51-Executor-Coupling-in-Log…
snjypl Dec 27, 2022
9641385
Merge branch 'main' into bugfix/27931-AIP-51-Executor-Coupling-in-Log…
snjypl Dec 28, 2022
a7e267b
fix newline spacing
Dec 29, 2022
9ffb169
Fix unittest
Dec 29, 2022
b2f0aef
Merge branch 'main' into bugfix/27931-AIP-51-Executor-Coupling-in-Log…
snjypl Dec 29, 2022
178b56f
fix merge conflicts
Jan 6, 2023
52172fa
fix unittest
Jan 6, 2023
2d8148c
fix unittest
Jan 6, 2023
f400098
Merge branch 'main' into bugfix/27931-AIP-51-Executor-Coupling-in-Log…
snjypl Jan 9, 2023
6366087
Merge branch 'main' into bugfix/27931-AIP-51-Executor-Coupling-in-Log…
snjypl Jan 10, 2023
3937816
Merge branch 'main' into bugfix/27931-AIP-51-Executor-Coupling-in-Log…
snjypl Jan 10, 2023
a8e2bef
Merge branch 'main' into bugfix/27931-AIP-51-Executor-Coupling-in-Log…
snjypl Jan 10, 2023
1be69b1
Merge branch 'main' into bugfix/27931-AIP-51-Executor-Coupling-in-Log…
snjypl Jan 10, 2023
d5b7196
Merge branch 'main' into bugfix/27931-AIP-51-Executor-Coupling-in-Log…
snjypl Jan 12, 2023
07dd5e4
Merge branch 'main' into bugfix/27931-AIP-51-Executor-Coupling-in-Log…
snjypl Jan 13, 2023
31ae849
Merge branch 'main' into bugfix/27931-AIP-51-Executor-Coupling-in-Log…
snjypl Jan 16, 2023
9119b9d
Merge branch 'main' into bugfix/27931-AIP-51-Executor-Coupling-in-Log…
snjypl Jan 17, 2023
71ea32f
fix merge conflict
snjypl Jan 17, 2023
89a514b
Merge branch 'bugfix/27931-AIP-51-Executor-Coupling-in-Logging' of gi…
snjypl Jan 17, 2023
f31b2b3
fix merge conflict
snjypl Jan 17, 2023
1e385ac
Merge branch 'main' into bugfix/27931-AIP-51-Executor-Coupling-in-Log…
snjypl Jan 18, 2023
5d669f5
Merge branch 'main' into bugfix/27931-AIP-51-Executor-Coupling-in-Log…
snjypl Jan 19, 2023
8c6f26f
Merge branch 'main' of github.com:apache/airflow into bugfix/27931-AI…
snjypl Jan 23, 2023
0c8943e
Merge branch 'main' into bugfix/27931-AIP-51-Executor-Coupling-in-Log…
snjypl Jan 23, 2023
7d317e5
Merge branch 'bugfix/27931-AIP-51-Executor-Coupling-in-Logging' of gi…
snjypl Jan 23, 2023
2ae5144
removed hasattr check
snjypl Jan 23, 2023
9 changes: 9 additions & 0 deletions airflow/executors/base_executor.py
@@ -354,6 +354,15 @@ def execute_async(
"""
raise NotImplementedError()

def get_task_log(self, ti: TaskInstance, log: str = "") -> None | str | tuple[str, dict[str, bool]]:
"""
Return the task logs. This method can be implemented by any child executor to fetch logs for the given task instance.

:param ti: A TaskInstance object
:param log: log string to which fetched log content is appended
:return: the logs, or a tuple of the logs and a metadata dict
"""

def end(self) -> None: # pragma: no cover
"""Wait synchronously for the previously submitted job to complete."""
raise NotImplementedError()
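For illustration, a minimal sketch of how a custom executor subclass might implement this new hook — the FakeRemoteExecutor class and its _fetch_remote_log helper are hypothetical, not part of this PR:

from __future__ import annotations

from airflow.executors.base_executor import BaseExecutor
from airflow.models.taskinstance import TaskInstance


class FakeRemoteExecutor(BaseExecutor):
    """Hypothetical executor, used only to illustrate the get_task_log contract."""

    def get_task_log(self, ti: TaskInstance, log: str = "") -> str | tuple[str, dict[str, bool]]:
        try:
            log += f"*** Reading logs for {ti.task_id} from remote storage ***\n"
            log += self._fetch_remote_log(ti)  # hypothetical helper
            return log
        except Exception as e:
            # Mirror the KubernetesExecutor pattern below: report the failure
            # in the returned log text and mark the stream as finished.
            log += f"*** Unable to fetch logs: {e}\n"
            return log, {"end_of_log": True}

    def _fetch_remote_log(self, ti: TaskInstance) -> str:
        raise NotImplementedError  # placeholder for the illustration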
6 changes: 6 additions & 0 deletions airflow/executors/celery_kubernetes_executor.py
@@ -140,6 +140,12 @@ def queue_task_instance(
cfg_path=cfg_path,
)

def get_task_log(self, ti: TaskInstance, log: str = "") -> None | str | tuple[str, dict[str, bool]]:
"""Fetch task log from Kubernetes executor"""
if ti.queue == self.kubernetes_executor.kubernetes_queue:
return self.kubernetes_executor.get_task_log(ti=ti, log=log)
return None

def has_task(self, task_instance: TaskInstance) -> bool:
"""
Checks if a task is either queued or running in either celery or kubernetes executor.
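As a usage note, this path only triggers for tasks explicitly routed to the Kubernetes queue. A hedged sketch of such routing at the task level — "kubernetes" is the usual default for [celery_kubernetes_executor] kubernetes_queue, but a deployment may configure a different name:

import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="k8s_queue_demo", start_date=pendulum.datetime(2023, 1, 1), schedule=None):
    # Under CeleryKubernetesExecutor this task runs on the Kubernetes executor,
    # so its logs are fetched via kubernetes_executor.get_task_log.
    run_on_k8s = BashOperator(
        task_id="run_on_k8s",
        bash_command="echo hello",
        queue="kubernetes",  # assumed default of [celery_kubernetes_executor] kubernetes_queue
    )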
53 changes: 53 additions & 0 deletions airflow/executors/kubernetes_executor.py
@@ -28,6 +28,7 @@
import multiprocessing
import time
from collections import defaultdict
from contextlib import suppress
from datetime import timedelta
from queue import Empty, Queue
from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence, Tuple
@@ -37,6 +38,7 @@
from kubernetes.client.rest import ApiException
from urllib3.exceptions import ReadTimeoutError

from airflow.configuration import conf
from airflow.exceptions import AirflowException, PodMutationHookException, PodReconciliationError
from airflow.executors.base_executor import BaseExecutor, CommandType
from airflow.kubernetes import pod_generator
@@ -773,6 +775,57 @@ def _change_state(self, key: TaskInstanceKey, state: str | None, pod_id: str, na
# do this once, so only do it when we remove the task from running
self.event_buffer[key] = state, None

@staticmethod
def _get_pod_namespace(ti: TaskInstance):
pod_override = ti.executor_config.get("pod_override")
namespace = None
with suppress(Exception):
namespace = pod_override.metadata.namespace
return namespace or conf.get("kubernetes_executor", "namespace", fallback="default")

def get_task_log(self, ti: TaskInstance, log: str = "") -> str | tuple[str, dict[str, bool]]:

try:
from airflow.kubernetes.pod_generator import PodGenerator

client = get_kube_client()

log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
selector = PodGenerator.build_selector_for_k8s_executor_pod(
dag_id=ti.dag_id,
task_id=ti.task_id,
try_number=ti.try_number,
map_index=ti.map_index,
run_id=ti.run_id,
airflow_worker=ti.queued_by_job_id,
)
namespace = self._get_pod_namespace(ti)
pod_list = client.list_namespaced_pod(
namespace=namespace,
label_selector=selector,
).items
if not pod_list:
raise RuntimeError("Cannot find pod for ti %s", ti)
elif len(pod_list) > 1:
raise RuntimeError("Found multiple pods for ti %s: %s", ti, pod_list)
Comment on lines +807 to +810
Contributor

Not really part of this PR but feels like the right place to ask.

Why do we raise these exceptions and not write the issue to the log and return it? (Like lines 285-287)

        except Exception as f:
            log += f"*** Unable to fetch logs from worker pod {ti.hostname} ***\n{str(f)}\n\n"
            return log, {"end_of_log": True}

I wonder if this is the reason users sometimes don't see the task log, which makes it harder for them to find the root cause, like in #29025?

Contributor Author

@snjypl snjypl Jan 19, 2023

@eladkal I think #29025 is more about the error that we log around this part:

            # These codes indicate something is wrong with pod definition; otherwise we assume pod
            # definition is ok, and that retrying may work
            if e.status in (400, 422):
                self.log.error("Pod creation failed with reason %r. Failing task", e.reason)
                key, _, _, _ = task
                self.change_state(key, State.FAILED, e)
            else:
                self.log.warning(
                    "ApiException when attempting to run task, re-queueing. Reason: %r. Message: %s",
                    e.reason,
                    json.loads(e.body)["message"],
                )
                self.task_queue.put(task)
        except PodMutationHookException as e:
            key, _, _, _ = task
            self.log.error(
                "Pod Mutation Hook failed for the task %s. Failing task. Details: %s",
                key,
                e.__cause__,
            )
            self.fail(key, e)
        finally:
            self.task_queue.task_done()
    except Empty:
        ...

These logs, I believe, are part of the scheduler logs and won't be visible as part of the task's log, since we only fetch the logs from the task's k8s pod in kubernetes_executor.get_task_log.

Regarding the exceptions, I am not sure if I understand you correctly, but I think those exceptions are caught by the enclosing try/except and returned to the user.
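To make that control flow concrete, a stripped-down sketch (names illustrative, not the actual method): the RuntimeError raised for a missing pod never propagates to the caller; it is caught by the enclosing except and surfaced in the returned log text:

def get_task_log_sketch(ti_hostname: str, pod_list: list, log: str = "") -> str | tuple[str, dict[str, bool]]:
    try:
        if not pod_list:
            # This raise does not escape the method: it is caught below and
            # turned into a message in the returned log.
            raise RuntimeError(f"Cannot find pod for ti {ti_hostname}")
        return log + "...pod log lines...\n"
    except Exception as f:
        log += f"*** Unable to fetch logs from worker pod {ti_hostname} ***\n{str(f)}\n\n"
        return log, {"end_of_log": True}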

res = client.read_namespaced_pod_log(
name=pod_list[0].metadata.name,
Member

Checking this part of the code: why do we need to do the work above to get the pod name? The ti.hostname is just the pod name, isn't it?

cc @o-nikolas @snjypl

Contributor

This is not new code in this PR; it was just moved to a different location. If you look at the airflow/utils/log/file_task_handler.py module, this code existed there before these changes.

namespace=namespace,
container="base",
follow=False,
tail_lines=100,
_preload_content=False,
)

for line in res:
log += line.decode()

return log

except Exception as f:
log += f"*** Unable to fetch logs from worker pod {ti.hostname} ***\n{str(f)}\n\n"
return log, {"end_of_log": True}

def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
scheduler_job_ids = {ti.queued_by_job_id for ti in tis}
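As a usage note for _get_pod_namespace above: it prefers a namespace from a pod_override in the task's executor_config and otherwise falls back to the configured executor namespace. A hedged sketch, assuming the kubernetes Python client; the SimpleNamespace objects are hypothetical stand-ins for task instances:

from types import SimpleNamespace

from kubernetes.client import models as k8s

# Only executor_config matters for namespace resolution.
ti_with_override = SimpleNamespace(
    executor_config={"pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(namespace="team-a"))}
)
ti_without_override = SimpleNamespace(executor_config={})

# KubernetesExecutor._get_pod_namespace(ti_with_override) -> "team-a"
# KubernetesExecutor._get_pod_namespace(ti_without_override)
#   -> conf.get("kubernetes_executor", "namespace", fallback="default"),
#   because the AttributeError from the missing pod_override is suppressed.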
7 changes: 7 additions & 0 deletions airflow/executors/local_kubernetes_executor.py
@@ -141,6 +141,13 @@ def queue_task_instance(
cfg_path=cfg_path,
)

def get_task_log(self, ti: TaskInstance, log: str = "") -> None | str | tuple[str, dict[str, bool]]:
"""Fetch task log from kubernetes executor"""
if ti.queue == self.kubernetes_executor.kubernetes_queue:
return self.kubernetes_executor.get_task_log(ti=ti, log=log)

return None

def has_task(self, task_instance: TaskInstance) -> bool:
"""
Checks if a task is either queued or running in either local or kubernetes executor.
166 changes: 64 additions & 102 deletions airflow/utils/log/file_task_handler.py
@@ -26,8 +26,9 @@
from typing import TYPE_CHECKING, Any
from urllib.parse import urljoin

from airflow.configuration import AirflowConfigException, conf
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException, RemovedInAirflow3Warning
from airflow.executors.executor_loader import ExecutorLoader
from airflow.utils.context import Context
from airflow.utils.helpers import parse_template_string, render_template_to_string
from airflow.utils.log.logging_mixin import SetContextPropagate
@@ -146,23 +147,54 @@ def _render_filename(self, ti: TaskInstance, try_number: int) -> str:
def _read_grouped_logs(self):
return False

    @staticmethod
    def _should_check_k8s(queue):
        """
        If the task is running through kubernetes executor, return True.

        When logs aren't available locally, in this case we read from k8s pod logs.
        """
        executor = conf.get("core", "executor")
        if executor == "KubernetesExecutor":
            return True
        elif executor == "LocalKubernetesExecutor":
            if queue == conf.get("local_kubernetes_executor", "kubernetes_queue"):
                return True
        elif executor == "CeleryKubernetesExecutor":
            if queue == conf.get("celery_kubernetes_executor", "kubernetes_queue"):
                return True
        return False

    def _get_task_log_from_worker(
        self, ti: TaskInstance, log: str, log_relative_path: str
    ) -> str | tuple[str, dict[str, bool]]:
        import httpx

        from airflow.utils.jwt_signer import JWTSigner

        url = self._get_log_retrieval_url(ti, log_relative_path)
        log += f"*** Fetching from: {url}\n"

        try:
            timeout = None  # No timeout
            try:
                timeout = conf.getint("webserver", "log_fetch_timeout_sec")
            except (AirflowConfigException, ValueError):
                pass

            signer = JWTSigner(
                secret_key=conf.get("webserver", "secret_key"),
                expiration_time_in_seconds=conf.getint("webserver", "log_request_clock_grace", fallback=30),
                audience="task-instance-logs",
            )
            response = httpx.get(
                url,
                timeout=timeout,
                headers={"Authorization": signer.generate_signed_token({"filename": log_relative_path})},
            )
            response.encoding = "utf-8"

            if response.status_code == 403:
                log += (
                    "*** !!!! Please make sure that all your Airflow components (e.g. "
                    "schedulers, webservers and workers) have "
                    "the same 'secret_key' configured in 'webserver' section and "
                    "time is synchronized on all your machines (for example with ntpd) !!!!!\n***"
                )
                log += (
                    "*** See more at https://airflow.apache.org/docs/apache-airflow/"
                    "stable/configurations-ref.html#secret-key\n***"
                )
            # Check if the resource was properly fetched
            response.raise_for_status()

            log += "\n" + response.text
            return log
        except Exception as e:
            log += f"*** Failed to fetch log file from worker. {str(e)}\n"
            return log, {"end_of_log": True}

def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | None = None):
"""
@@ -186,8 +218,6 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
This is determined by the status of the TaskInstance
log_pos: (absolute) Char position to which the log is retrieved
"""
from airflow.utils.jwt_signer import JWTSigner

# Task instance here might be different from task instance when
# initializing the handler. Thus explicitly getting log location
# is needed to get correct log path.
@@ -204,91 +234,23 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
log = f"*** Failed to load local log file: {location}\n"
log += f"*** {str(e)}\n"
return log, {"end_of_log": True}
        elif self._should_check_k8s(ti.queue):
            try:
                from airflow.kubernetes.kube_client import get_kube_client
                from airflow.kubernetes.pod_generator import PodGenerator

                client = get_kube_client()

                log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
                selector = PodGenerator.build_selector_for_k8s_executor_pod(
                    dag_id=ti.dag_id,
                    task_id=ti.task_id,
                    try_number=ti.try_number,
                    map_index=ti.map_index,
                    run_id=ti.run_id,
                    airflow_worker=ti.queued_by_job_id,
                )
                namespace = self._get_pod_namespace(ti)
                pod_list = client.list_namespaced_pod(
                    namespace=namespace,
                    label_selector=selector,
                ).items
                if not pod_list:
                    raise RuntimeError("Cannot find pod for ti %s", ti)
                elif len(pod_list) > 1:
                    raise RuntimeError("Found multiple pods for ti %s: %s", ti, pod_list)
                res = client.read_namespaced_pod_log(
                    name=pod_list[0].metadata.name,
                    namespace=namespace,
                    container="base",
                    follow=False,
                    tail_lines=100,
                    _preload_content=False,
                )

                for line in res:
                    log += line.decode()

            except Exception as f:
                log += f"*** Unable to fetch logs from worker pod {ti.hostname} ***\n{str(f)}\n\n"
                return log, {"end_of_log": True}
        else:
            import httpx

            url = self._get_log_retrieval_url(ti, log_relative_path)
            log += f"*** Log file does not exist: {location}\n"
            log += f"*** Fetching from: {url}\n"
            try:
                timeout = None  # No timeout
                try:
                    timeout = conf.getint("webserver", "log_fetch_timeout_sec")
                except (AirflowConfigException, ValueError):
                    pass

                signer = JWTSigner(
                    secret_key=conf.get("webserver", "secret_key"),
                    expiration_time_in_seconds=conf.getint(
                        "webserver", "log_request_clock_grace", fallback=30
                    ),
                    audience="task-instance-logs",
                )
                response = httpx.get(
                    url,
                    timeout=timeout,
                    headers={"Authorization": signer.generate_signed_token({"filename": log_relative_path})},
                )
                response.encoding = "utf-8"

                if response.status_code == 403:
                    log += (
                        "*** !!!! Please make sure that all your Airflow components (e.g. "
                        "schedulers, webservers and workers) have "
                        "the same 'secret_key' configured in 'webserver' section and "
                        "time is synchronized on all your machines (for example with ntpd) !!!!!\n***"
                    )
                    log += (
                        "*** See more at https://airflow.apache.org/docs/apache-airflow/"
                        "stable/configurations-ref.html#secret-key\n***"
                    )
                # Check if the resource was properly fetched
                response.raise_for_status()

                log += "\n" + response.text
            except Exception as e:
                log += f"*** Failed to fetch log file from worker. {str(e)}\n"
                return log, {"end_of_log": True}
        else:
            log += f"*** Local log file does not exist: {location}\n"
            executor = ExecutorLoader.get_default_executor()
            task_log = None

            task_log = executor.get_task_log(ti=ti, log=log)
            if isinstance(task_log, tuple):
                return task_log

            if task_log is None:
                log += "*** Failed to fetch log from executor. Falling back to fetching log from worker.\n"
                task_log = self._get_task_log_from_worker(ti, log, log_relative_path=log_relative_path)

            if isinstance(task_log, tuple):
                return task_log

            log = str(task_log)

# Process tailing if log is not at its end
end_of_log = ti.try_number != try_number or ti.state not in [State.RUNNING, State.DEFERRED]
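For reference, the worker fetch in _get_task_log_from_worker relies on both sides sharing the webserver secret_key — which is exactly what the 403 hint above warns about. A rough sketch of the signed round trip, assuming JWTSigner's generate_signed_token / verify_token API matches how the worker's log server validates requests:

from airflow.utils.jwt_signer import JWTSigner

signer = JWTSigner(
    secret_key="must-match-on-all-components",  # normally conf.get("webserver", "secret_key")
    expiration_time_in_seconds=30,
    audience="task-instance-logs",
)

# Caller side: sign the relative path of the requested log file.
token = signer.generate_signed_token({"filename": "dag_id=d/run_id=r/task_id=t/attempt=1.log"})

# Worker side: the log server verifies the token (signature, audience, expiry)
# before serving the file; a mismatched secret_key fails here, hence the 403.
payload = signer.verify_token(token)
assert payload["filename"] == "dag_id=d/run_id=r/task_id=t/attempt=1.log"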
8 changes: 7 additions & 1 deletion tests/executors/test_base_executor.py
@@ -27,7 +27,7 @@

from airflow.executors.base_executor import BaseExecutor, RunningRetryAttemptType
from airflow.models.baseoperator import BaseOperator
from airflow.models.taskinstance import TaskInstanceKey
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
from airflow.utils import timezone
from airflow.utils.state import State

@@ -44,6 +44,12 @@ def test_is_local_default_value():
assert not BaseExecutor.is_local


def test_get_task_log():
executor = BaseExecutor()
ti = TaskInstance(task=BaseOperator(task_id="dummy"))
assert executor.get_task_log(ti=ti) is None


def test_serve_logs_default_value():
assert not BaseExecutor.serve_logs

16 changes: 16 additions & 0 deletions tests/executors/test_celery_kubernetes_executor.py
@@ -173,6 +173,22 @@ def mock_ti(queue):
celery_executor_mock.try_adopt_task_instances.assert_called_once_with(celery_tis)
k8s_executor_mock.try_adopt_task_instances.assert_called_once_with(k8s_tis)

def test_log_is_fetched_from_k8s_executor_only_for_k8s_queue(self):
celery_executor_mock = mock.MagicMock()
k8s_executor_mock = mock.MagicMock()
cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
simple_task_instance = mock.MagicMock()
simple_task_instance.queue = KUBERNETES_QUEUE
cke.get_task_log(ti=simple_task_instance, log="")
k8s_executor_mock.get_task_log.assert_called_once_with(ti=simple_task_instance, log=mock.ANY)

k8s_executor_mock.reset_mock()

simple_task_instance.queue = "test-queue"
log = cke.get_task_log(ti=simple_task_instance, log="")
k8s_executor_mock.get_task_log.assert_not_called()
assert log is None

def test_get_event_buffer(self):
celery_executor_mock = mock.MagicMock()
k8s_executor_mock = mock.MagicMock()