@@ -204,7 +204,12 @@ async def stream(self, *, raw: bool = False) -> AsyncIterator[httpx.Response | N
204
204
205
205
206
206
class StreamedLog :
207
- """Utility class for streaming logs from another actor."""
207
+ """Utility class for streaming logs from another actor.
208
+
209
+ It uses a buffer to deal with possibly chunked logs. Chunked logs are stored in the buffer. Chunks are expected to contain
210
+ specific markers that indicate the start of a log message. Each time a new chunk with a complete split marker
211
+ arrives, the buffer is processed, logged and emptied.
212
+ """
208
213
209
214
# Test related flag to enable propagation of logs to the `caplog` fixture during tests.
210
215
_force_propagate = False
@@ -214,15 +219,13 @@ def __init__(self, to_logger: logging.Logger) -> None:
214
219
if self ._force_propagate :
215
220
to_logger .propagate = True
216
221
self ._stream_buffer = list [str ]()
217
- # Redirected logs are forwarded to logger as soon as there are at least two split markers present in the buffer.
218
- # For example, 2025-05-12T15:35:59.429Z
219
- self ._split_marker = re .compile (r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)' )
222
+ self ._split_marker = re .compile (r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)' ) # Ex:2025-05-12T15:35:59.429Z
220
223
221
224
def _process_new_data(self, data: bytes) -> None:
    """Buffer a newly received chunk and flush complete messages if possible.

    Args:
        data: Raw bytes received from the log stream; decoded as UTF-8.
    """
    chunk = data.decode('utf-8')
    self._stream_buffer.append(chunk)
    # A complete split marker inside the new chunk means at least one full
    # message is now buffered, so forward everything except the trailing,
    # possibly incomplete, part.
    if self._split_marker.search(chunk):
        self._log_buffer_content(include_last_part=False)
227
230
228
231
def _log_buffer_content (self , * , include_last_part : bool = False ) -> None :
@@ -231,15 +234,14 @@ def _log_buffer_content(self, *, include_last_part: bool = False) -> None:
231
234
Log the messages created from the split parts and remove them from buffer.
232
235
The last part could be incomplete, and so it can be left unprocessed and in the buffer until later.
233
236
"""
234
- all_parts = re .split (self ._split_marker , '' .join (self ._stream_buffer ))
235
- # First split is empty string
237
+ all_parts = re .split (self ._split_marker , '' .join (self ._stream_buffer ))[1 :] # First split is empty string
236
238
if include_last_part :
237
- message_markers = all_parts [1 ::2 ]
238
- message_contents = all_parts [2 ::2 ]
239
+ message_markers = all_parts [0 ::2 ]
240
+ message_contents = all_parts [1 ::2 ]
239
241
self ._stream_buffer = []
240
242
else :
241
- message_markers = all_parts [1 :- 2 :2 ]
242
- message_contents = all_parts [2 :- 2 :2 ]
243
+ message_markers = all_parts [0 :- 2 :2 ]
244
+ message_contents = all_parts [1 :- 2 :2 ]
243
245
# The last two parts (marker and message) are possibly not complete and will be left in the buffer
244
246
self ._stream_buffer = all_parts [- 2 :]
245
247
@@ -301,11 +303,7 @@ def _stream_log(self) -> None:
301
303
if not log_stream :
302
304
return
303
305
for data in log_stream .iter_bytes ():
304
- new_chunk = data .decode ('utf-8' )
305
- self ._stream_buffer .append (new_chunk )
306
- if re .findall (self ._split_marker , new_chunk ):
307
- # If complete split marker was found in new chunk, then process the buffer.
308
- self ._log_buffer_content (include_last_part = False )
306
+ self ._process_new_data (data )
309
307
if self ._stop_logging :
310
308
break
311
309
0 commit comments