Skip to content

Commit dc1073a

Browse files
authored
fix: Preserve forefront flag on RequestQueue retry path (#1861)
### Description

`RequestQueue._process_batch` accepts a `forefront` flag and forwards it on the first attempt to `add_batch_of_requests`. When the storage client returned unprocessed requests, the recursive retry call omitted `forefront`, so it fell back to the parameter default `False` regardless of what the original caller asked for. A user calling `add_requests(..., forefront=True)` would silently see requests appended to the back of the queue whenever they happened to be in an `unprocessed_requests` response, breaking the priority-ordering guarantee.

### Fix

Forward `forefront=forefront` in the recursive `_process_batch` call.

### Test

Added a regression test (parametrized over `True` and `False`) that mocks `add_batch_of_requests` to return all requests as unprocessed on the first call and asserts the retry uses the same `forefront` value as the original call.
1 parent 1762e58 commit dc1073a

2 files changed

Lines changed: 58 additions & 2 deletions

File tree

src/crawlee/storages/_request_queue.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,12 @@ async def _process_batch(
348348
unprocessed_requests_unique_keys = {request.unique_key for request in response.unprocessed_requests}
349349
retry_batch = [request for request in batch if request.unique_key in unprocessed_requests_unique_keys]
350350
await asyncio.sleep((base_retry_wait * attempt).total_seconds())
351-
await self._process_batch(retry_batch, base_retry_wait=base_retry_wait, attempt=attempt + 1)
351+
await self._process_batch(
352+
retry_batch,
353+
base_retry_wait=base_retry_wait,
354+
attempt=attempt + 1,
355+
forefront=forefront,
356+
)
352357

353358
request_count = len(batch) - len(response.unprocessed_requests)
354359

tests/unit/storages/test_request_queue.py

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,12 @@
99
from crawlee import Request, service_locator
1010
from crawlee.configuration import Configuration
1111
from crawlee.storage_clients import MemoryStorageClient, StorageClient
12+
from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, UnprocessedRequest
1213
from crawlee.storages import RequestQueue
1314
from crawlee.storages._storage_instance_manager import StorageInstanceManager
1415

1516
if TYPE_CHECKING:
16-
from collections.abc import AsyncGenerator
17+
from collections.abc import AsyncGenerator, Sequence
1718

1819
from crawlee.storage_clients import StorageClient
1920

@@ -259,6 +260,56 @@ async def test_add_requests_with_forefront(rq: RequestQueue) -> None:
259260
assert next_request.url == 'https://example.com/priority'
260261

261262

263+
@pytest.mark.parametrize('forefront', [True, False])
264+
async def test_add_requests_retry_preserves_forefront(
265+
monkeypatch: pytest.MonkeyPatch,
266+
*,
267+
forefront: bool,
268+
) -> None:
269+
"""Regression test: when `add_batch_of_requests` returns unprocessed requests, the retry must preserve the
270+
original `forefront` value rather than silently falling back to the parameter default."""
271+
rq = await RequestQueue.open(storage_client=MemoryStorageClient())
272+
forefront_calls: list[bool] = []  # `forefront` value seen by each call to the patched client method, in order
273+
274+
async def patched_add_batch(
275+
requests: Sequence[Request],
276+
*,
277+
forefront: bool = False,
278+
) -> AddRequestsResponse:
279+
forefront_calls.append(forefront)  # record before responding so both the first call and the retry are captured
280+
if len(forefront_calls) == 1:  # first call only: report every request as unprocessed
281+
return AddRequestsResponse(
282+
processed_requests=[],
283+
unprocessed_requests=[UnprocessedRequest(unique_key=r.unique_key, url=r.url) for r in requests],
284+
)
285+
return AddRequestsResponse(  # any later call: report every request as newly processed
286+
processed_requests=[
287+
ProcessedRequest(
288+
unique_key=r.unique_key,
289+
was_already_present=False,
290+
was_already_handled=False,
291+
)
292+
for r in requests
293+
],
294+
unprocessed_requests=[],
295+
)
296+
297+
monkeypatch.setattr(rq._client, 'add_batch_of_requests', patched_add_batch)  # swap in the recording stub
298+
299+
try:
300+
await rq.add_requests(
301+
['https://example.com/a', 'https://example.com/b'],
302+
forefront=forefront,
303+
wait_time_between_batches=timedelta(seconds=0),
304+
)
305+
finally:  # drop the queue even when `add_requests` raises
306+
await rq.drop()
307+
308+
assert forefront_calls == [forefront, forefront], (  # exactly two calls expected: the original and one retry
309+
f'retry must propagate the original forefront={forefront} flag, got: {forefront_calls}'
310+
)
311+
312+
262313
async def test_add_requests_mixed_forefront(rq: RequestQueue) -> None:
263314
"""Test the ordering when adding requests with mixed forefront values."""
264315
# Add normal requests

0 commit comments

Comments
 (0)