fix: Add exponential backoff to batchAddRequests, untie batch and client settings #243

Merged · 8 commits · May 12, 2022
CHANGELOG.md (3 additions & 0 deletions)
@@ -1,3 +1,6 @@
+2.4.0 / 2022/05/11
+- Add exponential backoff to `batchAddRequests`
+- Add option `minDelayBetweenUnprocessedRequestsRetriesMillis` to `batchAddRequests`
 2.3.1 / 2022/04/28
 ===================
 - Fix: Retries in batch requests insert endpoint
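For context, a minimal usage sketch of the new option. The token, queue ID, and request values below are placeholders; only `batchAddRequests` and the two retry options come from this PR.

```ts
import { ApifyClient } from 'apify-client';

const client = new ApifyClient({ token: 'MY-APIFY-TOKEN' }); // placeholder token
const queueClient = client.requestQueue('MY-QUEUE-ID'); // placeholder queue ID

const { processedRequests, unprocessedRequests } = await queueClient.batchAddRequests(
    [{ url: 'https://example.com', uniqueKey: 'https://example.com' }],
    {
        // Retry requests the API reports as unprocessed up to 3 times (new default is 0).
        maxUnprocessedRequestsRetries: 3,
        // First backoff delay is drawn from [1000, 2000) ms and doubles with each retry.
        minDelayBetweenUnprocessedRequestsRetriesMillis: 1000,
    },
);
console.log(`processed: ${processedRequests.length}, unprocessed: ${unprocessedRequests.length}`);
```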
package.json (1 addition & 1 deletion)
@@ -1,6 +1,6 @@
 {
   "name": "apify-client",
-  "version": "2.3.1",
+  "version": "2.4.0",
   "description": "Apify API client for JavaScript",
   "main": "dist/index.js",
   "module": "dist/index.mjs",
src/resource_clients/request_queue.ts (26 additions & 17 deletions)
@@ -16,6 +16,7 @@ import {
 const MAX_REQUESTS_PER_BATCH_OPERATION = 25;
 // The number of 50 parallel requests seemed optimal, if it was higher it did not seem to bring any extra value.
 const DEFAULT_PARALLEL_BATCH_ADD_REQUESTS = 50;
+const DEFAULT_MIN_DELAY_BETWEEN_UNPROCESSED_REQUESTS_RETRIES_MILLIS = 500;
 
 /**
  * @hideconstructor
@@ -145,8 +146,9 @@ export class RequestQueueClient extends ResourceClient {
         options: RequestQueueClientBatchAddRequestWithRetriesOptions = {},
     ): Promise<RequestQueueClientBatchRequestsOperationResult> {
         const {
-            maxUnprocessedRequestsRetries = this.httpClient.maxRetries,
             forefront,
+            maxUnprocessedRequestsRetries = 0,
+            minDelayBetweenUnprocessedRequestsRetriesMillis = DEFAULT_MIN_DELAY_BETWEEN_UNPROCESSED_REQUESTS_RETRIES_MILLIS,
         } = options;
         // Keep track of the requests that remain to be processed (in parameter format)
         let remainingRequests = requests;
@@ -190,9 +192,9 @@
                 break;
             }
 
-            // Sleep for some time before trying again
-            // TODO: Do not use delay from client, but rather a separate value
-            await new Promise((resolve) => setTimeout(resolve, this.httpClient.minDelayBetweenRetriesMillis));
+            // Exponential backoff
+            const delayMillis = Math.floor((1 + Math.random()) * (2 ** i) * minDelayBetweenUnprocessedRequestsRetriesMillis);
+            await new Promise((resolve) => setTimeout(resolve, delayMillis));
         }
 
         const result = { processedRequests, unprocessedRequests } as unknown as JsonObject;
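The new delay is jittered and doubles with each attempt. A standalone sketch of the ranges the formula above produces, assuming the default 500 ms minimum delay:

```ts
// Delay ranges produced by Math.floor((1 + Math.random()) * (2 ** i) * minDelay)
// with the default minimum delay of 500 ms.
const minDelayMillis = 500;
for (let attempt = 0; attempt < 4; attempt++) {
    // (1 + Math.random()) adds 0-100% jitter on top of the 2 ** attempt base.
    const base = (2 ** attempt) * minDelayMillis;
    console.log(`attempt ${attempt}: ${base} to ${2 * base} ms`);
}
// attempt 0: 500 to 1000 ms
// attempt 1: 1000 to 2000 ms
// attempt 2: 2000 to 4000 ms
// attempt 3: 4000 to 8000 ms
```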
@@ -213,30 +215,36 @@
     ): Promise<RequestQueueClientBatchRequestsOperationResult> {
         const {
             forefront,
-            maxUnprocessedRequestsRetries = this.httpClient.maxRetries,
+            maxUnprocessedRequestsRetries = 0,
             maxParallel = DEFAULT_PARALLEL_BATCH_ADD_REQUESTS,
+            minDelayBetweenUnprocessedRequestsRetriesMillis = DEFAULT_MIN_DELAY_BETWEEN_UNPROCESSED_REQUESTS_RETRIES_MILLIS,
         } = options;
         ow(requests, ow.array.ofType(ow.object.partialShape({
             id: ow.undefined,
         })).minLength(1));
         ow(forefront, ow.optional.boolean);
         ow(maxUnprocessedRequestsRetries, ow.optional.number);
         ow(maxParallel, ow.optional.number);
+        ow(minDelayBetweenUnprocessedRequestsRetriesMillis, ow.optional.number);
 
-        const operationsInProgress = [];
-        const individualResults = [];
+        const executingRequests = new Set();
+        const individualResults: RequestQueueClientBatchRequestsOperationResult[] = [];
 
-        // Send up to `maxParallelRequests` requests at once, wait for all of them to finish and repeat
-        for (let i = 0; i < requests.length; i += 25) {
-            const requestsInBatch = requests.slice(i, i + 25);
-            operationsInProgress.push(this._batchAddRequestsWithRetries(requestsInBatch, options));
-            if (operationsInProgress.length === maxParallel) {
-                individualResults.push(...(await Promise.all(operationsInProgress)));
-                operationsInProgress.splice(0, operationsInProgress.length);
+        // Keep a pool of up to `maxParallel` requests running at once
+        for (let i = 0; i < requests.length; i += MAX_REQUESTS_PER_BATCH_OPERATION) {
+            const requestsInBatch = requests.slice(i, i + MAX_REQUESTS_PER_BATCH_OPERATION);
+            const requestPromise = this._batchAddRequestsWithRetries(requestsInBatch, options);
+            executingRequests.add(requestPromise);
+            requestPromise.then((batchAddResult) => {
+                executingRequests.delete(requestPromise);
+                individualResults.push(batchAddResult);
+            });
+            if (executingRequests.size >= maxParallel) {
+                await Promise.race(executingRequests);
             }
         }
-        // Get results from remaining operations
-        individualResults.push(...(await Promise.all(operationsInProgress)));
+        await Promise.all(executingRequests);
 
         // Combine individual results together
         const result: RequestQueueClientBatchRequestsOperationResult = {
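The `Set` plus `Promise.race` pooling above replaces the old fill-and-drain batching: a finished batch frees its slot immediately instead of waiting for the whole group of `maxParallel` promises to settle, which keeps throughput steady when one batch is slow. A generic, self-contained sketch of the same pattern (the `runPooled` helper is hypothetical, not part of this PR):

```ts
// Hypothetical helper illustrating the pooling pattern used above.
// Results arrive in completion order; error handling is omitted for brevity.
async function runPooled<T>(tasks: Array<() => Promise<T>>, maxParallel: number): Promise<T[]> {
    const executing = new Set<Promise<void>>();
    const results: T[] = [];
    for (const task of tasks) {
        const promise = task().then((result) => {
            executing.delete(promise); // free the slot as soon as this task finishes
            results.push(result);
        });
        executing.add(promise);
        if (executing.size >= maxParallel) {
            await Promise.race(executing); // block until any slot frees up
        }
    }
    await Promise.all(executing); // drain the tasks still in flight
    return results;
}
```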
@@ -403,6 +411,7 @@ export interface RequestQueueClientBatchAddRequestWithRetriesOptions {
     forefront?: boolean;
     maxUnprocessedRequestsRetries?: number;
     maxParallel?: number;
+    minDelayBetweenUnprocessedRequestsRetriesMillis?: number;
 }
 
 export interface RequestQueueClientRequestSchema {