Skip to content

Commit

Permalink
fix(movetodelayed): check if job is in active state
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Apr 22, 2021
1 parent 77cc94b commit 4e63f70
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 18 deletions.
9 changes: 2 additions & 7 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -351,13 +351,8 @@ export class Scripts {

const args = this.moveToDelayedArgs(queue, jobId, timestamp);
const result = await (<any>client).moveToDelayed(args);
switch (result) {
case -1:
throw new Error(
'Missing Job ' +
jobId +
' when trying to move from active to delayed',
);
if (result < 0) {
throw this.finishedErrors(result, jobId, 'moveToDelayed');
}
}

Expand Down
8 changes: 7 additions & 1 deletion src/commands/moveToDelayed-5.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
Output:
0 - OK
-1 - Missing job.
-3 - Job not in active set.
Events:
- delayed key.
Expand All @@ -27,8 +28,13 @@ if rcall("EXISTS", KEYS[3]) == 1 then
local score = tonumber(ARGV[1])
local delayedTimestamp = (score / 0x1000)

local numRemovedElements = rcall("LREM", KEYS[1], -1, jobId)

if(numRemovedElements < 1) then
return -3
end

rcall("ZADD", KEYS[2], score, jobId)
rcall("LREM", KEYS[1], 0, jobId)

rcall("XADD", KEYS[4], "*", "event", "delayed", "jobId", jobId, "delay", delayedTimestamp);
rcall("XADD", KEYS[5], "*", "nextTimestamp", delayedTimestamp);
Expand Down
2 changes: 1 addition & 1 deletion src/interfaces/jobs-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export interface JobsOptions {
priority?: number;

/**
* An amount of miliseconds to wait until this job can be processed.
* An amount of milliseconds to wait until this job can be processed.
* Note that for accurate delays, worker and producers
* should have their clocks synchronized.
*/
Expand Down
4 changes: 3 additions & 1 deletion src/test/test_flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ describe('flows', () => {
numJobs = await parentQueue.getWaitingCount();
expect(numJobs).to.be.equal(0);

await flow.close();
await parentWorker.close();
await removeAllQueueData(new IORedis(), parentQueueName);
});

Expand Down Expand Up @@ -413,7 +415,7 @@ describe('flows', () => {
expect(await tree.children[1].job.getState()).to.be.equal('waiting');

await flow.close();

await worker.close();
await removeAllQueueData(new IORedis(), parentQueueName);
});

Expand Down
8 changes: 6 additions & 2 deletions src/test/test_job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,21 +272,25 @@ describe('Job', function() {
});

it('moves the job to delayed for retry if attempts are given and backoff is non zero', async function() {
const job = await Job.create(
const worker = new Worker(queueName);
const token = 'my-token';
await Job.create(
queue,
'test',
{ foo: 'bar' },
{ attempts: 3, backoff: 300 },
);
const job = (await worker.getNextJob(token)) as Job;
const isFailed = await job.isFailed();
expect(isFailed).to.be.equal(false);
await job.moveToFailed(new Error('test error'), '0', true);
await job.moveToFailed(new Error('test error'), token, true);
const isFailed2 = await job.isFailed();
expect(isFailed2).to.be.equal(false);
expect(job.stacktrace).not.be.equal(null);
expect(job.stacktrace.length).to.be.equal(1);
const isDelayed = await job.isDelayed();
expect(isDelayed).to.be.equal(true);
await worker.close();
});

it('applies stacktrace limit on failure', async function() {
Expand Down
13 changes: 7 additions & 6 deletions src/test/test_rate_limiter.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { Queue } from '@src/classes';
import { QueueEvents } from '@src/classes/queue-events';
import { QueueScheduler } from '@src/classes/queue-scheduler';
import { Worker } from '@src/classes/worker';
import { Queue } from '../classes';
import { QueueEvents } from '../classes/queue-events';
import { QueueScheduler } from '../classes/queue-scheduler';
import { Worker } from '../classes/worker';
import { assert, expect } from 'chai';
import * as IORedis from 'ioredis';
import { after, every, last } from 'lodash';
import { beforeEach, describe, it } from 'mocha';
import { v4 } from 'uuid';
import { removeAllQueueData } from '@src/utils';
import { removeAllQueueData } from '../utils';

describe('Rate Limiter', function() {
let queue: Queue;
Expand Down Expand Up @@ -56,6 +56,7 @@ describe('Rate Limiter', function() {

const delayedCount = await queue.getDelayedCount();
expect(delayedCount).to.eq(3);
await worker.close();
});

it('should obey the rate limit', async function() {
Expand Down Expand Up @@ -205,7 +206,7 @@ describe('Rate Limiter', function() {
});

queueEvents.on('completed', ({ jobId }) => {
const group = last(jobId.split(':'));
const group: string = last(jobId.split(':'));
completed[group] = completed[group] || [];
completed[group].push(Date.now());

Expand Down

0 comments on commit 4e63f70

Please sign in to comment.