Commit 1eb1c19
fix: Fix BasicCrawler statistics persistence (#1490)
### Description

- Ensure that `BasicCrawler` persists statistics by default.
- Ensure that `BasicCrawler` recovers existing statistics by default when `Configuration.purge_on_start` is False.
- Let the `BasicCrawler` emit `Event.PERSIST_STATE` when finishing.

### Issues

- Closes: #1501

### Testing

- Added a unit test.
- Tested at the SDK level: apify/apify-sdk-python#629
1 parent 01f8460 commit 1eb1c19
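For context, a minimal usage sketch of what this enables (adapted from the new unit test further below; the URLs are placeholders and the import paths are assumptions about the current crawlee layout): with `purge_on_start=False`, a later run picks up the statistics persisted by an earlier one instead of starting from zero.

```python
# Hedged sketch, not part of the commit: run a BasicCrawler with persistence-friendly
# settings so a second run can recover the statistics from the first one.
import asyncio

from crawlee import service_locator
from crawlee.configuration import Configuration
from crawlee.crawlers import BasicCrawler, BasicCrawlingContext


async def main() -> None:
    # Keep existing storages so previously persisted statistics can be recovered.
    service_locator.set_configuration(Configuration(purge_on_start=False))

    crawler = BasicCrawler()

    @crawler.router.default_handler
    async def handler(context: BasicCrawlingContext) -> None:
        context.log.info(f'Processing {context.request.url} ...')

    await crawler.run(['https://a.placeholder.com'])

    # With this commit, crawler_runtime accumulates across runs instead of
    # covering only the current instance.
    print(crawler.statistics.state.crawler_runtime)


asyncio.run(main())
```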

File tree

5 files changed (+136, -23 lines)

src/crawlee/_utils/recurring_task.py

Lines changed: 15 additions & 0 deletions
@@ -7,6 +7,9 @@
 if TYPE_CHECKING:
     from collections.abc import Callable
     from datetime import timedelta
+    from types import TracebackType
+
+    from typing_extensions import Self

 logger = getLogger(__name__)

@@ -26,6 +29,18 @@ def __init__(self, func: Callable, delay: timedelta) -> None:
         self.delay = delay
         self.task: asyncio.Task | None = None

+    async def __aenter__(self) -> Self:
+        self.start()
+        return self
+
+    async def __aexit__(
+        self,
+        exc_type: type[BaseException] | None,
+        exc_value: BaseException | None,
+        exc_traceback: TracebackType | None,
+    ) -> None:
+        await self.stop()
+
     async def _wrapper(self) -> None:
         """Continuously execute the provided function with the specified delay.
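These dunder methods let the recurring task be driven as an async context manager, which is how `BasicCrawler` now hands `_crawler_state_rec_task` to its exit stack (see the next file). A standalone sketch, assuming the class name `RecurringTask`, the shown import path, and that it accepts a plain callable:

```python
# Hedged illustration only: entering the context starts the periodic task,
# exiting it awaits stop(), so the task lives exactly as long as the block.
import asyncio
from datetime import timedelta

from crawlee._utils.recurring_task import RecurringTask


async def main() -> None:
    def heartbeat() -> None:
        print('tick')

    async with RecurringTask(heartbeat, delay=timedelta(seconds=1)):
        await asyncio.sleep(3)


asyncio.run(main())
```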

src/crawlee/crawlers/_basic/_basic_crawler.py

Lines changed: 22 additions & 12 deletions
@@ -56,7 +56,7 @@
     SessionError,
     UserDefinedErrorHandlerError,
 )
-from crawlee.events._types import Event, EventCrawlerStatusData
+from crawlee.events._types import Event, EventCrawlerStatusData, EventPersistStateData
 from crawlee.http_clients import ImpitHttpClient
 from crawlee.router import Router
 from crawlee.sessions import SessionPool
@@ -437,14 +437,23 @@ def __init__(
         self._statistics_log_format = statistics_log_format

         # Statistics
-        self._statistics = statistics or cast(
-            'Statistics[TStatisticsState]',
-            Statistics.with_default_state(
-                periodic_message_logger=self._logger,
-                statistics_log_format=self._statistics_log_format,
-                log_message='Current request statistics:',
-            ),
-        )
+        if statistics:
+            self._statistics = statistics
+        else:
+
+            async def persist_state_factory() -> KeyValueStore:
+                return await self.get_key_value_store()
+
+            self._statistics = cast(
+                'Statistics[TStatisticsState]',
+                Statistics.with_default_state(
+                    persistence_enabled=True,
+                    periodic_message_logger=self._logger,
+                    statistics_log_format=self._statistics_log_format,
+                    log_message='Current request statistics:',
+                    persist_state_kvs_factory=persist_state_factory,
+                ),
+            )

         # Additional context managers to enter and exit
         self._additional_context_managers = _additional_context_managers or []
@@ -689,7 +698,6 @@ def sigint_handler() -> None:
         except CancelledError:
             pass
         finally:
-            await self._crawler_state_rec_task.stop()
             if threading.current_thread() is threading.main_thread():
                 with suppress(NotImplementedError):
                     asyncio.get_running_loop().remove_signal_handler(signal.SIGINT)
@@ -721,8 +729,6 @@ def sigint_handler() -> None:
     async def _run_crawler(self) -> None:
         event_manager = self._service_locator.get_event_manager()

-        self._crawler_state_rec_task.start()
-
         # Collect the context managers to be entered. Context managers that are already active are excluded,
         # as they were likely entered by the caller, who will also be responsible for exiting them.
         contexts_to_enter = [
@@ -733,6 +739,7 @@ async def _run_crawler(self) -> None:
                 self._statistics,
                 self._session_pool if self._use_session_pool else None,
                 self._http_client,
+                self._crawler_state_rec_task,
                 *self._additional_context_managers,
             )
             if cm and getattr(cm, 'active', False) is False
@@ -744,6 +751,9 @@ async def _run_crawler(self) -> None:

             await self._autoscaled_pool.run()

+        # Emit PERSIST_STATE event when crawler is finishing to allow listeners to persist their state if needed
+        event_manager.emit(event=Event.PERSIST_STATE, event_data=EventPersistStateData(is_migrating=False))
+
     async def add_requests(
         self,
         requests: Sequence[str | Request],
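Passing an explicit `Statistics` instance still takes precedence over the new persistent default, as the `if statistics:` branch above shows. A short sketch of that opt-out path (the persistence flag here is chosen purely for illustration):

```python
# Hedged sketch: an explicitly supplied Statistics instance bypasses the new
# persistent default, so callers keep full control over persistence.
from crawlee.crawlers import BasicCrawler
from crawlee.statistics import Statistics

# Persistence disabled again, as it effectively was before this commit.
crawler = BasicCrawler(statistics=Statistics.with_default_state(persistence_enabled=False))
```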

src/crawlee/statistics/_statistics.py

Lines changed: 9 additions & 5 deletions
@@ -96,7 +96,7 @@ def __init__(

         self._state = RecoverableState(
             default_state=state_model(stats_id=self._id),
-            persist_state_key=persist_state_key or f'SDK_CRAWLER_STATISTICS_{self._id}',
+            persist_state_key=persist_state_key or f'__CRAWLER_STATISTICS_{self._id}',
             persistence_enabled=persistence_enabled,
             persist_state_kvs_name=persist_state_kvs_name,
             persist_state_kvs_factory=persist_state_kvs_factory,
@@ -130,6 +130,7 @@ def with_default_state(
         persistence_enabled: bool = False,
         persist_state_kvs_name: str | None = None,
         persist_state_key: str | None = None,
+        persist_state_kvs_factory: Callable[[], Coroutine[None, None, KeyValueStore]] | None = None,
         log_message: str = 'Statistics',
         periodic_message_logger: Logger | None = None,
         log_interval: timedelta = timedelta(minutes=1),
@@ -141,6 +142,7 @@ def with_default_state(
            persistence_enabled=persistence_enabled,
            persist_state_kvs_name=persist_state_kvs_name,
            persist_state_key=persist_state_key,
+           persist_state_kvs_factory=persist_state_kvs_factory,
            log_message=log_message,
            periodic_message_logger=periodic_message_logger,
            log_interval=log_interval,
@@ -187,7 +189,10 @@ async def __aexit__(
         if not self._active:
             raise RuntimeError(f'The {self.__class__.__name__} is not active.')

-        self._state.current_value.crawler_finished_at = datetime.now(timezone.utc)
+        if not self.state.crawler_last_started_at:
+            raise RuntimeError('Statistics.state.crawler_last_started_at not set.')
+        self.state.crawler_finished_at = datetime.now(timezone.utc)
+        self.state.crawler_runtime += self.state.crawler_finished_at - self.state.crawler_last_started_at

         await self._state.teardown()

@@ -255,8 +260,7 @@ def calculate(self) -> FinalStatistics:
         if self._instance_start is None:
             raise RuntimeError('The Statistics object is not initialized')

-        crawler_runtime = datetime.now(timezone.utc) - self._instance_start
-        total_minutes = crawler_runtime.total_seconds() / 60
+        total_minutes = self.state.crawler_runtime.total_seconds() / 60
         state = self._state.current_value
         serialized_state = state.model_dump(by_alias=False)

@@ -267,7 +271,7 @@ def calculate(self) -> FinalStatistics:
             requests_failed_per_minute=math.floor(state.requests_failed / total_minutes) if total_minutes else 0,
             request_total_duration=state.request_total_finished_duration + state.request_total_failed_duration,
             requests_total=state.requests_failed + state.requests_finished,
-            crawler_runtime=crawler_runtime,
+            crawler_runtime=state.crawler_runtime,
             requests_finished=state.requests_finished,
             requests_failed=state.requests_failed,
             retry_histogram=serialized_state['request_retry_histogram'],
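With this change, `crawler_runtime` sums the spans of all runs recovered from the persisted state, and `calculate()` reports that total rather than the wall time of the current instance. A simplified standalone illustration of the bookkeeping (not the library code itself):

```python
# Hedged, self-contained sketch of the accumulation logic: each run adds its own
# started-to-finished span to a persisted running total.
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone


@dataclass
class _State:
    crawler_last_started_at: datetime | None = None
    crawler_finished_at: datetime | None = None
    crawler_runtime: timedelta = field(default_factory=timedelta)


def finish_run(state: _State) -> None:
    if not state.crawler_last_started_at:
        raise RuntimeError('crawler_last_started_at not set')
    state.crawler_finished_at = datetime.now(timezone.utc)
    # The total keeps growing across restarts as long as the state is recovered.
    state.crawler_runtime += state.crawler_finished_at - state.crawler_last_started_at
```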

tests/unit/crawlers/_basic/test_basic_crawler.py

Lines changed: 58 additions & 0 deletions
@@ -2,6 +2,7 @@
 from __future__ import annotations

 import asyncio
+import concurrent
 import json
 import logging
 import os
@@ -1643,3 +1644,60 @@ async def handler(context: BasicCrawlingContext) -> None:

     # Crawler should not fall back to the default storage after the purge
     assert await unrelated_rq.fetch_next_request() == unrelated_request
+
+
+async def _run_crawler(requests: list[str], storage_dir: str) -> StatisticsState:
+    """Run crawler and return its statistics state.
+
+    Must be defined like this to be picklable for ProcessPoolExecutor."""
+    service_locator.set_configuration(
+        Configuration(
+            crawlee_storage_dir=storage_dir,  # type: ignore[call-arg]
+            purge_on_start=False,
+        )
+    )
+
+    async def request_handler(context: BasicCrawlingContext) -> None:
+        context.log.info(f'Processing {context.request.url} ...')
+
+    crawler = BasicCrawler(
+        request_handler=request_handler,
+        concurrency_settings=ConcurrencySettings(max_concurrency=1, desired_concurrency=1),
+    )
+
+    await crawler.run(requests)
+    return crawler.statistics.state
+
+
+def _process_run_crawler(requests: list[str], storage_dir: str) -> StatisticsState:
+    return asyncio.run(_run_crawler(requests=requests, storage_dir=storage_dir))
+
+
+async def test_crawler_statistics_persistence(tmp_path: Path) -> None:
+    """Test that crawler statistics persist and are loaded correctly.
+
+    This test simulates starting the crawler process twice and checks that the statistics include the first run."""
+
+    with concurrent.futures.ProcessPoolExecutor() as executor:
+        # Crawl 2 requests in the first run and automatically persist the state.
+        first_run_state = executor.submit(
+            _process_run_crawler,
+            requests=['https://a.placeholder.com', 'https://b.placeholder.com'],
+            storage_dir=str(tmp_path),
+        ).result()
+        assert first_run_state.requests_finished == 2
+
+    # Do not reuse the executor, to simulate a fresh process and avoid modified class attributes.
+    with concurrent.futures.ProcessPoolExecutor() as executor:
+        # Crawl 1 additional request in the second run, reusing the previously (automatically) persisted state.
+        second_run_state = executor.submit(
+            _process_run_crawler, requests=['https://c.placeholder.com'], storage_dir=str(tmp_path)
+        ).result()
+        assert second_run_state.requests_finished == 3
+
+    assert first_run_state.crawler_started_at == second_run_state.crawler_started_at
+    assert first_run_state.crawler_finished_at
+    assert second_run_state.crawler_finished_at
+
+    assert first_run_state.crawler_finished_at < second_run_state.crawler_finished_at
+    assert first_run_state.crawler_runtime < second_run_state.crawler_runtime
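The test leans on a general Python pattern: work submitted to a `ProcessPoolExecutor` must be a picklable module-level function, and async code is wrapped in `asyncio.run()` inside the worker process. A minimal standalone sketch of that pattern (names and values here are illustrative only):

```python
# Hedged sketch of the ProcessPoolExecutor + asyncio.run pattern used above.
import asyncio
import concurrent.futures


async def _work(n: int) -> int:
    await asyncio.sleep(0)  # stand-in for real async work
    return n * 2


def _run_in_process(n: int) -> int:
    # Module-level wrapper so it can be pickled and shipped to the worker process.
    return asyncio.run(_work(n))


if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as executor:
        assert executor.submit(_run_in_process, 21).result() == 42
```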

tests/unit/test_configuration.py

Lines changed: 32 additions & 6 deletions
@@ -8,6 +8,7 @@
 from crawlee import service_locator
 from crawlee.configuration import Configuration
 from crawlee.crawlers import HttpCrawler, HttpCrawlingContext
+from crawlee.statistics import Statistics
 from crawlee.storage_clients import MemoryStorageClient
 from crawlee.storage_clients._file_system._storage_client import FileSystemStorageClient

@@ -35,15 +36,40 @@ def test_global_configuration_works_reversed() -> None:
     )


-async def test_storage_not_persisted_when_disabled(tmp_path: Path, server_url: URL) -> None:
+async def test_storage_not_persisted_when_non_persistable_storage_used(tmp_path: Path, server_url: URL) -> None:
+    """Make the Crawler use MemoryStorageClient which can't persist state."""
+    service_locator.set_configuration(
+        Configuration(
+            crawlee_storage_dir=str(tmp_path),  # type: ignore[call-arg]
+        )
+    )
+    crawler = HttpCrawler(storage_client=MemoryStorageClient())
+
+    @crawler.router.default_handler
+    async def default_handler(context: HttpCrawlingContext) -> None:
+        await context.push_data({'url': context.request.url})
+
+    await crawler.run([str(server_url)])
+
+    # Verify that no files were created in the storage directory.
+    content = list(tmp_path.iterdir())
+    assert content == [], 'Expected the storage directory to be empty, but it is not.'
+
+
+async def test_storage_persisted_with_explicit_statistics_with_persistable_storage(
+    tmp_path: Path, server_url: URL
+) -> None:
+    """Make the Crawler use MemoryStorageClient which can't persist state,
+    but pass explicit statistics to it which will use the global FileSystemStorageClient() that can persist state."""
+
     configuration = Configuration(
         crawlee_storage_dir=str(tmp_path),  # type: ignore[call-arg]
     )
-    storage_client = MemoryStorageClient()
+    service_locator.set_configuration(configuration)
+    service_locator.set_storage_client(FileSystemStorageClient())

     crawler = HttpCrawler(
-        configuration=configuration,
-        storage_client=storage_client,
+        storage_client=MemoryStorageClient(), statistics=Statistics.with_default_state(persistence_enabled=True)
     )

     @crawler.router.default_handler
@@ -52,9 +78,9 @@ async def default_handler(context: HttpCrawlingContext) -> None:

     await crawler.run([str(server_url)])

-    # Verify that no files were created in the storage directory.
+    # Verify that files were created in the storage directory.
     content = list(tmp_path.iterdir())
-    assert content == [], 'Expected the storage directory to be empty, but it is not.'
+    assert content != [], 'Expected the storage directory to contain files, but it does not.'


 async def test_storage_persisted_when_enabled(tmp_path: Path, server_url: URL) -> None:
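Condensed, the setup these two tests contrast looks roughly like the sketch below (the storage path is a placeholder): the crawler's own storage client stays in memory, while explicitly passed statistics persist through the global `FileSystemStorageClient`.

```python
# Hedged sketch of the contrasted setup: in-memory crawler storage, but
# statistics persisted via the globally registered file-system storage client.
from crawlee import service_locator
from crawlee.configuration import Configuration
from crawlee.crawlers import HttpCrawler
from crawlee.statistics import Statistics
from crawlee.storage_clients import MemoryStorageClient
from crawlee.storage_clients._file_system._storage_client import FileSystemStorageClient

service_locator.set_configuration(Configuration(crawlee_storage_dir='./storage'))  # type: ignore[call-arg]
service_locator.set_storage_client(FileSystemStorageClient())

crawler = HttpCrawler(
    storage_client=MemoryStorageClient(),  # request queue and datasets stay in memory
    statistics=Statistics.with_default_state(persistence_enabled=True),  # stats land on disk
)
```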
