-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Closed
Labels
AIP-51AIP-51: Remove executor coupling from CoreAIP-51: Remove executor coupling from Core
Description
Overview
The FileTaskHandler logging class has a special branch for Kubernetes, to fetch logs from the pod (Note: this is currently hardcoded to KubernetesExecutor so this would fail to run for the CeleryKubernetesExecutor or LocalKubernetesExecutor).
Examples
- 4a) See code:
airflow/airflow/utils/log/file_task_handler.py
Lines 157 to 192 in 27e2101
elif conf.get('core', 'executor') == 'KubernetesExecutor': try: from airflow.kubernetes.kube_client import get_kube_client kube_client = get_kube_client() if len(ti.hostname) >= 63: # Kubernetes takes the pod name and truncates it for the hostname. This truncated hostname # is returned for the fqdn to comply with the 63 character limit imposed by DNS standards # on any label of a FQDN. pod_list = kube_client.list_namespaced_pod(conf.get('kubernetes', 'namespace')) matches = [ pod.metadata.name for pod in pod_list.items if pod.metadata.name.startswith(ti.hostname) ] if len(matches) == 1: if len(matches[0]) > len(ti.hostname): ti.hostname = matches[0] log += f'*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n' res = kube_client.read_namespaced_pod_log( name=ti.hostname, namespace=conf.get('kubernetes', 'namespace'), container='base', follow=False, tail_lines=100, _preload_content=False, ) for line in res: log += line.decode() except Exception as f: log += f'*** Unable to fetch logs from worker pod {ti.hostname} ***\n{str(f)}\n\n'
Proposal
Adding a get_task_logs(ti: TaskInstance | None) method on the BaseExecutor class, which Executor subclasses could implement is one viable approach, since other Executors could certainly make use of such a paradigm, particularly containerized or cloud based executors.
Metadata
Metadata
Assignees
Labels
AIP-51AIP-51: Remove executor coupling from CoreAIP-51: Remove executor coupling from Core