@@ -6,6 +6,7 @@
 import threading
 from asyncio import Task
 from contextlib import asynccontextmanager, contextmanager
+from datetime import datetime, timezone
 from threading import Thread
 from typing import TYPE_CHECKING, Any, cast
@@ -209,6 +210,9 @@ class StreamedLog:
     It uses a buffer to deal with possibly chunked logs. Chunks are stored in the buffer and are expected to contain
     specific markers that indicate the start of each log message. Each time a new chunk with a complete split marker
     arrives, the buffer is processed, logged, and emptied.
+
+    This works only if the logs contain a datetime marker in ISO format, for example `2025-05-12T15:35:59.429Z`.
+    This is the default log format for Actors.
     """

     # Test-related flag to enable propagation of logs to the `caplog` fixture during tests.
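As a minimal standalone sketch of the buffering described in the docstring above (the sample log lines are invented, not from the patch), splitting a buffered chunk on the marker regex yields alternating timestamps and message bodies:

    import re

    SPLIT_MARKER = re.compile(r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)')

    # Two messages, the second cut off mid-word, as a chunked stream might deliver them.
    buffer = (
        '2025-05-12T15:35:59.429Z INFO first message\n'
        '2025-05-12T15:36:00.001Z WARN second mes'
    )

    # Splitting on a capturing group keeps the markers in the result:
    # ['', marker1, content1, marker2, content2]
    parts = SPLIT_MARKER.split(buffer)
    for marker, content in zip(parts[1::2], parts[2::2]):
        print((marker + content).strip())

The real class keeps the trailing, possibly incomplete marker-and-content pair in its buffer (`all_parts[-2:]` below) until the next chunk completes it.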
@@ -230,8 +234,8 @@ def __init__(self, to_logger: logging.Logger, *, from_start: bool = True) -> None:
         if self._force_propagate:
             to_logger.propagate = True
         self._stream_buffer = list[str]()
-        self._split_marker = re.compile(r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)')  # Ex:2025-05-12T15:35:59.429Z
-        self._from_start = from_start
+        self._split_marker = re.compile(r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)')
+        self._relevancy_time_limit: datetime | None = None if from_start else datetime.now(tz=timezone.utc)

     def _process_new_data(self, data: bytes) -> None:
         new_chunk = data.decode('utf-8')
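The `from_start` flag is no longer stored as a boolean; the constructor folds it into an optional UTC cutoff. Restated as a standalone helper (the function name is invented for illustration):

    from datetime import datetime, timezone

    def relevancy_limit(from_start: bool) -> datetime | None:
        # from_start=True: no cutoff, replay all logs from the start of the run.
        # from_start=False: only messages stamped after construction time pass.
        return None if from_start else datetime.now(tz=timezone.utc)

This replaces the earlier approach of skipping the first streamed chunk (removed in the hunks below), which relied on the whole backlog arriving in exactly one chunk; a timestamp cutoff does not depend on where chunk boundaries fall.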
@@ -258,6 +262,11 @@ def _log_buffer_content(self, *, include_last_part: bool = False) -> None:
             self._stream_buffer = all_parts[-2:]

         for marker, content in zip(message_markers, message_contents):
+            if self._relevancy_time_limit:
+                log_time = datetime.fromisoformat(marker.replace('Z', '+00:00'))
+                if log_time < self._relevancy_time_limit:
+                    # Skip logs older than the relevancy time limit.
+                    continue
             message = marker + content
             self._to_logger.log(level=self._guess_log_level_from_message(message), msg=message.strip())
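A detail worth noting in the hunk above: `marker.replace('Z', '+00:00')` is what lets `datetime.fromisoformat` parse the marker, since the trailing `Z` designator is only accepted natively from Python 3.11 onward. A standalone check:

    from datetime import datetime, timezone

    marker = '2025-05-12T15:35:59.429Z'
    log_time = datetime.fromisoformat(marker.replace('Z', '+00:00'))

    # The parsed value is timezone-aware, so it compares safely against the
    # datetime.now(tz=timezone.utc) cutoff set in __init__.
    assert log_time.tzinfo is not None
    print(log_time > datetime(2025, 1, 1, tzinfo=timezone.utc))  # True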
@@ -314,12 +323,7 @@ def _stream_log(self) -> None:
         with self._log_client.stream(raw=True) as log_stream:
             if not log_stream:
                 return
-            # The first chunk contains all older logs from the start of the actor run until now.
-            skip_first_chunk = not self._from_start
             for data in log_stream.iter_bytes():
-                if skip_first_chunk:
-                    skip_first_chunk = False
-                    continue
                 self._process_new_data(data)
                 if self._stop_logging:
                     break
@@ -363,12 +367,7 @@ async def _stream_log(self) -> None:
         async with self._log_client.stream(raw=True) as log_stream:
             if not log_stream:
                 return
-            # The first chunk contains all older logs from the start of the actor run until now.
-            skip_first_chunk = not self._from_start
             async for data in log_stream.aiter_bytes():
-                if skip_first_chunk:
-                    skip_first_chunk = False
-                    continue
                 self._process_new_data(data)

         # If the stream is finished, the last part will also be processed.
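Both the synchronous and asynchronous variants drop the chunk-skipping logic for the same reason. A small standalone simulation (chunk contents are invented) of why the timestamp filter is robust where skipping the first chunk was not:

    import re
    from datetime import datetime, timezone

    SPLIT_MARKER = re.compile(r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)')

    # The backlog happens to span two chunks here, so dropping only the first
    # chunk would still leak old output; a timestamp cutoff does not care.
    chunks = [
        b'2025-05-12T15:00:00.000Z old part one\n2025-05-12T15:00:01.000Z old ',
        b'part two\n2025-05-12T16:00:00.000Z new message\n',
    ]
    limit = datetime(2025, 5, 12, 15, 30, tzinfo=timezone.utc)

    buffer = ''.join(chunk.decode('utf-8') for chunk in chunks)
    parts = SPLIT_MARKER.split(buffer)
    for marker, content in zip(parts[1::2], parts[2::2]):
        if datetime.fromisoformat(marker.replace('Z', '+00:00')) >= limit:
            print((marker + content).strip())  # prints only the new message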