Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ def proc(logger: structlog.typing.WrappedLogger, method_name: str, event: struct
return event
# Only init the handler stream_name once. We cannot do it above when we init the handler because
# we don't yet know the log path at that point.
if not _handler.log_stream_name:
_handler.log_stream_name = stream_name.as_posix().replace(":", "_")
# we should always use the path(log-stream-name) coming from the logger.
_handler.log_stream_name = stream_name.as_posix().replace(":", "_")
name = event.get("logger_name") or event.get("logger", "")
level = structlog.stdlib.NAME_TO_LEVEL.get(method_name.lower(), logging.INFO)
msg = copy.copy(event)
Expand All @@ -149,7 +149,14 @@ def proc(logger: structlog.typing.WrappedLogger, method_name: str, event: struct
return (proc,)

def close(self):
self.handler.close()
# Use the flush method to ensure all logs are sent to CloudWatch.
# Closing the handler sets `shutting_down` to True, which prevents any further logs from being sent.
# When `shutting_down` is True, means the logging system is in the process of shutting down,
# during which it attempts to flush the logs which are queued.
if self.handler is None or self.handler.shutting_down:
return

self.handler.flush()

def upload(self, path: os.PathLike | str, ti: RuntimeTI):
# No-op, as we upload via the processor as we go
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ def test_log_message(self):
# We need to close in order to flush the logs etc.
self.subject.close()

# close call should only flush the logs and not set shutting_down to True
assert self.subject.handler.shutting_down is False

# Inside the Cloudwatch logger we swap colons for underscores since colons are not allowed in
# stream names.
stream_name = self.task_log_path.replace(":", "_")
Expand Down