Skip to content

feat: Add StatusMessageRedirector #407

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 25 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
a71ae41
TODO: Figure out hwo to mock response with steram
Pijukatel May 12, 2025
69ff84c
WIP
Pijukatel May 12, 2025
862cacc
Polish spliting of messages and setting the log level
Pijukatel May 13, 2025
753427a
Draft with async implementation and example tests
Pijukatel May 13, 2025
cc0d944
Add `raw=True`
Pijukatel May 14, 2025
cbcabd3
Add chunck processing
Pijukatel May 14, 2025
81577e8
Merge remote-tracking branch 'origin/master' into redirected-actor-logs
Pijukatel May 14, 2025
b9bc44d
Add sync version of the logging.
Pijukatel May 14, 2025
9720327
Finalize, update comments
Pijukatel May 14, 2025
85ead2f
Add `from_start` argument for streaming from stand-by actors
Pijukatel May 15, 2025
4ad39fa
Skip first logs based on datetime of the marker
Pijukatel May 15, 2025
74595f9
Self review.
Pijukatel May 15, 2025
cba571f
Handle bytestream edgecase of chunk containing only half of the multi…
Pijukatel May 15, 2025
02a1eb2
Review comments
Pijukatel May 15, 2025
2674cf2
Remove unnecessary `actor_name` argument
Pijukatel May 16, 2025
2a6f2ec
Update split pattern to deal with multiple times redirected log
Pijukatel May 16, 2025
1263450
Review comment
Pijukatel May 16, 2025
b1338f1
Regenerate `uv.lock` with new version of `uv`
Pijukatel May 16, 2025
669a749
Test data time alignment.
Pijukatel May 16, 2025
737cde9
Add status redirector
Pijukatel May 19, 2025
2914e50
TODO: Finalize tests
Pijukatel May 19, 2025
8fbbffa
Finalize tests.
Pijukatel May 20, 2025
8e70e59
Merge remote-tracking branch 'origin/master' into redirect-status-mes…
Pijukatel May 20, 2025
a3a629e
Update syntax to avoid https://github.com/PyCQA/redbaron/issues/212
Pijukatel May 21, 2025
18f4f51
Update client names in tests to match their type
Pijukatel May 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions src/apify_client/clients/resource_clients/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,8 @@ def call(
waits indefinitely.
logger: Logger used to redirect logs from the Actor run. Using "default" literal means that a predefined
default logger will be used. Setting `None` will disable any log propagation. Passing custom logger
will redirect logs to the provided logger.
will redirect logs to the provided logger. The logger is also used to capture status and status message
of the other Actor run.

Returns:
The run object.
Expand All @@ -336,12 +337,11 @@ def call(
return self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs)

run_client = self.root_client.run(run_id=started_run['id'])

if logger == 'default':
log_context = run_client.get_streamed_log()
else:
log_context = run_client.get_streamed_log(to_logger=logger)
logger = None

with log_context:
with run_client.get_status_message_redirector(to_logger=logger), run_client.get_streamed_log(to_logger=logger):
return self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs)

