Skip to content

Commit 36dc180

Browse files
committed
feat: 🎸 Added storage support to the new queue
1 parent 4a39046 commit 36dc180

File tree

1 file changed

+253
-28
lines changed

1 file changed

+253
-28
lines changed

‎src/shared/queue-new.ts

Lines changed: 253 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { EventEmitter, CustomEvent } from '@libp2p/interfaces/events';
22
import { simpleUid } from '@windingtree/contracts';
3+
import { Storage } from '../storage/index.js';
34
import { backoffWithJitter } from '../utils/time.js';
45
import { createLogger } from '../utils/logger.js';
56

@@ -29,6 +30,54 @@ export interface JobHandler<T extends JobData = JobData> {
2930
(data?: T): Promise<boolean>;
3031
}
3132

33+
export interface JobHistoryInterface {
34+
/** A history of all status changes for the job. */
35+
statusChanges?: { timestamp: number; status: JobStatus }[];
36+
/** A history of all errors for the job. */
37+
errors?: string[];
38+
}
39+
40+
/**
41+
* A class to manage the history of a job. This includes status changes and errors.
42+
*
43+
* @export
44+
* @class JobHistory
45+
*/
46+
export class JobHistory implements JobHistoryInterface {
47+
/** A history of all status changes for the job. */
48+
statusChanges: { timestamp: number; status: JobStatus }[];
49+
/** A history of all errors for the job. */
50+
errors: string[];
51+
52+
/**
53+
* Creates an instance of JobHistory.
54+
* @memberof JobHistory
55+
*/
56+
constructor(config: JobHistoryInterface) {
57+
this.statusChanges = config.statusChanges ?? [];
58+
this.errors = config.errors ?? [];
59+
}
60+
61+
static getStatus(source: JobHistory | JobHistoryInterface) {
62+
return source.statusChanges && source.statusChanges.length > 0
63+
? source.statusChanges[source.statusChanges.length - 1].status
64+
: JobStatus.Pending;
65+
}
66+
67+
/**
68+
* Returns class as object
69+
*
70+
* @returns
71+
* @memberof JobHistory
72+
*/
73+
toJSON(): JobHistoryInterface {
74+
return {
75+
statusChanges: this.statusChanges,
76+
errors: this.errors,
77+
};
78+
}
79+
}
80+
3281
/**
3382
* Configuration object for a job.
3483
*/
@@ -51,24 +100,8 @@ export interface JobConfig<T extends JobData = JobData> {
51100
retries?: number;
52101
/** Retries delay */
53102
retriesDelay?: number;
54-
}
55-
56-
/**
57-
* A class to manage the history of a job. This includes status changes and errors.
58-
*
59-
* @export
60-
* @class JobHistory
61-
*/
62-
export class JobHistory {
63-
/** A history of all status changes for the job. */
64-
statusChanges: { timestamp: Date; status: JobStatus }[];
65-
/** A history of all errors for the job. */
66-
errors: Error[];
67-
68-
constructor() {
69-
this.statusChanges = [];
70-
this.errors = [];
71-
}
103+
/** The history of the job */
104+
history?: JobHistoryInterface;
72105
}
73106

74107
/**
@@ -109,17 +142,16 @@ export class Job<T extends JobData = JobData> {
109142
*/
110143
constructor(config: JobConfig<T>) {
111144
this.id = simpleUid();
112-
this.history = new JobHistory();
113145
this.handlerName = config.handlerName;
114146
this.data = config.data;
115147
this.expire = config.expire;
116-
this.status = JobStatus.Pending;
117148
this.isRecurrent = config.isRecurrent ?? false;
118149
this.recurrenceInterval = config.recurrenceInterval ?? 0;
119150
this.maxRecurrences = config.maxRecurrences ?? 0;
120151
this.maxRetries = config.maxRetries ?? 0;
121152
this.retries = config.retries ?? 0;
122153
this.retriesDelay = config.retriesDelay ?? 0;
154+
this.history = new JobHistory(config.history ?? {});
123155
}
124156

