-
Notifications
You must be signed in to change notification settings - Fork 56
RSDK-8341: log on main thread #690
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RSDK-8341: log on main thread #690
Conversation
Testing with my repro: getting the then-invisible logs now with these changes! |
Co-authored-by: sean yu <55464069+hexbabe@users.noreply.github.com>
This looks good to me, but I will officially approve after all requested changes resolved and I re-test it with my repro module! (since Benji + whoever else hasn't reviewed yet) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome find here! I'm a little worried about the unstable if invoked in a different thread than that of the module
. What would happen if you did something like this:
LOG_LEVEL = INFO
LOGGERS: Dict[str, logging.Logger] = {}
_MODULE_PARENT: Optional["RobotClient"] = None
+_MODULE_EVENT_LOOP: asyncio.AbstractEventLoop = asyncio.new_event_loop()
class _ModuleHandler(logging.Handler):
@@ -29,13 +30,6 @@ class _ModuleHandler(logging.Handler):
addHandlers(self._logger, True)
super().__init__()
self._logger.setLevel(self.level)
- try:
- self.loop = asyncio.get_event_loop()
- except RuntimeError:
- self._logger.warn("Creating an event loop from a new thread. We recommend initializing loggers in the main module thread.")
- # If the log is coming from a thread that doesn't have an event loop, create and set a new one.
- self.loop = asyncio.new_event_loop()
- asyncio.set_event_loop(self.loop)
def setLevel(self, level: Union[int, str]) -> None:
self._logger.setLevel(level)
@@ -56,7 +50,7 @@ class _ModuleHandler(logging.Handler):
try:
assert self._parent is not None
- self.loop.create_task(
+ _MODULE_EVENT_LOOP.create_task(
self._parent.log(name, record.levelname, time, message, stack), name=f"{viam._TASK_PREFIX}-LOG-{record.created}"
).add_done_callback(self.handle_task_result)
except Exception as err:
Totally fine if you think globalizing the event loop is a bad idea; I'm far from an expert on async python 😆 . You may have to modify that code to handle if an event loop has already been created, too.
After running into some other issues already with this solution. Here is what I propose diff --git a/src/viam/logging.py b/src/viam/logging.py
index b61d8ef..3abe1d5 100644
--- a/src/viam/logging.py
+++ b/src/viam/logging.py
@@ -3,6 +3,7 @@ import logging
import sys
from copy import copy
from datetime import datetime
+from threading import Thread, Lock
from logging import DEBUG, ERROR, FATAL, INFO, WARN, WARNING # noqa: F401
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Union
@@ -13,28 +14,61 @@ import viam
if TYPE_CHECKING:
from .robot.client import RobotClient
-
LOG_LEVEL = INFO
LOGGERS: Dict[str, logging.Logger] = {}
_MODULE_PARENT: Optional["RobotClient"] = None
-class _ModuleHandler(logging.Handler):
- _parent: "RobotClient"
- _logger: logging.Logger
+class SingletonEventLoopThread:
+ _instance = None
+ _lock = Lock()
+
+ def __new__(cls):
+ # Ensure singleton precondition
+ if cls._instance is None:
+ with cls._lock:
+ if cls._instance is None:
+ cls._instance = super(SingletonEventLoopThread, cls).__new__(cls)
+ cls._instance._loop = None
+ cls._instance._ready_event = asyncio.Event()
+ cls._instance._thread = Thread(target=cls._instance._run)
+ cls._instance._thread.start()
+ return cls._instance
+
+ def _run(self):
+ self._loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(self._loop)
+ self._ready_event.set()
+ self._loop.run_forever()
+
+ def stop(self):
+ if self._loop is not None:
+ self._loop.call_soon_threadsafe(self._loop.stop)
+ self._loop.close()
+
+ def get_loop(self):
+ if self._loop is None:
+ raise RuntimeError("Event loop is None. Did you call .start() and .wait_until_ready?")
+ return self._loop
+
+ async def wait_until_ready(self):
+ await self._ready_event.wait()
+
+class _ModuleHandler(logging.Handler):
def __init__(self, parent: "RobotClient"):
+ super().__init__()
self._parent = parent
self._logger = logging.getLogger("ModuleLogger")
+ self._worker = SingletonEventLoopThread()
addHandlers(self._logger, True)
- super().__init__()
self._logger.setLevel(self.level)
def setLevel(self, level: Union[int, str]) -> None:
self._logger.setLevel(level)
return super().setLevel(level)
- def handle_task_result(self, task: asyncio.Task):
+ async def handle_task_result(self, task: asyncio.Task):
try:
_ = task.result()
except (asyncio.CancelledError, asyncio.InvalidStateError, StreamTerminatedError):
@@ -49,23 +83,27 @@ class _ModuleHandler(logging.Handler):
try:
assert self._parent is not None
- try:
- loop = asyncio.get_event_loop()
- loop.create_task(
- self._parent.log(name, record.levelname, time, message, stack), name=f"{viam._TASK_PREFIX}-LOG-{record.created}"
- ).add_done_callback(self.handle_task_result)
- except RuntimeError:
- # If the log is coming from a thread that doesn't have an event loop, create and set a new one.
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
- loop.create_task(
- self._parent.log(name, record.levelname, time, message, stack), name=f"{viam._TASK_PREFIX}-LOG-{record.created}"
- ).add_done_callback(self.handle_task_result)
+ loop = self._worker.get_loop()
+ asyncio.run_coroutine_threadsafe(
+ self._asynchronously_emit(record, name, message, stack, time),
+ loop,
+ )
except Exception as err:
- # If the module log fails, log using stdout/stderr handlers
self._logger.error(f"ModuleLogger failed for {record.name} - {err}")
self._logger.log(record.levelno, message)
+ async def _asynchronously_emit(self, record: logging.LogRecord, name: str, message: str, stack: str, time: datetime):
+ await self._worker.wait_until_ready()
+ task = self._worker.get_loop().create_task(
+ self._parent.log(name, record.levelname, time, message, stack),
+ name=f"{viam._TASK_PREFIX}-LOG-{record.created}",
+ )
+ task.add_done_callback(lambda t: asyncio.run_coroutine_threadsafe(self.handle_task_result(t), self._worker.get_loop()))
+
+ def close(self):
+ self._worker.stop()
+ super().close()
+
class _ColorFormatter(logging.Formatter):
MAPPING = {
@@ -76,8 +114,8 @@ class _ColorFormatter(logging.Formatter):
"CRITICAL": 41, # white on red bg
}
- def __init__(self, patern):
- logging.Formatter.__init__(self, patern)
+ def __init__(self, pattern):
+ logging.Formatter.__init__(self, pattern)
def format(self, record):
colored_record = copy(record) Basically use a singleton task runner apart from the main thread and child threads so we can be assured our async logic don't get blocked by any user action. |
I |
src/viam/logging.py
Outdated
@@ -19,22 +20,67 @@ | |||
_MODULE_PARENT: Optional["RobotClient"] = None | |||
|
|||
|
|||
class SingletonEventLoopThread: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make this private
src/viam/logging.py
Outdated
self._logger.setLevel(self.level) | ||
self._worker = SingletonEventLoopThread() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Define _worker
above as a property
src/viam/logging.py
Outdated
self._logger.setLevel(self.level) | ||
self._worker = SingletonEventLoopThread() | ||
try: | ||
self.loop = asyncio.get_event_loop() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Define loop
as well above as a property. And consider making it private
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, why do we need to store a loop? Seems like we're getting the loop from the worker now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
artifact of old code! Thanks for catching
src/viam/logging.py
Outdated
|
||
def get_loop(self): | ||
if self._loop is None: | ||
raise RuntimeError("Event loop is None. Did you call .start() and .wait_until_ready?") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add parens to the .wait_until_ready
function since you have them on .start()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Tested on my local module for the camera I encountered these problems with:
https://www.loom.com/share/b9f4b6dfb34846f4a4fe076d416d35b5?sid=c4e93eae-7026-4e60-8016-719ebacb873d
The logs that were suppressed before are logging in this video
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good to me as long as the simple and complex python example modules can log correctly 🫡 !
Jira ticket
If a user doesn't give their thread the time to log (i.e. the user is using a while loop), the logs will never show since the logs are never being awaited. This fix changes logs so that they are being printed in a separate thread, which means that logs aren't dependent on the users.
Tested using both our SDK's
complex_module
as well as @hexbabe 's module.