2 changes: 2 additions & 0 deletions airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -225,6 +225,7 @@ class KubernetesPodOperator(BaseOperator):
BASE_CONTAINER_NAME = "base"

POD_CHECKED_KEY = "already_checked"
POST_TERMINATION_TIMEOUT = 120

template_fields: Sequence[str] = (
"image",
@@ -533,6 +534,7 @@ def execute_sync(self, context: Context):
pod=self.pod,
container_name=self.base_container_name,
follow=True,
post_termination_timeout=self.POST_TERMINATION_TIMEOUT,
)
else:
self.pod_manager.await_container_completion(
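The new POST_TERMINATION_TIMEOUT class attribute bounds how long the operator keeps reading logs after the base container has terminated. A minimal, hypothetical sketch of tuning it (the subclass name is illustrative and not part of this change):

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator


class PatientKubernetesPodOperator(KubernetesPodOperator):
    # Assumption for illustration: overriding the plain class attribute is enough, since
    # execute_sync passes self.POST_TERMINATION_TIMEOUT through to the pod manager's log fetching.
    POST_TERMINATION_TIMEOUT = 600  # keep reading logs for up to 10 minutes after termination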
140 changes: 129 additions & 11 deletions airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -23,23 +23,26 @@
import warnings
from contextlib import closing, suppress
from dataclasses import dataclass
from datetime import datetime
from typing import TYPE_CHECKING, Iterable, cast
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Generator, cast

import pendulum
import tenacity
from kubernetes import client, watch
from kubernetes.client.models.v1_container_status import V1ContainerStatus
from kubernetes.client.models.v1_pod import V1Pod
from kubernetes.client.rest import ApiException
from kubernetes.stream import stream as kubernetes_stream
from pendulum import DateTime
from pendulum.parsing.exceptions import ParserError
from urllib3.exceptions import HTTPError as BaseHTTPError
from urllib3.response import HTTPResponse

from airflow.exceptions import AirflowException
from airflow.kubernetes.kube_client import get_kube_client
from airflow.kubernetes.pod_generator import PodDefaults
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.timezone import utcnow

if TYPE_CHECKING:
from kubernetes.client.models.core_v1_event_list import CoreV1EventList
@@ -70,15 +73,24 @@ class PodPhase:
terminal_states = {FAILED, SUCCEEDED}


def get_container_status(pod: V1Pod, container_name: str) -> V1ContainerStatus | None:
"""Retrieves container status"""
container_statuses = pod.status.container_statuses if pod and pod.status else None
if container_statuses:
# In general, container_statuses can hold multiple items, one per container in the pod.
# The generator expression below yields the items whose name equals container_name, and
# next() takes the first such item, returning None if there is none.
return next((x for x in container_statuses if x.name == container_name), None)
return None


def container_is_running(pod: V1Pod, container_name: str) -> bool:
"""
Examines V1Pod ``pod`` to determine whether ``container_name`` is running.
If that container is present and running, returns True. Returns False otherwise.
"""
container_statuses = pod.status.container_statuses if pod and pod.status else None
if not container_statuses:
return False
container_status = next((x for x in container_statuses if x.name == container_name), None)
container_status = get_container_status(pod, container_name)
if not container_status:
return False
return container_status.state.running is not None
@@ -91,6 +103,92 @@ def get_container_termination_message(pod: V1Pod, container_name: str):
return container_status.state.terminated.message if container_status else None


class PodLogsConsumer:
"""
PodLogsConsumer is responsible for pulling pod logs from a stream, checking the container status
before reading data.
This class is a workaround for the issue https://github.com/apache/airflow/issues/23497

:param response: HTTP response with logs
:param pod: Pod instance from Kubernetes client
:param pod_manager: Pod manager instance
:param container_name: Name of the container that we're reading logs from
:param post_termination_timeout: (Optional) The period of time, in seconds, during which logs remain
available after the container terminates.
:param read_pod_cache_timeout: (Optional) The container's status cache lifetime.
The container status is cached to reduce API calls.

:meta private:
"""

def __init__(
self,
response: HTTPResponse,
pod: V1Pod,
pod_manager: PodManager,
container_name: str,
post_termination_timeout: int = 120,
read_pod_cache_timeout: int = 120,
):
self.response = response
self.pod = pod
self.pod_manager = pod_manager
self.container_name = container_name
self.post_termination_timeout = post_termination_timeout
self.last_read_pod_at = None
self.read_pod_cache = None
self.read_pod_cache_timeout = read_pod_cache_timeout

def __iter__(self) -> Generator[bytes, None, None]:
r"""The generator yields log items divided by the '\n' symbol."""
incomplete_log_item: list[bytes] = []
if self.logs_available():
for data_chunk in self.response.stream(amt=None, decode_content=True):
if b"\n" in data_chunk:
log_items = data_chunk.split(b"\n")
yield from self._extract_log_items(incomplete_log_item, log_items)
incomplete_log_item = self._save_incomplete_log_item(log_items[-1])
else:
incomplete_log_item.append(data_chunk)
if not self.logs_available():
break
if incomplete_log_item:
yield b"".join(incomplete_log_item)

@staticmethod
def _extract_log_items(incomplete_log_item: list[bytes], log_items: list[bytes]):
yield b"".join(incomplete_log_item) + log_items[0] + b"\n"
for x in log_items[1:-1]:
yield x + b"\n"

@staticmethod
def _save_incomplete_log_item(sub_chunk: bytes):
return [sub_chunk] if sub_chunk else []  # keep the trailing partial line only when it is non-empty

def logs_available(self):
remote_pod = self.read_pod()
if container_is_running(pod=remote_pod, container_name=self.container_name):
return True
container_status = get_container_status(pod=remote_pod, container_name=self.container_name)
state = container_status.state if container_status else None
terminated = state.terminated if state else None
if terminated:
termination_time = terminated.finished_at
if termination_time:
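# Worked example: with post_termination_timeout=120 and finished_at == 12:00:00 UTC,
# this returns True until 12:02:00 UTC and False afterwards.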
return termination_time + timedelta(seconds=self.post_termination_timeout) > utcnow()
return False

def read_pod(self):
_now = utcnow()
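# Refresh the cached pod object only when there is no cache yet or the cached value
# is older than read_pod_cache_timeout seconds; otherwise reuse it to spare API calls.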
if (
self.read_pod_cache is None
or self.last_read_pod_at + timedelta(seconds=self.read_pod_cache_timeout) < _now
):
self.read_pod_cache = self.pod_manager.read_pod(self.pod)
self.last_read_pod_at = _now
return self.read_pod_cache


@dataclass
class PodLoggingStatus:
"""Used for returning the status of the pod and last log time when exiting from `fetch_container_logs`"""
@@ -203,14 +301,22 @@ def follow_container_logs(self, pod: V1Pod, container_name: str) -> PodLoggingSt
return self.fetch_container_logs(pod=pod, container_name=container_name, follow=True)

def fetch_container_logs(
self, pod: V1Pod, container_name: str, *, follow=False, since_time: DateTime | None = None
self,
pod: V1Pod,
container_name: str,
*,
follow=False,
since_time: DateTime | None = None,
post_termination_timeout: int = 120,
) -> PodLoggingStatus:
"""
Follows the logs of the container and streams them to Airflow logging.
Returns when the container exits.
"""

def consume_logs(*, since_time: DateTime | None = None, follow: bool = True) -> DateTime | None:
def consume_logs(
*, since_time: DateTime | None = None, follow: bool = True, termination_timeout: int = 120
) -> DateTime | None:
"""
Tries to follow container logs until container completes.
For a long-running container, sometimes the log read may be interrupted
@@ -228,6 +334,7 @@ def consume_logs(*, since_time: DateTime | None = None, follow: bool = True) ->
math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None
),
follow=follow,
post_termination_timeout=termination_timeout,
)
for raw_line in logs:
line = raw_line.decode("utf-8", errors="backslashreplace")
@@ -251,7 +358,9 @@ def consume_logs(*, since_time: DateTime | None = None, follow: bool = True) ->
# So the looping logic is there to let us resume following the logs.
last_log_time = since_time
while True:
last_log_time = consume_logs(since_time=last_log_time, follow=follow)
last_log_time = consume_logs(
since_time=last_log_time, follow=follow, termination_timeout=post_termination_timeout
)
if not self.container_is_running(pod, container_name=container_name):
return PodLoggingStatus(running=False, last_log_time=last_log_time)
if not follow:
@@ -327,7 +436,8 @@ def read_pod_logs(
timestamps: bool = False,
since_seconds: int | None = None,
follow=True,
) -> Iterable[bytes]:
post_termination_timeout: int = 120,
) -> PodLogsConsumer:
"""Reads log from the POD"""
additional_kwargs = {}
if since_seconds:
@@ -337,7 +447,7 @@ def read_pod_logs(
additional_kwargs["tail_lines"] = tail_lines

try:
return self._client.read_namespaced_pod_log(
logs = self._client.read_namespaced_pod_log(
name=pod.metadata.name,
namespace=pod.metadata.namespace,
container=container_name,
@@ -350,6 +460,14 @@ def read_pod_logs(
self.log.exception("There was an error reading the kubernetes API.")
raise

return PodLogsConsumer(
response=logs,
pod=pod,
pod_manager=self,
container_name=container_name,
post_termination_timeout=post_termination_timeout,
)

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def read_pod_events(self, pod: V1Pod) -> CoreV1EventList:
"""Reads events from the POD"""
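With this change, PodManager.read_pod_logs returns a PodLogsConsumer instead of a raw byte iterable, and call sites keep iterating it exactly as before. A minimal usage sketch, assuming a PodManager instance (pod_manager) and a V1Pod (pod) are already in scope, for example inside the operator where they are available as self.pod_manager and self.pod; the container name "base" mirrors KubernetesPodOperator.BASE_CONTAINER_NAME:

consumer = pod_manager.read_pod_logs(
    pod=pod,
    container_name="base",
    follow=True,
    post_termination_timeout=120,  # same default as KubernetesPodOperator.POST_TERMINATION_TIMEOUT
)
for raw_line in consumer:  # PodLogsConsumer yields newline-terminated byte chunks
    print(raw_line.decode("utf-8", errors="backslashreplace"))

Because the consumer checks the container status as it reads, it can stop following the stream once the container has been gone for longer than post_termination_timeout instead of waiting on it indefinitely.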