@@ -222,13 +222,11 @@ def __init__(self, to_logger: logging.Logger, *, from_start: bool = True) -> Non
222
222
"""Initialize `StreamedLog`.
223
223
224
224
Args:
225
- to_logger: The logger to which the logs will be redirected
225
+ to_logger: The logger to which the logs will be redirected.
226
226
from_start: If `True`, all logs from the start of the actor run will be redirected. If `False`, only newly
227
227
arrived logs will be redirected. This can be useful for redirecting only a small portion of relevant
228
228
logs for long-running actors in stand-by.
229
229
230
- Returns:
231
- The created logger.
232
230
"""
233
231
self ._to_logger = to_logger
234
232
if self ._force_propagate :
@@ -248,7 +246,7 @@ def _log_buffer_content(self, *, include_last_part: bool = False) -> None:
248
246
"""Merge the whole buffer and split it into parts based on the marker.
249
247
250
248
Log the messages created from the split parts and remove them from buffer.
251
- The last part could be incomplete, and so it can be left unprocessed and in the buffer until later.
249
+ The last part could be incomplete, and so it can be left unprocessed in the buffer until later.
252
250
"""
253
251
all_parts = re .split (self ._split_marker , '' .join (self ._stream_buffer ))[1 :] # First split is empty string
254
252
if include_last_part :
@@ -292,32 +290,34 @@ def __init__(self, log_client: LogClient, *, to_logger: logging.Logger, from_sta
292
290
self ._streaming_thread : Thread | None = None
293
291
self ._stop_logging = False
294
292
295
- def __call__ (self ) -> Thread :
296
- """Start the streaming thread. The caller has to handle any cleanup."""
293
+ def start (self ) -> Thread :
294
+ """Start the streaming thread. The caller has to handle any cleanup by manually calling the `stop` method ."""
297
295
if self ._streaming_thread :
298
296
raise RuntimeError ('Streaming thread already active' )
299
297
self ._stop_logging = False
300
298
self ._streaming_thread = threading .Thread (target = self ._stream_log )
301
299
self ._streaming_thread .start ()
302
300
return self ._streaming_thread
303
301
302
+ def stop (self ) -> None :
303
+ """Signal the streaming thread to stop logging and wait for it to finish."""
304
+ if not self ._streaming_thread :
305
+ raise RuntimeError ('Streaming thread is not active' )
306
+ self ._stop_logging = True
307
+ self ._streaming_thread .join ()
308
+ self ._streaming_thread = None
309
+ self ._stop_logging = False
310
+
304
311
def __enter__ (self ) -> Self :
305
312
"""Start the streaming thread within the context. Exiting the context will finish the streaming thread."""
306
- self ()
313
+ self . start ()
307
314
return self
308
315
309
316
def __exit__ (
310
317
self , exc_type : type [BaseException ] | None , exc_val : BaseException | None , exc_tb : TracebackType | None
311
318
) -> None :
312
319
"""Stop the streaming thread."""
313
- if not self ._streaming_thread :
314
- raise RuntimeError ('Streaming thread is not active' )
315
-
316
- # Signal the thread to stop logging and wait for it to finish.
317
- self ._stop_logging = True
318
- self ._streaming_thread .join ()
319
- self ._streaming_thread = None
320
- self ._stop_logging = False
320
+ self .stop ()
321
321
322
322
def _stream_log (self ) -> None :
323
323
with self ._log_client .stream (raw = True ) as log_stream :
@@ -341,27 +341,31 @@ def __init__(self, log_client: LogClientAsync, *, to_logger: logging.Logger, fro
341
341
self ._log_client = log_client
342
342
self ._streaming_task : Task | None = None
343
343
344
- def __call__ (self ) -> Task :
345
- """Start the streaming task. The caller has to handle any cleanup."""
344
+ def start (self ) -> Task :
345
+ """Start the streaming task. The caller has to handle any cleanup by manually calling the `stop` method ."""
346
346
if self ._streaming_task :
347
347
raise RuntimeError ('Streaming task already active' )
348
348
self ._streaming_task = asyncio .create_task (self ._stream_log ())
349
349
return self ._streaming_task
350
350
351
+ def stop (self ) -> None :
352
+ """Stop the streaming task."""
353
+ if not self ._streaming_task :
354
+ raise RuntimeError ('Streaming task is not active' )
355
+
356
+ self ._streaming_task .cancel ()
357
+ self ._streaming_task = None
358
+
351
359
async def __aenter__ (self ) -> Self :
352
360
"""Start the streaming task within the context. Exiting the context will cancel the streaming task."""
353
- self ()
361
+ self . start ()
354
362
return self
355
363
356
364
async def __aexit__ (
357
365
self , exc_type : type [BaseException ] | None , exc_val : BaseException | None , exc_tb : TracebackType | None
358
366
) -> None :
359
367
"""Cancel the streaming task."""
360
- if not self ._streaming_task :
361
- raise RuntimeError ('Streaming task is not active' )
362
-
363
- self ._streaming_task .cancel ()
364
- self ._streaming_task = None
368
+ self .stop ()
365
369
366
370
async def _stream_log (self ) -> None :
367
371
async with self ._log_client .stream (raw = True ) as log_stream :
0 commit comments