Skip to content

Commit

Permalink
feat(job): extend getDependencies to support pagination
Browse files · Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Jun 4, 2021
1 parent d3335ce commit 9b61bbb
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 16 deletions.
113 changes: 98 additions & 15 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,17 @@ export interface MoveToChildrenOpts {
};
}

/**
 * Cursor-based pagination settings for one dependency scan; forwarded
 * to the Redis HSCAN / SSCAN calls in Job#getDependencies.
 */
interface DependencyPaginationOpts {
  cursor?: number;
  count?: number;
}

/**
 * Options for Job#getDependencies. Supplying `processed` and/or
 * `unprocessed` switches that set to paginated fetching; omitting both
 * fetches everything in one unpaginated round trip.
 */
interface DependenciesOpts {
  processed?: DependencyPaginationOpts;
  unprocessed?: DependencyPaginationOpts;
}

export class Job<T = any, R = any, N extends string = string> {
/**
* The progress a job has performed so far.
Expand Down Expand Up @@ -547,27 +558,99 @@ export class Job<T = any, R = any, N extends string = string> {
*
* @returns dependencies separated by processed and unprocessed.
*/
/**
 * Get this job's children ("dependencies"), separated into the
 * processed and unprocessed sets.
 *
 * With empty `opts` both sets are fetched in full (HGETALL on
 * `<id>:processed`, SMEMBERS on `<id>:dependencies`). When
 * `opts.processed` and/or `opts.unprocessed` is given, the matching set
 * is fetched with cursor pagination (HSCAN / SSCAN, default cursor 0,
 * default COUNT 20) and the corresponding `next*Cursor` is returned; a
 * cursor of 0 means that scan is complete.
 *
 * @param opts - optional per-set pagination settings.
 * @returns dependencies separated by processed and unprocessed.
 */
async getDependencies(
  opts: DependenciesOpts = {},
): Promise<{
  nextProcessedCursor?: number;
  processed?: Record<string, any>;
  nextUnprocessedCursor?: number;
  unprocessed?: string[];
}> {
  const client = await this.queue.client;

  const multi = client.multi();
  if (!opts.processed && !opts.unprocessed) {
    // Unpaginated path: fetch both sets in one pipelined round trip.
    await multi.hgetall(this.toKey(`${this.id}:processed`));
    await multi.smembers(this.toKey(`${this.id}:dependencies`));

    const [[err1, processed], [err2, unprocessed]] = (await multi.exec()) as [
      [null | Error, { [jobKey: string]: string }],
      [null | Error, string[]],
    ];

    // Surface Redis errors directly instead of letting a null reply
    // crash Object.entries below with an opaque TypeError.
    if (err1) {
      throw err1;
    }
    if (err2) {
      throw err2;
    }

    // Hash values are JSON-encoded child return values; decode each.
    const transformedProcessed = Object.entries(processed).reduce(
      (accumulator, [key, value]) => {
        return { ...accumulator, [key]: JSON.parse(value) };
      },
      {},
    );

    return { processed: transformedProcessed, unprocessed };
  } else {
    const defaultOpts = {
      cursor: 0,
      count: 20,
    };

    if (opts.processed) {
      // Merge with spreads, NOT Object.assign(defaultOpts, ...):
      // assigning into defaultOpts would mutate the shared defaults and
      // leak the processed cursor/count into the unprocessed scan below.
      const processedOpts = { ...defaultOpts, ...opts.processed };
      await multi.hscan(
        this.toKey(`${this.id}:processed`),
        processedOpts.cursor,
        'COUNT',
        processedOpts.count,
      );
    }

    if (opts.unprocessed) {
      const unprocessedOpts = { ...defaultOpts, ...opts.unprocessed };
      await multi.sscan(
        this.toKey(`${this.id}:dependencies`),
        unprocessedOpts.cursor,
        'COUNT',
        unprocessedOpts.count,
      );
    }

    // Each exec entry is [err, [cursor, items]]; which slot holds which
    // scan depends on how many commands were queued above.
    const [result1, result2] = await multi.exec();

    const [processedCursor, processed = []] = opts.processed
      ? result1[1]
      : [];
    const [unprocessedCursor, unprocessed = []] = opts.unprocessed
      ? opts.processed
        ? result2[1]
        : result1[1]
      : [];

    // HSCAN replies are a flat [field, value, field, value, ...] array;
    // fold adjacent pairs into an object, JSON-decoding each value.
    const transformedProcessed = processed.reduce(
      (
        accumulator: Record<string, any>,
        currentValue: string,
        index: number,
      ) => {
        if (index % 2) {
          return {
            ...accumulator,
            [processed[index - 1]]: JSON.parse(currentValue),
          };
        }
        return accumulator;
      },
      {},
    );

    // Redis returns cursors as strings, so a finished scan yields the
    // truthy string '0' — its results are still included, with
    // nextCursor coerced to the number 0.
    return {
      ...(processedCursor
        ? {
            processed: transformedProcessed,
            nextProcessedCursor: Number(processedCursor),
          }
        : {}),
      ...(unprocessedCursor
        ? { unprocessed, nextUnprocessedCursor: Number(unprocessedCursor) }
        : {}),
    };
  }
}

/**
Expand Down
5 changes: 4 additions & 1 deletion src/test/test_flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ describe('flows', () => {
const processingParent = new Promise<void>((resolve, reject) => [
(parentProcessor = async (job: Job) => {
try {
const { processed } = await job.getDependencies();
const { processed, nextProcessedCursor } = await job.getDependencies({
processed: {},
});
expect(nextProcessedCursor).to.be.equal(0);
expect(Object.keys(processed)).to.have.length(3);

const childrenValues = await job.getChildrenValues();
Expand Down
62 changes: 62 additions & 0 deletions src/test/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1886,6 +1886,68 @@ describe('workers', function() {
await removeAllQueueData(new IORedis(), parentQueueName);
});

// Integration test: verifies cursor-based pagination of a parent job's
// unprocessed dependency keys via getDependencies({ unprocessed }).
it('should get paginated unprocessed dependencies keys', async () => {
const value = { bar: 'something' };
const parentToken = 'parent-token';

const parentQueueName = `parent-queue-${v4()}`;

const parentQueue = new Queue(parentQueueName);
const parentWorker = new Worker(parentQueueName);
const childrenWorker = new Worker(queueName);

const data = { foo: 'bar' };
await Job.create(parentQueue, 'parent', data);
const parent = (await parentWorker.getNextJob(parentToken)) as Job;
const currentState = await parent.getState();

expect(currentState).to.be.equal('active');

// Create 15 children so the dependency set spans two pages of 10.
await times(15, async (index: number) =>
Job.create(
queue,
`child${index}`,
{ idx: index, ...value },
{
parent: {
id: parent.id,
queue: 'bull:' + parentQueueName,
},
},
),
);
// First page: up to `count` keys plus a cursor for the next page.
const {
nextUnprocessedCursor: nextCursor1,
unprocessed: unprocessed1,
} = await parent.getDependencies({
unprocessed: {
cursor: 0,
count: 10,
},
});

// SSCAN's COUNT is a hint, so a page may return more than requested.
expect(unprocessed1.length).to.be.greaterThanOrEqual(10);

// Second page: resume from the returned cursor.
const {
nextUnprocessedCursor: nextCursor2,
unprocessed: unprocessed2,
} = await parent.getDependencies({
unprocessed: {
cursor: nextCursor1,
count: 10,
},
});

expect(unprocessed2.length).to.be.lessThanOrEqual(5);
// A cursor of 0 signals the scan is complete.
expect(nextCursor2).to.be.equal(0);

await childrenWorker.close();
await parentWorker.close();

await parentQueue.close();
await removeAllQueueData(new IORedis(), parentQueueName);
});

it('should allow to fail jobs manually', async () => {
const worker = new Worker(queueName);
const token = 'my-token';
Expand Down

0 comments on commit 9b61bbb

Please sign in to comment.