Skip to content

Commit 350fc67

Browse files
committed
Review comments
1 parent 1e5e976 commit 350fc67

File tree

3 files changed

+20
-15
lines changed

3 files changed

+20
-15
lines changed

src/apify_client/clients/resource_clients/log.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
from typing import TYPE_CHECKING, Any, cast
1313

1414
from apify_shared.utils import ignore_docs
15-
from typing_extensions import Self
1615

1716
from apify_client._errors import ApifyApiError
1817
from apify_client._utils import catch_not_found_or_throw
@@ -354,13 +353,16 @@ def start(self) -> Task:
354353
self._streaming_task = asyncio.create_task(self._stream_log())
355354
return self._streaming_task
356355

357-
def stop(self) -> None:
356+
async def stop(self) -> None:
358357
"""Stop the streaming task."""
359358
if not self._streaming_task:
360359
raise RuntimeError('Streaming task is not active')
361360

362361
self._streaming_task.cancel()
363-
self._streaming_task = None
362+
try:
363+
await self._streaming_task
364+
except asyncio.CancelledError:
365+
self._streaming_task = None
364366

365367
async def __aenter__(self) -> Self:
366368
"""Start the streaming task within the context. Exiting the context will cancel the streaming task."""
@@ -371,7 +373,7 @@ async def __aexit__(
371373
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
372374
) -> None:
373375
"""Cancel the streaming task."""
374-
self.stop()
376+
await self.stop()
375377

376378
async def _stream_log(self) -> None:
377379
async with self._log_client.stream(raw=True) as log_stream:
@@ -456,13 +458,16 @@ def start(self) -> Task:
456458
self._logging_task = asyncio.create_task(self._log_changed_status_message())
457459
return self._logging_task
458460

459-
def stop(self) -> None:
461+
async def stop(self) -> None:
460462
"""Stop the logging task."""
461463
if not self._logging_task:
462464
raise RuntimeError('Logging task is not active')
463465

464466
self._logging_task.cancel()
465-
self._logging_task = None
467+
try:
468+
await self._logging_task
469+
except asyncio.CancelledError:
470+
self._logging_task = None
466471

467472
async def __aenter__(self) -> Self:
468473
"""Start the logging task within the context. Exiting the context will cancel the logging task."""
@@ -474,7 +479,7 @@ async def __aexit__(
474479
) -> None:
475480
"""Cancel the logging task."""
476481
await asyncio.sleep(self._final_sleep_time_s)
477-
self.stop()
482+
await self.stop()
478483

479484
async def _log_changed_status_message(self) -> None:
480485
while True:

src/apify_client/clients/resource_clients/run.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ def log(self) -> LogClient:
262262
def get_streamed_log(self, to_logger: logging.Logger | None = None, *, from_start: bool = True) -> StreamedLogSync:
263263
"""Get `StreamedLog` instance that can be used to redirect logs.
264264
265-
`StreamedLog` can be directly called or used as a context manager.
265+
`StreamedLog` can be explicitly started and stopped or used as a context manager.
266266
267267
Args:
268268
to_logger: `Logger` used for logging the redirected messages. If not provided, a new logger is created
@@ -327,7 +327,7 @@ def get_status_message_watcher(
327327
) -> StatusMessageWatcherSync:
328328
"""Get `StatusMessageWatcher` instance that can be used to redirect status and status messages to logs.
329329
330-
`StatusMessageWatcher` can be directly called or used as a context manager.
330+
`StatusMessageWatcher` can be explicitly started and stopped or used as a context manager.
331331
332332
Args:
333333
to_logger: `Logger` used for logging the status and status messages. If not provided, a new logger is
@@ -586,7 +586,7 @@ async def get_streamed_log(
586586
) -> StreamedLogAsync:
587587
"""Get `StreamedLog` instance that can be used to redirect logs.
588588
589-
`StreamedLog` can be directly called or used as a context manager.
589+
`StreamedLog` can be explicitly started and stopped or used as a context manager.
590590
591591
Args:
592592
to_logger: `Logger` used for logging the redirected messages. If not provided, a new logger is created
@@ -652,7 +652,7 @@ async def get_status_message_watcher(
652652
) -> StatusMessageWatcherAsync:
653653
"""Get `StatusMessageWatcher` instance that can be used to redirect status and status messages to logs.
654654
655-
`StatusMessageWatcher` can be directly called or used as a context manager.
655+
`StatusMessageWatcher` can be explicitly started and stopped or used as a context manager.
656656
657657
Args:
658658
to_logger: `Logger` used for logging the status and status messages. If not provided, a new logger is

tests/unit/test_logging.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import json
33
import logging
44
import time
5-
from collections.abc import AsyncIterator, Iterator
5+
from collections.abc import AsyncIterator, Generator, Iterator
66
from datetime import datetime, timedelta
77
from unittest.mock import patch
88

@@ -65,7 +65,7 @@
6565

6666
@pytest.fixture
6767
def mock_api() -> None:
68-
def create_status_responses_generator() -> Iterator[httpx.Response]:
68+
def get_responses() -> Generator[httpx.Response, None, None]:
6969
"""Simulate actor run that changes status 3 times."""
7070
for _ in range(5):
7171
yield httpx.Response(
@@ -113,11 +113,11 @@ def create_status_responses_generator() -> Iterator[httpx.Response]:
113113
status_code=200,
114114
)
115115

116-
response_generator = create_status_responses_generator()
116+
responses = get_responses()
117117

118118
def actor_runs_side_effect(_: httpx.Request) -> httpx.Response:
119119
time.sleep(0.1)
120-
return next(response_generator)
120+
return next(responses)
121121

122122
respx.get(url=f'{_MOCKED_API_URL}/v2/actor-runs/{_MOCKED_RUN_ID}').mock(side_effect=actor_runs_side_effect)
123123

0 commit comments

Comments
 (0)