Skip to content

RSDK-8341: log on main thread #690

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Conversation

purplenicole730
Copy link
Member

@purplenicole730 purplenicole730 commented Jul 25, 2024

Jira ticket

If a user doesn't give their thread the time to log (i.e. the user is using a while loop), the logs will never show since the logs are never being awaited. This fix changes logs so that they are being printed in a separate thread, which means that logs aren't dependent on the users.

Tested using both our SDK's complex_module as well as @hexbabe 's module.

@hexbabe
Copy link
Member

hexbabe commented Jul 25, 2024

7/25/2024, 5:07:32 PM info rdk.heart-beat module.py:27 This is a log that should log repeatedly, but does not. Count: 8 log_ts UTC

7/25/2024, 5:07:32 PM info rdk.heart-beat module.py:27 This is a log that should log repeatedly, but does not. Count: 7 log_ts UTC

7/25/2024, 5:07:32 PM info rdk.heart-beat module.py:27 This is a log that should log repeatedly, but does not. Count: 6 log_ts UTC

7/25/2024, 5:07:22 PM info rdk.heart-beat module.py:27 This is a log that should log repeatedly, but does not. Count: 5 log_ts UTC

7/25/2024, 5:07:22 PM info rdk.heart-beat module.py:27 This is a log that should log repeatedly, but does not. Count: 4 log_ts UTC

7/25/2024, 5:07:22 PM info rdk.heart-beat module.py:27 This is a log that should log repeatedly, but does not. Count: 2 log_ts UTC

7/25/2024, 5:07:22 PM info rdk.heart-beat module.py:27 This is a log that should log repeatedly, but does not. Count: 3 log_ts UTC

7/25/2024, 5:07:22 PM info rdk.heart-beat module.py:27 This is a log that should log repeatedly, but does not. Count: 1 log_ts UTC

Testing with my repro: getting the then-invisible logs now with these changes!

purplenicole730 and others added 3 commits July 25, 2024 17:26
Co-authored-by: sean yu <55464069+hexbabe@users.noreply.github.com>
@purplenicole730 purplenicole730 requested a review from hexbabe July 25, 2024 21:31
@hexbabe
Copy link
Member

hexbabe commented Jul 25, 2024

