Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions airflow/providers/google/cloud/log/gcs_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import shutil
from functools import cached_property
from pathlib import Path
from typing import Collection
from typing import TYPE_CHECKING, Collection

# mypy reports `storage` as missing from `google.cloud` even though it exists and is importable at runtime, hence the ignore below
from google.cloud import storage # type: ignore[attr-defined]
Expand All @@ -36,6 +36,9 @@
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import LoggingMixin

if TYPE_CHECKING:
from airflow.models.taskinstance import TaskInstance

_DEFAULT_SCOPESS = frozenset(
[
"https://www.googleapis.com/auth/devstorage.read_write",
Expand Down Expand Up @@ -96,6 +99,7 @@ def __init__(
**kwargs,
):
super().__init__(base_log_folder, filename_template)
self.handler: logging.FileHandler | None = None
self.remote_base = gcs_log_folder
self.log_relative_path = ""
self.closed = False
Expand Down Expand Up @@ -137,15 +141,21 @@ def client(self) -> storage.Client:
project=self.project_id if self.project_id else project_id,
)

def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> None:
    """
    Set the logging context for ``ti`` and record where its log will be uploaded.

    Delegates to the parent handler, then derives ``log_relative_path`` from
    the file the parent handler opened and decides whether the log should be
    uploaded when the handler is closed.

    :param ti: task instance whose log context is being set
    :param identifier: optional identifier, forwarded to the parent handler
        only when this handler exposes a truthy ``supports_task_context_logging``
    """
    # NOTE(review): presumably parent implementations without task-context
    # logging do not accept ``identifier`` -- confirm against FileTaskHandler.
    if getattr(self, "supports_task_context_logging", False):
        super().set_context(ti, identifier=identifier)
    else:
        super().set_context(ti)

    # Static-typing aid only: TYPE_CHECKING is False at runtime, so this never
    # executes; it narrows ``self.handler`` from ``FileHandler | None``.
    if TYPE_CHECKING:
        assert self.handler is not None

    # Log relative path is used to construct local and remote
    # log path to upload log files into GCS and read from the
    # remote location.
    full_path = self.handler.baseFilename
    self.log_relative_path = Path(full_path).relative_to(self.local_base).as_posix()

    is_trigger_log_context = getattr(ti, "is_trigger_log_context", False)
    # ``raw`` is read defensively: an object without the attribute is treated
    # the same as a non-raw task instance instead of raising AttributeError.
    self.upload_on_close = is_trigger_log_context or not getattr(ti, "raw", None)

def close(self):
"""Close and upload local log file to remote storage GCS."""
Expand Down