Skip to content

Commit 91757a1

Browse files
committed
feat: 🎸 Added exponential delay to the new queue retries schema
1 parent ba76e51 commit 91757a1

File tree

1 file changed

+59
-6
lines changed

1 file changed

+59
-6
lines changed

‎src/shared/queue-new.ts

Lines changed: 59 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
11
import { EventEmitter, CustomEvent } from '@libp2p/interfaces/events';
2+
import { simpleUid } from '@windingtree/contracts';
3+
import { backoffWithJitter } from '../utils/time.js';
4+
import { createLogger } from '../utils/logger.js';
5+
6+
const logger = createLogger('Queue');
27

38
/**
49
* Enum to represent the different states a job can be in.
@@ -44,6 +49,8 @@ export interface JobConfig<T extends JobData = JobData> {
4449
maxRetries?: number;
4550
/** Initial retries value */
4651
retries?: number;
52+
/** Retries delay */
53+
retriesDelay?: number;
4754
}
4855

4956
/**
@@ -90,6 +97,8 @@ export class Job<T extends JobData = JobData> {
9097
maxRetries: number;
9198
/** The number of times the job has been retried */
9299
retries: number;
100+
/** The period of time between retries */
101+
retriesDelay: number;
93102
/** The history of the job */
94103
history: JobHistory;
95104

