4
4
from copy import copy
5
5
from datetime import datetime
6
6
from logging import DEBUG , ERROR , FATAL , INFO , WARN , WARNING # noqa: F401
7
+ from threading import Lock , Thread
7
8
from typing import TYPE_CHECKING , Dict , Iterable , List , Optional , Union
8
9
9
10
from grpclib .exceptions import StreamTerminatedError
19
20
_MODULE_PARENT : Optional ["RobotClient" ] = None
20
21
21
22
23
+ class _SingletonEventLoopThread :
24
+ _instance = None
25
+ _lock = Lock ()
26
+ _ready_event = asyncio .Event ()
27
+ _thread = None
28
+
29
+ def __new__ (cls ):
30
+ # Ensure singleton precondition
31
+ if cls ._instance is None :
32
+ with cls ._lock :
33
+ if cls ._instance is None :
34
+ cls ._instance = super (_SingletonEventLoopThread , cls ).__new__ (cls )
35
+ cls ._instance ._loop = None
36
+ cls ._instance ._thread = Thread (target = cls ._instance ._run )
37
+ cls ._instance ._thread .start ()
38
+ return cls ._instance
39
+
40
+ def _run (self ):
41
+ self ._loop = asyncio .new_event_loop ()
42
+ asyncio .set_event_loop (self ._loop )
43
+ self ._ready_event .set ()
44
+ self ._loop .run_forever ()
45
+
46
+ def stop (self ):
47
+ if self ._loop is not None :
48
+ self ._loop .call_soon_threadsafe (self ._loop .stop )
49
+ self ._loop .close ()
50
+
51
+ def get_loop (self ):
52
+ if self ._loop is None :
53
+ raise RuntimeError ("Event loop is None. Did you call .start() and .wait_until_ready()?" )
54
+ return self ._loop
55
+
56
+ async def wait_until_ready (self ):
57
+ await self ._ready_event .wait ()
58
+
59
+
22
60
class _ModuleHandler (logging .Handler ):
23
61
_parent : "RobotClient"
24
62
_logger : logging .Logger
63
+ _worker : _SingletonEventLoopThread
25
64
26
65
def __init__ (self , parent : "RobotClient" ):
66
+ super ().__init__ ()
27
67
self ._parent = parent
28
68
self ._logger = logging .getLogger ("ModuleLogger" )
29
69
addHandlers (self ._logger , True )
30
- super ().__init__ ()
31
70
self ._logger .setLevel (self .level )
71
+ self ._worker = _SingletonEventLoopThread ()
32
72
33
73
def setLevel (self , level : Union [int , str ]) -> None :
34
74
self ._logger .setLevel (level )
35
75
return super ().setLevel (level )
36
76
37
- def handle_task_result (self , task : asyncio .Task ):
77
+ async def handle_task_result (self , task : asyncio .Task ):
38
78
try :
39
79
_ = task .result ()
40
80
except (asyncio .CancelledError , asyncio .InvalidStateError , StreamTerminatedError ):
@@ -48,24 +88,28 @@ def emit(self, record: logging.LogRecord):
48
88
time = datetime .fromtimestamp (record .created )
49
89
50
90
try :
51
- assert self ._parent is not None
52
- try :
53
- loop = asyncio .get_event_loop ()
54
- loop .create_task (
55
- self ._parent .log (name , record .levelname , time , message , stack ), name = f"{ viam ._TASK_PREFIX } -LOG-{ record .created } "
56
- ).add_done_callback (self .handle_task_result )
57
- except RuntimeError :
58
- # If the log is coming from a thread that doesn't have an event loop, create and set a new one.
59
- loop = asyncio .new_event_loop ()
60
- asyncio .set_event_loop (loop )
61
- loop .create_task (
62
- self ._parent .log (name , record .levelname , time , message , stack ), name = f"{ viam ._TASK_PREFIX } -LOG-{ record .created } "
63
- ).add_done_callback (self .handle_task_result )
91
+ loop = self ._worker .get_loop ()
92
+ asyncio .run_coroutine_threadsafe (
93
+ self ._asynchronously_emit (record , name , message , stack , time ),
94
+ loop ,
95
+ )
64
96
except Exception as err :
65
97
# If the module log fails, log using stdout/stderr handlers
66
98
self ._logger .error (f"ModuleLogger failed for { record .name } - { err } " )
67
99
self ._logger .log (record .levelno , message )
68
100
101
+ async def _asynchronously_emit (self , record : logging .LogRecord , name : str , message : str , stack : str , time : datetime ):
102
+ await self ._worker .wait_until_ready ()
103
+ task = self ._worker .get_loop ().create_task (
104
+ self ._parent .log (name , record .levelname , time , message , stack ),
105
+ name = f"{ viam ._TASK_PREFIX } -LOG-{ record .created } " ,
106
+ )
107
+ task .add_done_callback (lambda t : asyncio .run_coroutine_threadsafe (self .handle_task_result (t ), self ._worker .get_loop ()))
108
+
109
+ def close (self ):
110
+ self ._worker .stop ()
111
+ super ().close ()
112
+
69
113
70
114
class _ColorFormatter (logging .Formatter ):
71
115
MAPPING = {
@@ -76,8 +120,8 @@ class _ColorFormatter(logging.Formatter):
76
120
"CRITICAL" : 41 , # white on red bg
77
121
}
78
122
79
- def __init__ (self , patern ):
80
- logging .Formatter .__init__ (self , patern )
123
+ def __init__ (self , pattern ):
124
+ logging .Formatter .__init__ (self , pattern )
81
125
82
126
def format (self , record ):
83
127
colored_record = copy (record )
0 commit comments