125157
/**
@@ -129,7 +161,7 @@ export class Job<T extends JobData = JobData> {
129161
*/
130162
set status(newStatus: JobStatus) {
131163
this.history.statusChanges.push({
132-
timestamp: new Date(),
164+
timestamp: Date.now(),
133165
status: newStatus,
134166
});
135167
logger.trace(`Job #${this.id} status changed to: ${this.status}`);
@@ -141,8 +173,7 @@ export class Job<T extends JobData = JobData> {
141173
* @memberof Job
142174
*/
143175
get status() {
144-
return this.history.statusChanges[this.history.statusChanges.length - 1]
145-
.status;
176+
return JobHistory.getStatus(this.history);
146177
}
147178

148179
/**
@@ -182,6 +213,27 @@ export class Job<T extends JobData = JobData> {
182213
);
183214
}
184215

216+
/**
217+
* Returns Job as config object
218+
*
219+
* @returns {JobConfig<T>}
220+
* @memberof Job
221+
*/
222+
toJSON(): JobConfig<T> {
223+
return {
224+
handlerName: this.handlerName,
225+
data: this.data,
226+
expire: this.expire,
227+
isRecurrent: this.isRecurrent,
228+
recurrenceInterval: this.recurrenceInterval,
229+
maxRecurrences: this.maxRecurrences,
230+
maxRetries: this.maxRetries,
231+
retries: this.retries,
232+
retriesDelay: this.retriesDelay,
233+
history: this.history.toJSON(),
234+
};
235+
}
236+
185237
/**
186238
* Executes the job using the provided handler.
187239
*
@@ -249,6 +301,11 @@ export class JobHandlerRegistry {
249301
* @interface QueueOptions
250302
*/
251303
export interface QueueOptions {
304+
/** Queue storage object */
305+
storage?: Storage;
306+
/** Name of the key that is used for storing jobs Ids */
307+
idsKeyName?: string;
308+
/** The maximum number of jobs that can be concurrently active. */
252309
concurrencyLimit?: number;
253310
}
254311

@@ -288,6 +345,10 @@ export interface QueueEvents<T extends JobData = JobData> {
288345
* @extends {EventEmitter<QueueEvents>}
289346
*/
290347
export class Queue extends EventEmitter<QueueEvents> {
348+
/** Queue storage object */
349+
storage?: Storage;
350+
/** Name of the key that is used for storing jobs Ids */
351+
idsKeyName: string;
291352
/** The maximum number of jobs that can be concurrently active. */
292353
concurrencyLimit: number;
293354
/** The list of all jobs in the queue. */
@@ -301,11 +362,134 @@ export class Queue extends EventEmitter<QueueEvents> {
301362
* @param {QueueOptions} { concurrencyLimit }
302363
* @memberof Queue
303364
*/
304-
constructor({ concurrencyLimit }: QueueOptions) {
365+
constructor({ storage, idsKeyName, concurrencyLimit }: QueueOptions) {
305366
super();
367+
(this.storage = storage), (this.idsKeyName = idsKeyName ?? 'jobsIds');
306368
this.concurrencyLimit = concurrencyLimit ?? 5;
307369
this.jobs = [];
308370
this.handlers = new JobHandlerRegistry();
371+
void this.storageUp();
372+
}
373+
374+
/**
375+
* Restores saved jobs from the storage
376+
*
377+
* @protected
378+
* @returns
379+
* @memberof Queue
380+
*/
381+
protected async storageUp() {
382+
try {
383+
// Ignore storage features if not set up
384+
if (!this.storage) {
385+
return;
386+
}
387+
388+
const jobsIds = await this.storage.get<string[]>(this.idsKeyName);
389+
390+
if (jobsIds) {
391+
for (const id of jobsIds) {
392+
try {
393+
const jobConfig = await this.storage.get<JobConfig>(id);
394+
395+
if (!jobConfig) {
396+
throw new Error(`Unable to get job #${id} from storage`);
397+
}
398+
399+
// Only Pending jobs must be restored
400+
if (
401+
jobConfig.history &&
402+
JobHistory.getStatus(jobConfig.history) === JobStatus.Pending
403+
) {
404+
this.add(jobConfig);
405+
}
406+
} catch (error) {
407+
logger.error(error);
408+
}
409+
}
410+
} else {
411+
logger.trace('Jobs Ids not found in the storage');
412+
}
413+
} catch (error) {
414+
logger.error('storageUp error:', error);
415+
}
416+
}
417+
418+
/**
419+
* Stores all pending jobs to the storage
420+
*
421+
* @protected
422+
* @returns
423+
* @memberof Queue
424+
*/
425+
protected async storageDown() {
426+
try {
427+
// Ignore storage features if not set up
428+
if (!this.storage) {
429+
return;
430+
}
431+
432+
const pendingJobs = this.jobs.filter((job) => job.executable);
433+
434+
const { ids, configs } = pendingJobs.reduce<{
435+
ids: string[];
436+
configs: JobConfig[];
437+
}>(
438+
(a, v) => {
439+
a.ids.push(v.id);
440+
a.configs.push(v.toJSON());
441+
return a;
442+
},
443+
{
444+
ids: [],
445+
configs: [],
446+
},
447+
);
448+
449+
const jobsIds = new Set(
450+
(await this.storage.get<string[]>(this.idsKeyName)) ?? [],
451+
);
452+
453+
for (let i = 0; i < ids.length; i++) {
454+
try {
455+
jobsIds.add(ids[i]);
456+
await this.storage.set(ids[i], configs[i]);
457+
} catch (error) {
458+
logger.error(`Job #${ids[i]} save error:`, error);
459+
}
460+
}
461+
462+
await this.storage.set(this.idsKeyName, Array.from(jobsIds));
463+
} catch (error) {
464+
logger.error('storageDown error:', error);
465+
}
466+
}
467+
468+
/**
469+
* Updated saved job on storage
470+
*
471+
* @protected
472+
* @param {string} id
473+
* @param {Job} job
474+
* @returns
475+
* @memberof Queue
476+
*/
477+
protected async storageUpdate(id: string, job: Job) {
478+
try {
479+
// Ignore storage features if not set up
480+
if (!this.storage) {
481+
return;
482+
}
483+
484+
const jobsIds = new Set(
485+
(await this.storage.get<string[]>(this.idsKeyName)) ?? [],
486+
);
487+
jobsIds.add(id);
488+
await this.storage.set(id, job.toJSON());
489+
await this.storage.set(this.idsKeyName, Array.from(jobsIds));
490+
} catch (error) {
491+
logger.error('storageDown error:', error);
492+
}
309493
}
310494

311495
/**
@@ -323,6 +507,7 @@ export class Queue extends EventEmitter<QueueEvents> {
323507
detail: job,
324508
}),
325509
);
510+
void this.storageUpdate(job.id, job);
326511
}
327512

328513
/**
@@ -395,7 +580,7 @@ export class Queue extends EventEmitter<QueueEvents> {
395580
}
396581
} catch (error) {
397582
logger.error(`Job #${job.id} is errored`, error);
398-
job.history.errors.push(error as Error);
583+
job.history.errors.push(String(error));
399584

400585
if (job.maxRetries > 0 && job.retries < job.maxRetries) {
401586
// If the job hasn't reached the maximum number of retries, retry it
@@ -457,19 +642,50 @@ export class Queue extends EventEmitter<QueueEvents> {
457642
const job = new Job<T>(config);
458643
this.jobs.push(job);
459644
logger.trace('Job added:', job);
645+
void this.storageUpdate(job.id, job);
460646
void this.start();
461647
return job.id;
462648
}
463649

464650
/**
465-
* Returns a job from the queue by its ID.
651+
* Returns a job from the queue by its ID. Uses local in-memory source
466652
*
467653
* @param {string} id
468654
* @returns {(Job | undefined)} The job if found, otherwise undefined.
469655
* @memberof Queue
470656
*/
471-
get(id: string): Job | undefined {
472-
return this.jobs.find((job) => job.id === id);
657+
getLocal<T extends JobData = JobData>(id: string): Job<T> | undefined {
658+
const localJob = this.jobs.find((job) => job.id === id) as Job<T>;
659+
660+
if (localJob) {
661+
return localJob;
662+
}
663+
664+
return;
665+
}
666+
667+
/**
668+
* Returns a job config from the queue by its ID. Uses both local and storage search
669+
*
670+
* @param {string} id
671+
* @returns {Promise<JobConfig | undefined>} The job if found, otherwise undefined.
672+
* @memberof Queue
673+
*/
674+
async get<T extends JobData = JobData>(
675+
id: string,
676+
): Promise<JobConfig<T> | undefined> {
677+
const localJob = this.getLocal<T>(id);
678+
679+
if (localJob) {
680+
return localJob.toJSON();
681+
}
682+
683+
if (!this.storage) {
684+
return;
685+
}
686+
687+
// If job not found locally we will try to find on storage
688+
return await this.storage.get<JobConfig<T>>(id);
473689
}
474690

475691
/**
@@ -513,4 +729,13 @@ export class Queue extends EventEmitter<QueueEvents> {
513729

514730
return isDeleted;
515731
}
732+
733+
/**
734+
* Graceful queue stop
735+
*
736+
* @memberof Queue
737+
*/
738+
async stop() {
739+
await this.storageDown();
740+
}
516741
}

0 commit comments

Comments
 (0)