@@ -99,7 +108,7 @@ export class Job<T extends JobData = JobData> {
99108
* @memberof Job
100109
*/
101110
constructor(config: JobConfig<T>) {
102-
this.id = Date.now().toString();
111+
this.id = simpleUid();
103112
this.history = new JobHistory();
104113
this.handlerName = config.handlerName;
105114
this.data = config.data;
@@ -110,6 +119,7 @@ export class Job<T extends JobData = JobData> {
110119
this.maxRecurrences = config.maxRecurrences ?? 0;
111120
this.maxRetries = config.maxRetries ?? 0;
112121
this.retries = config.retries ?? 0;
122+
this.retriesDelay = config.retriesDelay ?? 0;
113123
}
114124

115125
/**
@@ -122,6 +132,7 @@ export class Job<T extends JobData = JobData> {
122132
timestamp: new Date(),
123133
status: newStatus,
124134
});
135+
logger.trace(`Job #${this.id} status changed to: ${this.status}`);
125136
}
126137

127138
/**
@@ -179,6 +190,7 @@ export class Job<T extends JobData = JobData> {
179190
* @memberof Job
180191
*/
181192
async execute(handler: JobHandler<T>) {
193+
logger.trace(`Job #${this.id} executed`);
182194
return Promise.resolve(handler(this.data));
183195
}
184196
}
@@ -329,8 +341,11 @@ export class Queue extends EventEmitter<QueueEvents> {
329341
(job) => job.status === JobStatus.Started,
330342
);
331343
const pendingJobs = this.jobs.filter((job) => job.executable);
344+
logger.trace(`Active jobs: ${activeJobs.length}`);
345+
logger.trace(`Pending jobs: ${pendingJobs.length}`);
332346

333347
const availableSlots = this.concurrencyLimit - activeJobs.length;
348+
logger.trace(`Available slots: ${availableSlots}`);
334349

335350
if (availableSlots <= 0 || pendingJobs.length === 0) {
336351
this.dispatchEvent(new CustomEvent<void>('stop'));
@@ -339,6 +354,9 @@ export class Queue extends EventEmitter<QueueEvents> {
339354

340355
// Get the jobs that will be started now
341356
const jobsToStart = pendingJobs.slice(0, availableSlots);
357+
logger.trace(
358+
`Jobs to start: [${jobsToStart.map((j) => j.id).join(', ')}]`,
359+
);
342360

343361
// Start all the selected jobs concurrently
344362
const promises = jobsToStart.map(async (job) => {
@@ -347,11 +365,13 @@ export class Queue extends EventEmitter<QueueEvents> {
347365

348366
const handler = this.handlers.getHandler(job.handlerName);
349367

350-
const shouldRecur = await job.execute(handler);
368+
const result = await job.execute(handler);
369+
logger.trace(`Job #${job.id} execution result: ${String(result)}`);
351370

352-
if (shouldRecur && job.isRecurrent) {
371+
if (result && job.isRecurrent) {
353372
// If the job is recurrent and the handler returned true, reschedule the job
354373
if (!job.expired) {
374+
logger.trace(`Job #${job.id} is done but new one is scheduled`);
355375
this.changeJobStatus(job, JobStatus.Done);
356376
setTimeout(() => {
357377
this.add({
@@ -366,19 +386,39 @@ export class Queue extends EventEmitter<QueueEvents> {
366386
});
367387
}, job.recurrenceInterval);
368388
} else {
389+
logger.trace(`Job #${job.id} is expired`);
369390
this.changeJobStatus(job, JobStatus.Expired);
370391
}
371392
} else {
393+
logger.trace(`Job #${job.id} is done`);
372394
this.changeJobStatus(job, JobStatus.Done);
373395
}
374396
} catch (error) {
397+
logger.error(`Job #${job.id} is errored`, error);
375398
job.history.errors.push(error as Error);
376399

377400
if (job.maxRetries > 0 && job.retries < job.maxRetries) {
378401
// If the job hasn't reached the maximum number of retries, retry it
379402
job.retries++;
380-
this.changeJobStatus(job, JobStatus.Pending);
403+
404+
if (job.retriesDelay > 0) {
405+
logger.trace(`Job #${job.id} filed but scheduled for restart`);
406+
this.changeJobStatus(job, JobStatus.Failed);
407+
setTimeout(() => {
408+
this.add({
409+
handlerName: job.handlerName,
410+
data: job.data,
411+
expire: job.expire,
412+
maxRetries: job.maxRetries,
413+
retries: job.retries + 1,
414+
});
415+
}, backoffWithJitter(job.retriesDelay, job.retries, job.retriesDelay * 10));
416+
} else {
417+
logger.trace(`Job #${job.id} failed and immediately restarted`);
418+
this.changeJobStatus(job, JobStatus.Pending);
419+
}
381420
} else {
421+
logger.trace(`Job #${job.id} filed`);
382422
this.changeJobStatus(job, JobStatus.Failed);
383423
}
384424
}
@@ -387,9 +427,10 @@ export class Queue extends EventEmitter<QueueEvents> {
387427
await Promise.allSettled(promises);
388428

389429
// After these jobs are done, check if there are any more jobs to process
430+
logger.trace('Trying to restart queue');
390431
void this.start();
391432
} catch (error) {
392-
console.error(error);
433+
logger.error('Queue start failed', error);
393434
}
394435
}
395436

@@ -415,6 +456,7 @@ export class Queue extends EventEmitter<QueueEvents> {
415456
add<T extends JobData = JobData>(config: JobConfig<T>): string {
416457
const job = new Job<T>(config);
417458
this.jobs.push(job);
459+
logger.trace('Job added:', job);
418460
void this.start();
419461
return job.id;
420462
}
@@ -441,10 +483,13 @@ export class Queue extends EventEmitter<QueueEvents> {
441483
const job = this.jobs.find((job) => job.id === id);
442484

443485
if (job) {
486+
logger.trace(`Job #${id} is cancelled`);
444487
job.status = JobStatus.Cancelled;
445488
return true;
446489
}
447490

491+
logger.trace(`Job #${id} has not been cancelled`);
492+
448493
return false;
449494
}
450495

@@ -458,6 +503,14 @@ export class Queue extends EventEmitter<QueueEvents> {
458503
delete(id: string): boolean {
459504
const size = this.jobs.length;
460505
this.jobs = this.jobs.filter((job) => job.id !== id);
461-
return this.jobs.length < size;
506+
const isDeleted = this.jobs.length < size;
507+
508+
if (isDeleted) {
509+
logger.trace(`Job #${id} is deleted`);
510+
} else {
511+
logger.trace(`Job #${id} has not been deleted`);
512+
}
513+
514+
return isDeleted;
462515
}
463516
}

0 commit comments

Comments
 (0)