This looks good to me, but I will officially approve after all requested changes resolved and I re-test it with my repro module! (since Benji + whoever else hasn't reviewed yet)

Copy link
Member

@benjirewis benjirewis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome find here! I'm a little worried about the unstable if invoked in a different thread than that of the module. What would happen if you did something like this:

 LOG_LEVEL = INFO
 LOGGERS: Dict[str, logging.Logger] = {}
 _MODULE_PARENT: Optional["RobotClient"] = None
+_MODULE_EVENT_LOOP: asyncio.AbstractEventLoop = asyncio.new_event_loop()


 class _ModuleHandler(logging.Handler):
@@ -29,13 +30,6 @@ class _ModuleHandler(logging.Handler):
         addHandlers(self._logger, True)
         super().__init__()
         self._logger.setLevel(self.level)
-        try:
-            self.loop = asyncio.get_event_loop()
-        except RuntimeError:
-            self._logger.warn("Creating an event loop from a new thread. We recommend initializing loggers in the main module thread.")
-            # If the log is coming from a thread that doesn't have an event loop, create and set a new one.
-            self.loop = asyncio.new_event_loop()
-            asyncio.set_event_loop(self.loop)

     def setLevel(self, level: Union[int, str]) -> None:
         self._logger.setLevel(level)
@@ -56,7 +50,7 @@ class _ModuleHandler(logging.Handler):

         try:
             assert self._parent is not None
-            self.loop.create_task(
+            _MODULE_EVENT_LOOP.create_task(
                 self._parent.log(name, record.levelname, time, message, stack), name=f"{viam._TASK_PREFIX}-LOG-{record.created}"
             ).add_done_callback(self.handle_task_result)
         except Exception as err:

Totally fine if you think globalizing the event loop is a bad idea; I'm far from an expert on async python 😆 . You may have to modify that code to handle if an event loop has already been created, too.

@hexbabe
Copy link
Member

hexbabe commented Jul 26, 2024

After running into some other issues already with this solution. Here is what I propose

diff --git a/src/viam/logging.py b/src/viam/logging.py
index b61d8ef..3abe1d5 100644
--- a/src/viam/logging.py
+++ b/src/viam/logging.py
@@ -3,6 +3,7 @@ import logging
 import sys
 from copy import copy
 from datetime import datetime
+from threading import Thread, Lock
 from logging import DEBUG, ERROR, FATAL, INFO, WARN, WARNING  # noqa: F401
 from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Union
 
@@ -13,28 +14,61 @@ import viam
 if TYPE_CHECKING:
     from .robot.client import RobotClient
 
-
 LOG_LEVEL = INFO
 LOGGERS: Dict[str, logging.Logger] = {}
 _MODULE_PARENT: Optional["RobotClient"] = None
 
 
-class _ModuleHandler(logging.Handler):
-    _parent: "RobotClient"
-    _logger: logging.Logger
+class SingletonEventLoopThread:
+    _instance = None
+    _lock = Lock()
+
+    def __new__(cls):
+        # Ensure singleton precondition
+        if cls._instance is None:
+            with cls._lock:
+                if cls._instance is None:
+                    cls._instance = super(SingletonEventLoopThread, cls).__new__(cls)
+                    cls._instance._loop = None
+                    cls._instance._ready_event = asyncio.Event()
+                    cls._instance._thread = Thread(target=cls._instance._run)
+                    cls._instance._thread.start()
+        return cls._instance
+
+    def _run(self):
+        self._loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(self._loop)
+        self._ready_event.set()
+        self._loop.run_forever()
+
+    def stop(self):
+        if self._loop is not None:
+            self._loop.call_soon_threadsafe(self._loop.stop)
+            self._loop.close()
+
+    def get_loop(self):
+        if self._loop is None:
+            raise RuntimeError("Event loop is None. Did you call .start() and .wait_until_ready?")
+        return self._loop
+
+    async def wait_until_ready(self):
+        await self._ready_event.wait()
 
+
+class _ModuleHandler(logging.Handler):
     def __init__(self, parent: "RobotClient"):
+        super().__init__()
         self._parent = parent
         self._logger = logging.getLogger("ModuleLogger")
+        self._worker = SingletonEventLoopThread()
         addHandlers(self._logger, True)
-        super().__init__()
         self._logger.setLevel(self.level)
 
     def setLevel(self, level: Union[int, str]) -> None:
         self._logger.setLevel(level)
         return super().setLevel(level)
 
-    def handle_task_result(self, task: asyncio.Task):
+    async def handle_task_result(self, task: asyncio.Task):
         try:
             _ = task.result()
         except (asyncio.CancelledError, asyncio.InvalidStateError, StreamTerminatedError):
@@ -49,23 +83,27 @@ class _ModuleHandler(logging.Handler):
 
         try:
             assert self._parent is not None
-            try:
-                loop = asyncio.get_event_loop()
-                loop.create_task(
-                    self._parent.log(name, record.levelname, time, message, stack), name=f"{viam._TASK_PREFIX}-LOG-{record.created}"
-                ).add_done_callback(self.handle_task_result)
-            except RuntimeError:
-                # If the log is coming from a thread that doesn't have an event loop, create and set a new one.
-                loop = asyncio.new_event_loop()
-                asyncio.set_event_loop(loop)
-                loop.create_task(
-                    self._parent.log(name, record.levelname, time, message, stack), name=f"{viam._TASK_PREFIX}-LOG-{record.created}"
-                ).add_done_callback(self.handle_task_result)
+            loop = self._worker.get_loop()
+            asyncio.run_coroutine_threadsafe(
+                self._asynchronously_emit(record, name, message, stack, time),
+                loop,
+            )
         except Exception as err:
-            # If the module log fails, log using stdout/stderr handlers
             self._logger.error(f"ModuleLogger failed for {record.name} - {err}")
             self._logger.log(record.levelno, message)
 
+    async def _asynchronously_emit(self, record: logging.LogRecord, name: str, message: str, stack: str, time: datetime):
+        await self._worker.wait_until_ready()
+        task = self._worker.get_loop().create_task(
+            self._parent.log(name, record.levelname, time, message, stack),
+            name=f"{viam._TASK_PREFIX}-LOG-{record.created}",
+        )
+        task.add_done_callback(lambda t: asyncio.run_coroutine_threadsafe(self.handle_task_result(t), self._worker.get_loop()))
+
+    def close(self):
+        self._worker.stop()
+        super().close()
+
 
 class _ColorFormatter(logging.Formatter):
     MAPPING = {
@@ -76,8 +114,8 @@ class _ColorFormatter(logging.Formatter):
         "CRITICAL": 41,  # white on red bg
     }
 
-    def __init__(self, patern):
-        logging.Formatter.__init__(self, patern)
+    def __init__(self, pattern):
+        logging.Formatter.__init__(self, pattern)
 
     def format(self, record):
         colored_record = copy(record)

Basically use a singleton task runner apart from the main thread and child threads so we can be assured our async logic don't get blocked by any user action.

@hexbabe
Copy link
Member

hexbabe commented Jul 26, 2024

running into some other issues already with this solution

def _wait_until_worker_running(self, max_attempts=5, timeout_seconds=1):
        """
        Blocks on camera data methods that require the worker to be running.
        Unblocks once worker is running or max number of attempts to pass is reached.

        Args:
            max_attempts (int, optional): Defaults to 5.
            timeout_seconds (int, optional): Defaults to 1.

        Raises:

        """
        cls: Oak = type(self)
        attempts = 0
        while attempts < max_attempts:
            if cls.worker.running:
                return
            time.sleep(timeout_seconds)
            attempts += 1
        raise ViamError(
            "Camera data requested before camera worker was ready. Please ensure the camera is properly "
            "connected and configured, especially for non-integrated models such as the OAK-FFC."
        )

I await this function call in get_image and other APIs that wait for the camera to be ready. Because time.sleep blocks in the main thread, logs are suppressed. I would rather not have to write asynchronous code in these methods. IMO a dedicated singleton thread to handle log RPC calls makes sense to me, as long as we manage it its lifecycle properly.

@@ -19,22 +20,67 @@
_MODULE_PARENT: Optional["RobotClient"] = None


class SingletonEventLoopThread:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this private

self._logger.setLevel(self.level)
self._worker = SingletonEventLoopThread()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Define _worker above as a property

self._logger.setLevel(self.level)
self._worker = SingletonEventLoopThread()
try:
self.loop = asyncio.get_event_loop()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Define loop as well above as a property. And consider making it private

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, why do we need to store a loop? Seems like we're getting the loop from the worker now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

artifact of old code! Thanks for catching


def get_loop(self):
if self._loop is None:
raise RuntimeError("Event loop is None. Did you call .start() and .wait_until_ready?")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add parens to the .wait_until_ready function since you have them on .start()

@purplenicole730 purplenicole730 marked this pull request as ready for review July 29, 2024 18:08
@purplenicole730 purplenicole730 requested a review from a team as a code owner July 29, 2024 18:08
Copy link
Member

@hexbabe hexbabe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Tested on my local module for the camera I encountered these problems with:
https://www.loom.com/share/b9f4b6dfb34846f4a4fe076d416d35b5?sid=c4e93eae-7026-4e60-8016-719ebacb873d

The logs that were suppressed before are logging in this video

@purplenicole730 purplenicole730 requested review from lia-viam and removed request for lia-viam July 30, 2024 13:51
Copy link
Member

@benjirewis benjirewis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good to me as long as the simple and complex python example modules can log correctly 🫡 !

@purplenicole730 purplenicole730 merged commit c9049ed into viamrobotics:main Aug 2, 2024
13 checks passed
@purplenicole730 purplenicole730 deleted the RSDK-8341-log-on-main-thread branch August 2, 2024 19:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants