Commit 85ead2f (1 parent: 9720327)

Add from_start argument for streaming from stand-by actors
Update the default logger to not duplicate handlers if it already exists

4 files changed: +62 -16 lines
src/apify_client/_logging.py

Lines changed: 7 additions & 1 deletion
@@ -137,8 +137,14 @@ def create_redirect_logger(
         The created logger.
     """
     to_logger = logging.getLogger(name)
-
     to_logger.propagate = False
+
+    # Remove filters and handlers in case this logger already exists and was set up in some way.
+    for handler in to_logger.handlers:
+        to_logger.removeHandler(handler)
+    for log_filter in to_logger.filters:
+        to_logger.removeFilter(log_filter)
+
     handler = logging.StreamHandler()
     handler.setFormatter(RedirectLogFormatter())
     to_logger.addHandler(handler)
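
For context, this cleanup matters because `logging.getLogger(name)` returns the same logger object on every call, so setting up the redirect logger twice used to stack a second `StreamHandler` and every redirected line was emitted twice. A minimal sketch of the problem and the fix, using only the standard `logging` module (the helper and logger name are hypothetical, and the sketch iterates over copies of the handler/filter lists to avoid mutating them mid-loop):

```python
import logging


def attach_stream_handler(name: str) -> logging.Logger:
    # Hypothetical stand-in for what create_redirect_logger did before this commit.
    logger = logging.getLogger(name)
    logger.addHandler(logging.StreamHandler())
    return logger


# getLogger returns the same object both times, so handlers accumulate.
logger = attach_stream_handler('apify.example-run')
logger = attach_stream_handler('apify.example-run')
print(len(logger.handlers))  # 2 -> every record would be printed twice

# The commit's approach: drop any pre-existing handlers and filters before re-adding one.
for handler in list(logger.handlers):
    logger.removeHandler(handler)
for log_filter in list(logger.filters):
    logger.removeFilter(log_filter)
logger.addHandler(logging.StreamHandler())
print(len(logger.handlers))  # 1
```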

src/apify_client/clients/resource_clients/log.py

Lines changed: 27 additions & 5 deletions
@@ -214,12 +214,24 @@ class StreamedLog:
     # Test related flag to enable propagation of logs to the `caplog` fixture during tests.
     _force_propagate = False
 
-    def __init__(self, to_logger: logging.Logger) -> None:
+    def __init__(self, to_logger: logging.Logger, *, from_start: bool = True) -> None:
+        """Initialize `StreamedLog`.
+
+        Args:
+            to_logger: The logger to which the logs will be redirected
+            from_start: If `True`, all logs from the start of the actor run will be redirected. If `False`, only newly
+                arrived logs will be redirected. This can be useful for redirecting only a small portion of relevant
+                logs for long-running actors in stand-by.
+
+        Returns:
+            The created logger.
+        """
         self._to_logger = to_logger
         if self._force_propagate:
             to_logger.propagate = True
         self._stream_buffer = list[str]()
         self._split_marker = re.compile(r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)')  # Ex:2025-05-12T15:35:59.429Z
+        self._from_start = from_start
 
     def _process_new_data(self, data: bytes) -> None:
         new_chunk = data.decode('utf-8')

@@ -265,8 +277,8 @@ def _guess_log_level_from_message(message: str) -> int:
 class StreamedLogSync(StreamedLog):
     """Sync variant of `StreamedLog` that is logging in threads."""
 
-    def __init__(self, log_client: LogClient, to_logger: logging.Logger) -> None:
-        super().__init__(to_logger=to_logger)
+    def __init__(self, log_client: LogClient, *, to_logger: logging.Logger, from_start: bool = True) -> None:
+        super().__init__(to_logger=to_logger, from_start=from_start)
         self._log_client = log_client
         self._streaming_thread: Thread | None = None
         self._stop_logging = False

@@ -302,7 +314,12 @@ def _stream_log(self) -> None:
         with self._log_client.stream(raw=True) as log_stream:
             if not log_stream:
                 return
+            # The first chunk contains all older logs from the start of the actor run until now.
+            skip_first_chunk = not self._from_start
             for data in log_stream.iter_bytes():
+                if skip_first_chunk:
+                    skip_first_chunk = False
+                    continue
                 self._process_new_data(data)
                 if self._stop_logging:
                     break

@@ -315,8 +332,8 @@ def _stream_log(self) -> None:
 class StreamedLogAsync(StreamedLog):
     """Async variant of `StreamedLog` that is logging in tasks."""
 
-    def __init__(self, log_client: LogClientAsync, to_logger: logging.Logger) -> None:
-        super().__init__(to_logger=to_logger)
+    def __init__(self, log_client: LogClientAsync, *, to_logger: logging.Logger, from_start: bool = True) -> None:
+        super().__init__(to_logger=to_logger, from_start=from_start)
         self._log_client = log_client
         self._streaming_task: Task | None = None

@@ -346,7 +363,12 @@ async def _stream_log(self) -> None:
         async with self._log_client.stream(raw=True) as log_stream:
             if not log_stream:
                 return
+            # The first chunk contains all older logs from the start of the actor run until now.
+            skip_first_chunk = not self._from_start
             async for data in log_stream.aiter_bytes():
+                if skip_first_chunk:
+                    skip_first_chunk = False
+                    continue
                 self._process_new_data(data)
 
             # If the stream is finished, then the last part will be also processed.
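
The `from_start` behaviour is easiest to see in isolation: when streaming raw logs, the first chunk delivers the whole backlog produced before streaming began, so skipping exactly one chunk drops all historical output. A minimal standalone sketch of that pattern under those assumptions (the `redirect_chunks` helper and the sample chunks are illustrative, not part of `apify_client`):

```python
from collections.abc import Iterable, Iterator


def redirect_chunks(chunks: Iterable[bytes], *, from_start: bool = True) -> Iterator[bytes]:
    # Mirrors the skip_first_chunk logic added in _stream_log: drop the first
    # (historical) chunk when from_start is False, then pass everything else through.
    skip_first_chunk = not from_start
    for data in chunks:
        if skip_first_chunk:
            skip_first_chunk = False
            continue
        yield data


stream = [b'old line 1\nold line 2\n', b'new line 3\n', b'new line 4\n']
print(list(redirect_chunks(stream, from_start=True)))   # all three chunks
print(list(redirect_chunks(stream, from_start=False)))  # only the two newer chunks
```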

src/apify_client/clients/resource_clients/run.py

Lines changed: 14 additions & 4 deletions
@@ -255,14 +255,19 @@ def log(self) -> LogClient:
             **self._sub_resource_init_options(resource_path='log'),
         )
 
-    def get_streamed_log(self, to_logger: logging.Logger | None = None, actor_name: str = '') -> StreamedLogSync:
+    def get_streamed_log(
+        self, to_logger: logging.Logger | None = None, *, actor_name: str = '', from_start: bool = True
+    ) -> StreamedLogSync:
         """Get `StreamedLog` instance that can be used to redirect logs.
 
         `StreamedLog` can be directly called or used as a context manager.
 
         Args:
             to_logger: `Logger` used for logging the redirected messages. If not provided, a new logger is created
             actor_name: Optional component of default logger name.
+            from_start: If `True`, all logs from the start of the actor run will be redirected. If `False`, only newly
+                arrived logs will be redirected. This can be useful for redirecting only a small portion of relevant
+                logs for long-running actors in stand-by.
 
         Returns:
             `StreamedLog` instance for redirected logs.

@@ -274,7 +279,7 @@ def get_streamed_log(self, to_logger: logging.Logger | None = None, actor_name:
             name = '-'.join(part for part in (actor_name, run_id) if part)
             to_logger = create_redirect_logger(f'apify.{name}')
 
-        return StreamedLogSync(log_client=self.log(), to_logger=to_logger)
+        return StreamedLogSync(log_client=self.log(), to_logger=to_logger, from_start=from_start)
 
     def charge(
         self,

@@ -543,14 +548,19 @@ def log(self) -> LogClientAsync:
             **self._sub_resource_init_options(resource_path='log'),
         )
 
-    async def get_streamed_log(self, to_logger: logging.Logger | None = None, actor_name: str = '') -> StreamedLogAsync:
+    async def get_streamed_log(
+        self, to_logger: logging.Logger | None = None, *, actor_name: str = '', from_start: bool = True
+    ) -> StreamedLogAsync:
         """Get `StreamedLog` instance that can be used to redirect logs.
 
         `StreamedLog` can be directly called or used as a context manager.
 
         Args:
             to_logger: `Logger` used for logging the redirected messages. If not provided, a new logger is created
             actor_name: Optional component of default logger name.
+            from_start: If `True`, all logs from the start of the actor run will be redirected. If `False`, only newly
+                arrived logs will be redirected. This can be useful for redirecting only a small portion of relevant
+                logs for long-running actors in stand-by.
 
         Returns:
             `StreamedLog` instance for redirected logs.

@@ -562,7 +572,7 @@ async def get_streamed_log(self, to_logger: logging.Logger | None = None, actor_
             name = '-'.join(part for part in (actor_name, run_id) if part)
             to_logger = create_redirect_logger(f'apify.{name}')
 
-        return StreamedLogAsync(log_client=self.log(), to_logger=to_logger)
+        return StreamedLogAsync(log_client=self.log(), to_logger=to_logger, from_start=from_start)
 
     async def charge(
         self,
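
Putting it together, redirecting only newly arriving logs from a long-running stand-by actor might look like the sketch below; the token, run ID and actor name are placeholders, and the context-manager usage follows the `StreamedLog` docstring above:

```python
from apify_client import ApifyClient

client = ApifyClient(token='my-apify-token')          # placeholder token
run_client = client.run(run_id='my-standby-run-id')   # placeholder run ID

# from_start=False skips the backlog; only logs arriving from now on are redirected.
streamed_log = run_client.get_streamed_log(actor_name='my-standby-actor', from_start=False)

with streamed_log:
    # Interact with the stand-by actor here; new log lines are mirrored into the
    # 'apify.my-standby-actor-<run_id>' logger while this block is active.
    ...
```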

tests/unit/test_logging.py

Lines changed: 14 additions & 6 deletions
@@ -113,16 +113,20 @@ def propagate_stream_logs() -> None:
     logging.getLogger(f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}').setLevel(logging.DEBUG)
 
 
+@pytest.mark.parametrize(('log_from_start', 'expected_log_count'), [(True, 8), (False, 5)])
 @respx.mock
 async def test_redirected_logs_async(
+    *,
     caplog: LogCaptureFixture,
     mock_api_async: None,  # noqa: ARG001, fixture
     propagate_stream_logs: None,  # noqa: ARG001, fixture
+    log_from_start: bool,
+    expected_log_count: int,
 ) -> None:
     """Test that redirected logs are formatted correctly."""
 
     run_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).run(run_id=_MOCKED_RUN_ID)
-    streamed_log = await run_client.get_streamed_log(actor_name=_MOCKED_ACTOR_NAME)
+    streamed_log = await run_client.get_streamed_log(actor_name=_MOCKED_ACTOR_NAME, from_start=log_from_start)
 
     # Set `propagate=True` during the tests, so that caplog can see the logs..
     logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}'

@@ -132,22 +136,26 @@ async def test_redirected_logs_async(
     # Do stuff while the log from the other actor is being redirected to the logs.
     await asyncio.sleep(1)
 
-    assert len(caplog.records) == 8
-    for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS, caplog.records):
+    assert len(caplog.records) == expected_log_count
+    for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS[-expected_log_count:], caplog.records):
         assert expected_message_and_level[0] == record.message
         assert expected_message_and_level[1] == record.levelno
 
 
+@pytest.mark.parametrize(('log_from_start', 'expected_log_count'), [(True, 8), (False, 5)])
 @respx.mock
 def test_redirected_logs_sync(
+    *,
     caplog: LogCaptureFixture,
     mock_api_sync: None,  # noqa: ARG001, fixture
     propagate_stream_logs: None,  # noqa: ARG001, fixture
+    log_from_start: bool,
+    expected_log_count: int,
 ) -> None:
     """Test that redirected logs are formatted correctly."""
 
     run_client = ApifyClient(token='mocked_token', api_url=_MOCKED_API_URL).run(run_id=_MOCKED_RUN_ID)
-    streamed_log = run_client.get_streamed_log(actor_name=_MOCKED_ACTOR_NAME)
+    streamed_log = run_client.get_streamed_log(actor_name=_MOCKED_ACTOR_NAME, from_start=log_from_start)
 
     # Set `propagate=True` during the tests, so that caplog can see the logs..
     logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}'

@@ -156,8 +164,8 @@ def test_redirected_logs_sync(
     # Do stuff while the log from the other actor is being redirected to the logs.
     time.sleep(1)
 
-    assert len(caplog.records) == 8
-    for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS, caplog.records):
+    assert len(caplog.records) == expected_log_count
+    for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS[-expected_log_count:], caplog.records):
         assert expected_message_and_level[0] == record.message
         assert expected_message_and_level[1] == record.levelno
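
For orientation, the parametrized counts follow directly from the skip-first-chunk logic: with `from_start=True` all 8 mocked messages reach `caplog`, while with `from_start=False` the first streamed chunk is dropped, which in this mock apparently carries the 3 oldest messages, leaving 5; the `[-expected_log_count:]` slice then compares only the tail of `_EXPECTED_MESSAGES_AND_LEVELS`. A rough sketch of that arithmetic (the chunk layout is an assumption about the mocked stream, not read from the fixture):

```python
# Assumed layout of the mocked log stream: the first chunk bundles the backlog.
mocked_chunks = [
    ['msg 1', 'msg 2', 'msg 3'],  # historical backlog, sent as one chunk (assumed)
    ['msg 4'], ['msg 5'], ['msg 6'], ['msg 7'], ['msg 8'],
]


def expected_count(*, from_start: bool) -> int:
    # Drop the first chunk when not streaming from the start, then count messages.
    chunks = mocked_chunks if from_start else mocked_chunks[1:]
    return sum(len(chunk) for chunk in chunks)


assert expected_count(from_start=True) == 8
assert expected_count(from_start=False) == 5
```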
