Skip to content

Commit ca48726

Browse files
authored
Merge pull request #25 from windingtree/refactor/queue
refactor: 💡 `then`-able parts of Queue rewritten to async/await
2 parents 4b2fe55 + d57e221 commit ca48726

File tree

1 file changed

+117
-95
lines changed

1 file changed

+117
-95
lines changed

src/shared/queue.ts

Lines changed: 117 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { EventEmitter, CustomEvent } from '@libp2p/interfaces/events';
2+
import { stringify } from 'viem';
23
import { simpleUid } from '@windingtree/contracts';
34
import { Storage } from '../storage/index.js';
45
import {
@@ -206,6 +207,7 @@ export interface QueueEvents {
206207
* @extends {EventEmitter<QueueEvents>}
207208
*/
208209
export class Queue extends EventEmitter<QueueEvents> {
210+
/** External job storage */
209211
private storage: Storage;
210212
private hashKey: string;
211213
/** All jobs in queue */
@@ -241,7 +243,7 @@ export class Queue extends EventEmitter<QueueEvents> {
241243
this.liveJobs = new Set<string>();
242244
this.jobHandlers = new Map<string, JobHandlerClosure>();
243245
this.processing = false;
244-
this._init().catch(logger.error);
246+
void this._init();
245247
logger.trace('Queue instantiated');
246248
}
247249

@@ -250,30 +252,35 @@ export class Queue extends EventEmitter<QueueEvents> {
250252
*
251253
* @returns {Promise<void>}
252254
*/
253-
private async _init(): Promise<void> {
255+
protected async _init(): Promise<void> {
254256
if (this.heartbeatInterval) {
255257
return;
256258
}
257259

258-
const rawJobKeys = await this.storage.get<string>(this.hashKey);
259-
260-
if (rawJobKeys) {
261-
new Set<string>(JSON.parse(rawJobKeys) as string[]);
262-
}
260+
try {
261+
// Read stored serialized array of jobs Ids from an external storage
262+
const rawJobKeys = await this.storage.get<string>(this.hashKey);
263263

264-
/** Heartbeat callback */
265-
const tick = () => {
266-
if (this.jobs.size > 0) {
267-
this._process().catch(logger.error);
268-
return;
264+
if (rawJobKeys) {
265+
new Set<string>(JSON.parse(rawJobKeys) as string[]);
269266
}
270267

271-
clearInterval(this.heartbeatInterval);
272-
this.heartbeatInterval = undefined;
273-
logger.trace('Queue interval cleared');
274-
};
268+
/** Heartbeat callback */
269+
const tick = () => {
270+
if (this.jobs.size > 0) {
271+
void this._process();
272+
return;
273+
}
274+
275+
clearInterval(this.heartbeatInterval);
276+
this.heartbeatInterval = undefined;
277+
logger.trace('Queue interval cleared');
278+
};
275279

276-
this.heartbeatInterval = setInterval(tick.bind(this), this.heartbeat);
280+
this.heartbeatInterval = setInterval(tick.bind(this), this.heartbeat);
281+
} catch (error) {
282+
logger.error('_init', error);
283+
}
277284
}
278285

279286
/**
@@ -282,10 +289,40 @@ export class Queue extends EventEmitter<QueueEvents> {
282289
* @returns {Promise<void>}
283290
*/
284291
private async _sync(): Promise<void> {
285-
await this.storage.set(this.hashKey, JSON.stringify(Array.from(this.jobs)));
292+
await this.storage.set(this.hashKey, stringify(Array.from(this.jobs)));
286293
logger.trace('Storage synced');
287294
}
288295

296+
/**
297+
* Saves a new job to storage
298+
*
299+
* @private
300+
* @param {string} jobId
301+
* @param {Job} job
302+
* @returns {Promise<void>}
303+
* @memberof Queue
304+
*/
305+
private async _saveJob(jobId: string, job: Job): Promise<void> {
306+
try {
307+
await this.storage.set<Job>(jobId, job);
308+
309+
this.jobs.add(jobId);
310+
logger.trace(`Added job #${jobId}`);
311+
312+
this.dispatchEvent(
313+
new CustomEvent('job', {
314+
detail: job,
315+
}),
316+
);
317+
logger.trace(`Added job #${jobId}`, job);
318+
319+
await this._sync();
320+
await this._init();
321+
} catch (error) {
322+
logger.error('_saveJob', error);
323+
}
324+
}
325+
289326
/**
290327
* Picks a certain amount of jobs to run
291328
*
@@ -452,62 +489,68 @@ export class Queue extends EventEmitter<QueueEvents> {
452489
}
453490

454491
this.liveJobs.delete(job.id);
492+
493+
logger.trace(`Job #${job.id} fulfilled`);
455494
} catch (error) {
456495
logger.error(`Job #${job.id} error:`, error);
457496

458-
job = await this._updatedJobState(job, {
459-
status: JobStatus.ERRORED,
460-
errors: [
461-
...(job.state.errors ?? []),
462-
{
463-
time: Date.now(),
464-
error: (error as Error).stack || (error as Error).message,
465-
},
466-
],
467-
});
468-
logger.trace(`Job #${job.id} errored`, job);
497+
try {
498+
job = await this._updatedJobState(job, {
499+
status: JobStatus.ERRORED,
500+
errors: [
501+
...(job.state.errors ?? []),
502+
{
503+
time: Date.now(),
504+
error: (error as Error).stack || (error as Error).message,
505+
},
506+
],
507+
});
508+
logger.trace(`Job #${job.id} errored`, job);
469509

470-
this.dispatchEvent(
471-
new CustomEvent<Job>('error', {
472-
detail: job,
473-
}),
474-
);
510+
this.dispatchEvent(
511+
new CustomEvent<Job>('error', {
512+
detail: job,
513+
}),
514+
);
515+
516+
if (
517+
job.options.attempts &&
518+
job.options.attempts > 0 &&
519+
job.state.attempts < job.options.attempts
520+
) {
521+
job = await this._updatedJobState(job, {
522+
attempts: job.state.attempts + 1,
523+
scheduled: Date.now() + job.options.attemptsDelay,
524+
});
525+
logger.trace(`Errored job #${job.id} scheduled`, job);
526+
527+
this.liveJobs.delete(job.id);
528+
529+
this.dispatchEvent(
530+
new CustomEvent<Job>('scheduled', {
531+
detail: job,
532+
}),
533+
);
534+
return;
535+
}
475536

476-
if (
477-
job.options.attempts &&
478-
job.options.attempts > 0 &&
479-
job.state.attempts < job.options.attempts
480-
) {
481537
job = await this._updatedJobState(job, {
482-
attempts: job.state.attempts + 1,
483-
scheduled: Date.now() + job.options.attemptsDelay,
538+
status: JobStatus.FAILED,
484539
});
485-
logger.trace(`Errored job #${job.id} scheduled`, job);
486540

541+
this.jobs.delete(job.id);
542+
await this._sync();
487543
this.liveJobs.delete(job.id);
544+
logger.trace(`Job #${job.id} failed`, job);
488545

489546
this.dispatchEvent(
490-
new CustomEvent<Job>('scheduled', {
547+
new CustomEvent<Job>('fail', {
491548
detail: job,
492549
}),
493550
);
494-
return;
551+
} catch (error) {
552+
logger.error(error);
495553
}
496-
497-
job = await this._updatedJobState(job, {
498-
status: JobStatus.FAILED,
499-
});
500-
501-
this.jobs.delete(job.id);
502-
await this._sync();
503-
this.liveJobs.delete(job.id);
504-
logger.trace(`Job #${job.id} failed`, job);
505-
506-
this.dispatchEvent(
507-
new CustomEvent<Job>('fail', {
508-
detail: job,
509-
}),
510-
);
511554
}
512555
}
513556

@@ -521,22 +564,20 @@ export class Queue extends EventEmitter<QueueEvents> {
521564
return;
522565
}
523566

524-
this.processing = true;
525-
const jobs = await this._pickJobs();
567+
try {
568+
this.processing = true;
569+
const jobs = await this._pickJobs();
526570

527-
if (jobs.length > 0) {
528-
logger.trace(`Picked #${jobs.length} jobs`);
571+
if (jobs.length > 0) {
572+
logger.trace(`Picked #${jobs.length} jobs`);
529573

530-
await Promise.allSettled(
531-
jobs.map((job) =>
532-
this._doJob(job)
533-
.then(() => logger.trace(`Job #${job.id} fulfilled`))
534-
.catch((error) => logger.error(`Job #${job.id} error`, error)),
535-
),
536-
);
537-
}
574+
await Promise.allSettled(jobs.map((job) => this._doJob(job)));
575+
}
538576

539-
this.processing = false;
577+
this.processing = false;
578+
} catch (error) {
579+
logger.error(error);
580+
}
540581
}
541582

542583
/**
@@ -586,9 +627,8 @@ export class Queue extends EventEmitter<QueueEvents> {
586627
throw new Error(`Job handler with name #${name} not registered yet`);
587628
}
588629

589-
const jobId = simpleUid();
590630
const job: Job<JobDataType> = {
591-
id: jobId,
631+
id: simpleUid(),
592632
name,
593633
data,
594634
options: {
@@ -603,25 +643,7 @@ export class Queue extends EventEmitter<QueueEvents> {
603643
},
604644
};
605645

606-
// @todo Validate job data
607-
608-
this.storage
609-
.set<Job>(jobId, job)
610-
.then(() => {
611-
this.jobs.add(jobId);
612-
logger.trace(`Added job Id #${jobId}`);
613-
})
614-
.then(() => {
615-
logger.trace(`Added job #${jobId}`, job);
616-
this.dispatchEvent(
617-
new CustomEvent('job', {
618-
detail: job,
619-
}),
620-
);
621-
})
622-
.then(() => this._sync())
623-
.then(() => this._init())
624-
.catch(logger.error);
646+
void this._saveJob(job.id, job);
625647

626648
return job;
627649
}
@@ -636,7 +658,7 @@ export class Queue extends EventEmitter<QueueEvents> {
636658
const job = await this.storage.get<Job>(id);
637659

638660
if (!job) {
639-
throw new Error(`Job $${id} not found`);
661+
throw new Error(`Job #${id} not found`);
640662
}
641663

642664
return job;
@@ -648,7 +670,7 @@ export class Queue extends EventEmitter<QueueEvents> {
648670
* @param {string} id Job id
649671
* @returns {Promise<void>}
650672
*/
651-
async cancelJob(id: string) {
673+
async cancelJob(id: string): Promise<void> {
652674
if (!this.jobs.has(id)) {
653675
throw new Error(`Job #${id} not in the queue`);
654676
}

0 commit comments

Comments
 (0)