Skip to content

Commit

Permalink
Merge pull request #1845 from PrefectHQ/resilient-logs
Browse files Browse the repository at this point in the history
Make logging to Cloud more robust in the face of a bad log
  • Loading branch information
cicdw authored Dec 13, 2019
2 parents 2db56c2 + c90cc18 commit 5105b6b
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/

- Fix Kubernetes Agent passing empty default namespace - [#1839](https://github.com/PrefectHQ/prefect/pull/1839)
- Fix missing Flow Run name on Dask Kubernetes Environment default worker pod - [#1839](https://github.com/PrefectHQ/prefect/pull/1839)
- Fix issue with a single bad log preventing all logs from being sent to Cloud - [#1845](https://github.com/PrefectHQ/prefect/pull/1845)

### Deprecations

Expand Down
7 changes: 6 additions & 1 deletion src/prefect/utilities/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
When running locally, log levels and message formatting are set via your Prefect configuration file.
"""
import atexit
import json
import logging
import sys
import threading
Expand Down Expand Up @@ -87,7 +88,11 @@ def start(self) -> None:
atexit.register(self.flush)

def put(self, log: dict) -> None:
self.queue.put(log)
try:
json.dumps(log) # make sure the payload is serializable
self.queue.put(log)
except TypeError as exc:
self.logger.critical("Failed to write log with error: {}".format(str(exc)))

def emit(self, record) -> None: # type: ignore
# if we shouldn't log to cloud, don't emit
Expand Down
3 changes: 2 additions & 1 deletion tests/engine/cloud/test_cloud_flow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,8 @@ def set_task_run_state(self, *args, **kwargs):

def get_flow_run_info(self, *args, **kwargs):
return MagicMock(
task_runs=[MagicMock(task_slug=log_stuff.slug, id="TESTME")]
id="flowRunID",
task_runs=[MagicMock(task_slug=log_stuff.slug, id="TESTME")],
)

monkeypatch.setattr("prefect.client.Client", Client)
Expand Down
30 changes: 30 additions & 0 deletions tests/utilities/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,36 @@ def write_run_logs(self, *args, **kwargs):
logger.handlers = []


def test_cloud_handler_removes_bad_logs_from_queue_and_continues(caplog, monkeypatch):
calls = []

class Client:
def write_run_logs(self, *args, **kwargs):
calls.append(dict(args=args, kwargs=kwargs))

monkeypatch.setattr("prefect.client.Client", Client)
try:
logger = utilities.logging.configure_logging(testing=True)
cloud_handler = logger.handlers[-1]
assert isinstance(cloud_handler, utilities.logging.CloudHandler)

with utilities.configuration.set_temporary_config(
{"logging.log_to_cloud": True}
):
logger.critical("one")
logger.critical(b"two")
logger.critical("three")

time.sleep(0.75)
assert len(calls) == 1
msgs = [c["message"] for c in calls[0]["args"][0]]
assert msgs == ["one", "three"]
finally:
# reset root_logger
logger = utilities.logging.configure_logging(testing=True)
logger.handlers = []


def test_get_logger_returns_root_logger():
assert utilities.logging.get_logger() is logging.getLogger("prefect")

Expand Down

0 comments on commit 5105b6b

Please sign in to comment.