def build(
Expand Down Expand Up @@ -722,7 +722,8 @@ async def call(
waits indefinitely.
logger: Logger used to redirect logs from the Actor run. Using "default" literal means that a predefined
default logger will be used. Setting `None` will disable any log propagation. Passing custom logger
will redirect logs to the provided logger.
will redirect logs to the provided logger. The logger is also used to capture status and status message
of the other Actor run.

Returns:
The run object.
Expand All @@ -742,12 +743,14 @@ async def call(
return await self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs)

run_client = self.root_client.run(run_id=started_run['id'])

if logger == 'default':
log_context = await run_client.get_streamed_log()
else:
log_context = await run_client.get_streamed_log(to_logger=logger)
logger = None

status_redirector = await run_client.get_status_message_redirector(to_logger=logger)
streamed_log = await run_client.get_streamed_log(to_logger=logger)

async with log_context:
async with status_redirector, streamed_log:
return await self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs)

async def build(
Expand Down
162 changes: 161 additions & 1 deletion src/apify_client/clients/resource_clients/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
import logging
import re
import threading
import time
from asyncio import Task
from contextlib import asynccontextmanager, contextmanager
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from threading import Thread
from typing import TYPE_CHECKING, Any, cast

from apify_shared.utils import ignore_docs
from typing_extensions import Self

from apify_client._errors import ApifyApiError
from apify_client._utils import catch_not_found_or_throw
Expand All @@ -23,6 +25,8 @@
import httpx
from typing_extensions import Self

from apify_client.clients import RunClient, RunClientAsync


class LogClient(ResourceClient):
"""Sub-client for manipulating logs."""
Expand Down Expand Up @@ -378,3 +382,159 @@ async def _stream_log(self) -> None:

# If the stream is finished, then the last part will be also processed.
self._log_buffer_content(include_last_part=True)


class StatusMessageRedirector:
"""Utility class for logging status messages from another Actor run.

Status message is logged at fixed time intervals, and there is no guarantee that all messages will be logged,
especially in cases of frequent status message changes.
"""

_force_propagate = False
# This is final sleep time to try to get the last status and status message of finished Actor run.
# The status and status message can get set on the Actor run with a delay. Sleep time does not guarantee that the
# final message will be captured, but increases the chances of that.
_final_sleep_time_s = 6

def __init__(self, *, to_logger: logging.Logger, check_period: timedelta = timedelta(seconds=5)) -> None:
"""Initialize `StatusMessageRedirector`.

Args:
to_logger: The logger to which the status message will be redirected.
check_period: The period with which the status message will be polled.
"""
self._to_logger = to_logger
self._to_logger.propagate = self._force_propagate
self._check_period = check_period.total_seconds()
self._last_status_message = ''

def _log_run_data(self, run_data: dict[str, Any] | None) -> bool:
"""Get relevant run data, log them if changed and return `True` if more data is expected.

Args:
run_data: The dictionary that contains the run data.

Returns:
`True` if more data is expected, `False` otherwise.
"""
if run_data is not None:
status = run_data.get('status', 'Unknown status')
status_message = run_data.get('statusMessage', '')
new_status_message = f'Status: {status}, Message: {status_message}'

if new_status_message != self._last_status_message:
self._last_status_message = new_status_message
self._to_logger.info(new_status_message)

return not (run_data.get('isStatusMessageTerminal', False))
return True


class StatusMessageRedirectorAsync(StatusMessageRedirector):
"""Async variant of `StatusMessageRedirector` that is logging in task."""

def __init__(
self, *, run_client: RunClientAsync, to_logger: logging.Logger, check_period: timedelta = timedelta(seconds=1)
) -> None:
"""Initialize `StatusMessageRedirectorAsync`.

Args:
run_client: The client for run that will be used to get a status and message.
to_logger: The logger to which the status message will be redirected.
check_period: The period with which the status message will be polled.
"""
super().__init__(to_logger=to_logger, check_period=check_period)
self._run_client = run_client
self._logging_task: Task | None = None

def start(self) -> Task:
"""Start the logging task. The caller has to handle any cleanup by manually calling the `stop` method."""
if self._logging_task:
raise RuntimeError('Logging task already active')
self._logging_task = asyncio.create_task(self._log_changed_status_message())
return self._logging_task

def stop(self) -> None:
"""Stop the logging task."""
if not self._logging_task:
raise RuntimeError('Logging task is not active')

self._logging_task.cancel()
self._logging_task = None

async def __aenter__(self) -> Self:
"""Start the logging task within the context. Exiting the context will cancel the logging task."""
self.start()
return self

async def __aexit__(
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
) -> None:
"""Cancel the logging task."""
await asyncio.sleep(self._final_sleep_time_s)
self.stop()

async def _log_changed_status_message(self) -> None:
while True:
run_data = await self._run_client.get()
if not self._log_run_data(run_data):
break
await asyncio.sleep(self._check_period)


class StatusMessageRedirectorSync(StatusMessageRedirector):
"""Sync variant of `StatusMessageRedirector` that is logging in thread."""

def __init__(
self, *, run_client: RunClient, to_logger: logging.Logger, check_period: timedelta = timedelta(seconds=1)
) -> None:
"""Initialize `StatusMessageRedirectorSync`.

Args:
run_client: The client for run that will be used to get a status and message.
to_logger: The logger to which the status message will be redirected.
check_period: The period with which the status message will be polled.
"""
super().__init__(to_logger=to_logger, check_period=check_period)
self._run_client = run_client
self._logging_thread: Thread | None = None
self._stop_logging = False

def start(self) -> Thread:
"""Start the logging thread. The caller has to handle any cleanup by manually calling the `stop` method."""
if self._logging_thread:
raise RuntimeError('Logging thread already active')
self._stop_logging = False
self._logging_thread = threading.Thread(target=self._log_changed_status_message)
self._logging_thread.start()
return self._logging_thread

def stop(self) -> None:
"""Signal the _logging_thread thread to stop logging and wait for it to finish."""
if not self._logging_thread:
raise RuntimeError('Logging thread is not active')
time.sleep(self._final_sleep_time_s)
self._stop_logging = True
self._logging_thread.join()
self._logging_thread = None
self._stop_logging = False

def __enter__(self) -> Self:
"""Start the logging task within the context. Exiting the context will cancel the logging task."""
self.start()
return self

def __exit__(
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
) -> None:
"""Cancel the logging task."""
self.stop()

def _log_changed_status_message(self) -> None:
while True:
if not self._log_run_data(self._run_client.get()):
break
if self._stop_logging:
break
time.sleep(self._check_period)
62 changes: 62 additions & 0 deletions src/apify_client/clients/resource_clients/run.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from __future__ import annotations

import json
import logging
import random
import string
import time
from datetime import timedelta
from typing import TYPE_CHECKING, Any

from apify_shared.utils import filter_out_none_values_recursively, ignore_docs, parse_date_fields
Expand All @@ -16,6 +18,8 @@
from apify_client.clients.resource_clients.log import (
LogClient,
LogClientAsync,
StatusMessageRedirectorAsync,
StatusMessageRedirectorSync,
StreamedLogAsync,
StreamedLogSync,
)
Expand Down Expand Up @@ -318,6 +322,34 @@ def charge(
),
)

def get_status_message_redirector(
self, to_logger: logging.Logger | None = None, check_period: timedelta = timedelta(seconds=1)
) -> StatusMessageRedirectorSync:
"""Get `StatusMessageRedirector` instance that can be used to redirect logs.

`StatusMessageRedirector` can be directly called or used as a context manager.

Args:
to_logger: `Logger` used for logging the status and status messages. If not provided, a new logger is
created.
check_period: The period with which the status message will be polled.

Returns:
`StatusMessageRedirector` instance for redirected logs.
"""
run_data = self.get()
run_id = run_data.get('id', '') if run_data else ''

actor_id = run_data.get('actId', '') if run_data else ''
actor_data = self.root_client.actor(actor_id=actor_id).get() or {}
actor_name = actor_data.get('name', '') if run_data else ''

if not to_logger:
name = '-'.join(part for part in (actor_name, run_id) if part)
to_logger = create_redirect_logger(f'apify.{name}')

return StatusMessageRedirectorSync(run_client=self, to_logger=to_logger, check_period=check_period)


class RunClientAsync(ActorJobBaseClientAsync):
"""Async sub-client for manipulating a single Actor run."""
Expand Down Expand Up @@ -612,3 +644,33 @@ async def charge(
}
),
)

async def get_status_message_redirector(
self,
to_logger: logging.Logger | None = None,
check_period: timedelta = timedelta(seconds=1),
) -> StatusMessageRedirectorAsync:
"""Get `StatusMessageRedirector` instance that can be used to redirect logs.

`StatusMessageRedirector` can be directly called or used as a context manager.

Args:
to_logger: `Logger` used for logging the status and status messages. If not provided, a new logger is
created.
check_period: The period with which the status message will be polled.

Returns:
`StatusMessageRedirector` instance for redirected logs.
"""
run_data = await self.get()
run_id = run_data.get('id', '') if run_data else ''

actor_id = run_data.get('actId', '') if run_data else ''
actor_data = await self.root_client.actor(actor_id=actor_id).get() or {}
actor_name = actor_data.get('name', '') if run_data else ''

if not to_logger:
name = '-'.join(part for part in (actor_name, run_id) if part)
to_logger = create_redirect_logger(f'apify.{name}')

return StatusMessageRedirectorAsync(run_client=self, to_logger=to_logger, check_period=check_period)
Loading