Skip to content

Commit cd15dce

Browse files
authored
fix: Fall back to drop+recreate when RequestQueue.purge is unsupported (#1883)
## Summary Some storage clients do not support purging the request queue — most notably the Apify platform client, which raises `NotImplementedError`. Since `BasicCrawler` started calling `request_manager.purge()` automatically on repeated runs (PR #1762), this surfaced as a regression on the Apify platform: a previously working pattern now blew up. `RequestQueue.purge()` now catches `NotImplementedError`, logs a warning, drops the underlying queue, and re-opens it via `RequestQueue.open()`, swapping the new client/id into the existing instance so callers like `BasicCrawler` keep working transparently. This restores the pre-#1762 behavior on the platform (drop the default queue, let the platform recreate it) without exposing the fallback to callers. The new `test_purge_falls_back_to_drop_when_not_implemented` covers the path against all four storage backends (memory, file_system, sql, redis).
1 parent 932a3c6 commit cd15dce

2 files changed

Lines changed: 110 additions & 1 deletion

File tree

src/crawlee/storages/_request_queue.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,28 @@ async def drop(self) -> None:
150150

151151
@override
152152
async def purge(self) -> None:
153-
await self._client.purge()
153+
try:
154+
await self._client.purge()
155+
except NotImplementedError:
156+
client_name = type(self._client).__name__
157+
if self._name is not None:
158+
logger.warning(
159+
f'Storage client "{client_name}" does not support purging the request queue. '
160+
f'Skipping purge for named queue "{self._name}" to avoid destroying persistent data; '
161+
'the queue contents are left intact.'
162+
)
163+
return
164+
logger.warning(
165+
f'Storage client "{client_name}" does not support purging the request queue. '
166+
'Falling back to dropping and recreating the unnamed queue; the request queue ID may change.'
167+
)
168+
await self.drop()
169+
# Override `purge_on_start` so the storage client does not try to purge the freshly recreated
170+
# (and necessarily empty) queue and re-raise the same `NotImplementedError`.
171+
recreate_config = service_locator.get_configuration().model_copy(update={'purge_on_start': False})
172+
new_rq = await RequestQueue.open(configuration=recreate_config)
173+
self._client = new_rq._client # noqa: SLF001
174+
self._id = new_rq._id # noqa: SLF001
154175

155176
@override
156177
async def add_request(

tests/unit/storages/test_request_queue.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -715,6 +715,94 @@ async def test_purge(
715715
await rq.drop()
716716

717717

718+
async def test_purge_falls_back_to_drop_for_unnamed_queue_when_not_implemented(
719+
storage_client: StorageClient,
720+
caplog: pytest.LogCaptureFixture,
721+
monkeypatch: pytest.MonkeyPatch,
722+
) -> None:
723+
"""Test that `purge` falls back to drop+recreate for unnamed queues when the client raises `NotImplementedError`.
724+
725+
Some storage clients (e.g. the Apify platform client) do not support purging. For the default unnamed queue
726+
used by `BasicCrawler`, `purge` should drop and recreate the queue so that callers keep working on repeated
727+
runs. Named queues are handled separately to avoid destroying persistent data.
728+
"""
729+
rq = await RequestQueue.open(storage_client=storage_client)
730+
assert rq.name is None
731+
732+
await rq.add_requests(['https://example.com/1', 'https://example.com/2'])
733+
metadata = await rq.get_metadata()
734+
assert metadata.pending_request_count == 2
735+
736+
async def _raise_not_implemented(self: object) -> None:
737+
raise NotImplementedError('Purge is not supported.')
738+
739+
monkeypatch.setattr(type(rq._client), 'purge', _raise_not_implemented)
740+
741+
with caplog.at_level('WARNING'):
742+
await rq.purge()
743+
744+
assert any(
745+
'does not support purging' in rec.message and 'dropping and recreating' in rec.message for rec in caplog.records
746+
)
747+
748+
# The queue should be empty, usable, and backed by a fresh client (id may differ for backends that mint new ids).
749+
metadata = await rq.get_metadata()
750+
assert metadata.pending_request_count == 0
751+
assert metadata.total_request_count == 0
752+
assert metadata.handled_request_count == 0
753+
assert rq.id is not None
754+
755+
await rq.add_request('https://example.com/after-purge')
756+
request = await rq.fetch_next_request()
757+
assert request is not None
758+
assert request.url == 'https://example.com/after-purge'
759+
760+
await rq.drop()
761+
762+
763+
async def test_purge_skips_named_queue_when_not_implemented(
764+
storage_client: StorageClient,
765+
caplog: pytest.LogCaptureFixture,
766+
monkeypatch: pytest.MonkeyPatch,
767+
) -> None:
768+
"""Test that `purge` is a logged no-op for named queues when the client raises `NotImplementedError`.
769+
770+
Named queues are considered persistent (e.g. shared across runs on the Apify platform), so falling back
771+
to drop+recreate would silently destroy user data. Instead `purge` logs a warning and leaves the queue
772+
intact.
773+
"""
774+
rq = await RequestQueue.open(
775+
name='purge-fallback-named-test',
776+
storage_client=storage_client,
777+
)
778+
original_id = rq.id
779+
780+
await rq.add_requests(['https://example.com/1', 'https://example.com/2'])
781+
metadata = await rq.get_metadata()
782+
assert metadata.pending_request_count == 2
783+
784+
async def _raise_not_implemented(self: object) -> None:
785+
raise NotImplementedError('Purge is not supported.')
786+
787+
monkeypatch.setattr(type(rq._client), 'purge', _raise_not_implemented)
788+
789+
with caplog.at_level('WARNING'):
790+
await rq.purge()
791+
792+
assert any(
793+
'does not support purging' in rec.message and 'Skipping purge for named queue' in rec.message
794+
for rec in caplog.records
795+
)
796+
797+
# Queue identity and contents must be preserved.
798+
assert rq.id == original_id
799+
metadata = await rq.get_metadata()
800+
assert metadata.pending_request_count == 2
801+
assert metadata.total_request_count == 2
802+
803+
await rq.drop()
804+
805+
718806
async def test_open_with_alias(
719807
storage_client: StorageClient,
720808
) -> None:

0 commit comments

Comments
 (0)