airflow-core/src/airflow/config_templates/airflow_local_settings.py

@@ -246,15 +246,19 @@
         )
         remote_task_handler_kwargs = {}
     elif remote_base_log_folder.startswith("hdfs://"):
-        HDFS_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
-            "task": {
-                "class": "airflow.providers.apache.hdfs.log.hdfs_task_handler.HdfsTaskHandler",
-                "formatter": "airflow",
-                "base_log_folder": BASE_LOG_FOLDER,
-                "hdfs_log_folder": remote_base_log_folder,
-            },
-        }
-        DEFAULT_LOGGING_CONFIG["handlers"].update(HDFS_REMOTE_HANDLERS)
+        from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsRemoteLogIO
+
+        REMOTE_TASK_LOG = HdfsRemoteLogIO(
+            **(
+                {
+                    "base_log_folder": BASE_LOG_FOLDER,
+                    "remote_base": remote_base_log_folder,
+                    "delete_local_copy": delete_local_copy,
+                }
+                | remote_task_handler_kwargs
+            )
+        )
+        remote_task_handler_kwargs = {}
     elif ELASTICSEARCH_HOST:
         ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK")
         ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value("elasticsearch", "frontend")
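The `**({...} | remote_task_handler_kwargs)` construction mirrors the existing S3 branch just above it: Python's dict-union operator `|` (3.9+) merges the defaults with any user-supplied handler kwargs, the right-hand side winning, before the result is unpacked into `HdfsRemoteLogIO`. A standalone sketch of that merge, with made-up values standing in for the real settings:

```python
# Sketch only: the keys mirror the diff above, the values are illustrative.
defaults = {
    "base_log_folder": "/tmp/airflow/logs",    # stands in for BASE_LOG_FOLDER
    "remote_base": "hdfs://namenode/airflow",  # stands in for remote_base_log_folder
    "delete_local_copy": True,
}
overrides = {"delete_local_copy": False}  # e.g. user-provided handler kwargs

merged = defaults | overrides  # right-hand operand wins on duplicate keys
assert merged["delete_local_copy"] is False
```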
19 changes: 19 additions & 0 deletions airflow-core/tests/unit/core/test_logging_config.py
@@ -316,3 +316,22 @@ def test_loading_remote_logging_with_kwargs(self):
         task_log = airflow.logging_config.REMOTE_TASK_LOG
         assert isinstance(task_log, S3RemoteLogIO)
         assert getattr(task_log, "delete_local_copy") is True
+
+    def test_loading_remote_logging_with_hdfs_handler(self):
+        """Test if logging can be configured successfully for HDFS"""
+        pytest.importorskip("airflow.providers.apache.hdfs", reason="'apache.hdfs' provider not installed")
+        import airflow.logging_config
+        from airflow.config_templates import airflow_local_settings
+        from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsRemoteLogIO
+
+        with conf_vars(
+            {
+                ("logging", "remote_logging"): "True",
+                ("logging", "remote_log_conn_id"): "some_hdfs",
+                ("logging", "remote_base_log_folder"): "hdfs://some-folder",
+            }
+        ):
+            importlib.reload(airflow_local_settings)
+            airflow.logging_config.configure_logging()
+
+        assert isinstance(airflow.logging_config.REMOTE_TASK_LOG, HdfsRemoteLogIO)
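The test drives the `hdfs://` branch of `airflow_local_settings` purely through configuration. Outside of tests, the same three `[logging]` options would be set in `airflow.cfg` or through Airflow's standard `AIRFLOW__{SECTION}__{OPTION}` environment-variable convention; a sketch using the test's placeholder values:

```python
import os

# Equivalent of the conf_vars block above, expressed as environment
# overrides. "some_hdfs" must name a webhdfs connection in practice,
# since the hook reads remote_log_conn_id as its webhdfs_conn_id.
os.environ["AIRFLOW__LOGGING__REMOTE_LOGGING"] = "True"
os.environ["AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID"] = "some_hdfs"
os.environ["AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER"] = "hdfs://some-folder"
```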
providers/apache/hdfs/src/airflow/providers/apache/hdfs/log/hdfs_task_handler.py

@@ -17,45 +17,102 @@
 # under the License.
 from __future__ import annotations
 
 import logging
 import os
-import pathlib
 import shutil
 from functools import cached_property
+from pathlib import Path
+from typing import TYPE_CHECKING
 from urllib.parse import urlsplit
 
+import attrs
+
 from airflow.configuration import conf
 from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
 from airflow.utils.log.file_task_handler import FileTaskHandler
 from airflow.utils.log.logging_mixin import LoggingMixin
 
+if TYPE_CHECKING:
+    from airflow.models.taskinstance import TaskInstance
+    from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
+    from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo
+
 
+@attrs.define(kw_only=True)
+class HdfsRemoteLogIO(LoggingMixin):  # noqa: D101
+    remote_base: str
+    base_log_folder: Path = attrs.field(converter=Path)
+    delete_local_copy: bool
+
+    processors = ()
+
+    def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+        """Upload the given log path to the remote storage."""
+        path = Path(path)
+        if path.is_absolute():
+            local_loc = path
+            remote_loc = os.path.join(self.remote_base, path.relative_to(self.base_log_folder))
+        else:
+            local_loc = self.base_log_folder.joinpath(path)
+            remote_loc = os.path.join(self.remote_base, path)
+
+        if local_loc.is_file():
+            self.hook.load_file(local_loc, remote_loc)
+            if self.delete_local_copy:
+                shutil.rmtree(os.path.dirname(local_loc))
+
+    @cached_property
+    def hook(self):
+        """Returns WebHDFSHook."""
+        return WebHDFSHook(webhdfs_conn_id=conf.get("logging", "REMOTE_LOG_CONN_ID"))
+
+    def read(self, relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages]:
+        logs = []
+        messages = []
+        file_path = os.path.join(self.remote_base, relative_path)
+        if self.hook.check_for_path(file_path):
+            logs.append(self.hook.read_file(file_path).decode("utf-8"))
+        else:
+            messages.append(f"No logs found on hdfs for ti={ti}")
+        return messages, logs
+
 
 class HdfsTaskHandler(FileTaskHandler, LoggingMixin):
-    """Logging handler to upload and read from HDFS."""
+    """
+    HdfsTaskHandler is a Python logging handler that handles and reads task instance logs.
+
+    It extends airflow FileTaskHandler and uploads to and reads from HDFS.
+    """
 
     def __init__(self, base_log_folder: str, hdfs_log_folder: str, **kwargs):
         super().__init__(base_log_folder)
+        self.handler: logging.FileHandler | None = None
         self.remote_base = urlsplit(hdfs_log_folder).path
         self.log_relative_path = ""
         self._hook = None
         self.closed = False
         self.upload_on_close = True
-        self.delete_local_copy = kwargs.get(
-            "delete_local_copy", conf.getboolean("logging", "delete_local_logs")
-        )
 
-    @cached_property
-    def hook(self):
-        """Returns WebHDFSHook."""
-        return WebHDFSHook(webhdfs_conn_id=conf.get("logging", "REMOTE_LOG_CONN_ID"))
+        self.io = HdfsRemoteLogIO(
+            remote_base=hdfs_log_folder,
+            base_log_folder=base_log_folder,
+            delete_local_copy=kwargs.get(
+                "delete_local_copy", conf.getboolean("logging", "delete_local_logs")
+            ),
+        )
 
-    def set_context(self, ti):
+    def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> None:
         super().set_context(ti)
         # Local location and remote location is needed to open and
         # upload local log file to HDFS storage.
+        if TYPE_CHECKING:
+            assert self.handler is not None
+
         full_path = self.handler.baseFilename
-        self.log_relative_path = pathlib.Path(full_path).relative_to(self.local_base).as_posix()
+        self.log_relative_path = Path(full_path).relative_to(self.local_base).as_posix()
         is_trigger_log_context = getattr(ti, "is_trigger_log_context", False)
         self.upload_on_close = is_trigger_log_context or not ti.raw
+        self.ti = ti
         # Clear the file first so that duplicate data is not uploaded
         # when reusing the same path (e.g. with rescheduled sensors)
         if self.upload_on_close:
@@ -76,28 +133,16 @@ def close(self):
         if not self.upload_on_close:
             return
 
-        local_loc = os.path.join(self.local_base, self.log_relative_path)
-        remote_loc = os.path.join(self.remote_base, self.log_relative_path)
-        if os.path.exists(local_loc) and os.path.isfile(local_loc):
-            self.hook.load_file(local_loc, remote_loc)
-            if self.delete_local_copy:
-                shutil.rmtree(os.path.dirname(local_loc))
+        if hasattr(self, "ti"):
+            self.io.upload(self.log_relative_path, self.ti)
 
         # Mark closed so we don't double write if close is called twice
         self.closed = True
 
-    def _read_remote_logs(self, ti, try_number, metadata=None):
+    def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[LogSourceInfo, LogMessages]:
         # Explicitly getting log relative path is necessary as the given
         # task instance might be different from task instance passed
         # in set_context method.
         worker_log_rel_path = self._render_filename(ti, try_number)
 
-        logs = []
-        messages = []
-        file_path = os.path.join(self.remote_base, worker_log_rel_path)
-        if self.hook.check_for_path(file_path):
-            logs.append(self.hook.read_file(file_path).decode("utf-8"))
-        else:
-            messages.append(f"No logs found on hdfs for ti={ti}")
-
+        messages, logs = self.io.read(worker_log_rel_path, ti)
         return messages, logs
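With upload and read concentrated in `HdfsRemoteLogIO`, the handler is reduced to path bookkeeping and the IO object can be exercised on its own. A minimal sketch, assuming a reachable WebHDFS connection configured as `[logging] remote_log_conn_id`; the paths are purely illustrative, and since `ti` is only interpolated into the "no logs found" message, a `None` stub is enough here:

```python
from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsRemoteLogIO

# kw_only attrs class, so keyword arguments are required.
io = HdfsRemoteLogIO(
    remote_base="hdfs://namenode/airflow/logs",  # illustrative
    base_log_folder="/tmp/airflow/logs",         # local staging dir, illustrative
    delete_local_copy=False,
)

rel_path = "dag_id=example/run_id=manual/task_id=t1/attempt=1.log"
# Uploads <base_log_folder>/<rel_path> to <remote_base>/<rel_path> if the
# local file exists; with delete_local_copy=True it would also remove the
# local log directory afterwards.
io.upload(rel_path, None)
# Returns (messages, logs): ([], [log text]) on a hit,
# (["No logs found on hdfs for ti=None"], []) on a miss.
messages, logs = io.read(rel_path, None)
```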
16 changes: 16 additions & 0 deletions providers/apache/hdfs/tests/unit/apache/hdfs/log/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.