Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements for Job#retry #318

Merged
merged 4 commits into from
Jul 12, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions lib/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -204,21 +204,26 @@ Job.prototype.promote = function(){
});
};

/**
* Attempts to retry the job. Only a job that has failed can be retried.
*
* @return {Promise} If resolved and return code is 1, then the queue emits a waiting event
* otherwise the operation was not a success and throw the corresponding error. If the promise
* rejects, it indicates that the script failed to execute
*/
Job.prototype.retry = function(){
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not allowing retries of completed jobs as well, since this could be reused to implement recurrent jobs in the future.

Copy link
Contributor Author

@xdc0 xdc0 Jul 6, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One consideration on that though. The current implementation works with failed jobs only, so we'll be breaking some expectations on Job#retry by letting it to retry completed job as well.

One common usage scenario I think is having parallel workers that have some kind of retry logic for failed jobs, something like this:

setInterval(() => {
  queue.getFailed().then(jobs => {
    return Promise.each(jobs, job => job.retry());
  });
}, 60 * 1000);

The above may lead to problems because if another worker retried a job successfully, it might be retried yet again by another worker since that job is also retriable if it's completed.

Perhaps we can allow parameters to retry to drive the behavior? or expose repeat as a separate Job function instead? Considering that if we "Retry" a complated job, we are not really "Retrying" it, we are "Repeating" it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes a retry with parameters, or the script part with parameters and then reuse that code in the repeat function.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

speaking of which. Wouldn't be nice that retry includes a delay parameter as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, I made the script accept an options parameter to determine on which state the job should be when attempting to reprocess it. This should support retrying a job (from a failed state) or repeating a job (from the completed state).

As for the delay, I took a dive into it, but it was taking me too much time, maybe that can be added on a separate PR?

Copy link

@smakovetzky-r7 smakovetzky-r7 Nov 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xdc0 @manast
why "retry for completed" still not implement?

var key = this.queue.toKey('wait');
var failed = this.queue.toKey('failed');
var channel = this.queue.toKey('jobs');
var multi = this.queue.multi();
var queue = this.queue;
var _this = this;

multi.srem(failed, this.jobId);
// if queue is LIFO use rpushAsync
multi[(this.opts.lifo ? 'r' : 'l') + 'push'](key, this.jobId);
multi.publish(channel, this.jobId);

return multi.execAsync().then(function(){
_this.queue.emit('waiting', _this);
return _this;
return scripts.retryJob(this).then(function(result) {
if (result === 1) {
queue.emit('waiting', _this);
} else if (result === 0) {
throw new Error('Couldn\'t retry job: The job doesn\'t exist');
} else if (result === -1) {
throw new Error('Couldn\'t retry job: The job is locked');
} else if (result === -2) {
throw new Error('Couldn\'t retry job: The job has been already retried or has not failed');
}
});
};

Expand Down
58 changes: 58 additions & 0 deletions lib/scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,64 @@ var scripts = {
limit
];

return execScript.apply(scripts, args);
},

/**
* Attempts to retry a job
*
* @param {Job} job
*
* @return {Promise<Number>} Returns a promise that evaluates to a return code:
* 1 means the operation was a success
* 0 means the job does not exist
* -1 means the job is currently locked and can't be retried.
* -2 means the job was not found in the `failed` set
*/
retryJob: function(job) {
var push = (job.opts.lifo ? 'R' : 'L') + 'PUSH';

var script = [
'if (redis.call("EXISTS", KEYS[1]) == 1) then',
' if (redis.call("EXISTS", KEYS[2]) == 0) then',
' if (redis.call("SREM", KEYS[3], ARGV[1]) == 1) then',
' redis.call("' + push + '", KEYS[4], ARGV[1])',
' redis.call("PUBLISH", KEYS[5], ARGV[1])',
' return 1',
' else',
' return -2',
' end',
' else',
' return -1',
' end',
'else',
' return 0',
'end'
].join('\n');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of this nested code I would propose something like this:

if ((redis.call("EXISTS", KEYS[1]) ~= 1) then
  return 0
end
if ((redis.call("EXISTS", KEYS[2])  ~= 0) then
  return -1
end
.
.
.
return 1;


var queue = job.queue;

var keys = [
queue.toKey(job.jobId),
queue.toKey(job.jobId) + ':lock',
queue.toKey('failed'),
queue.toKey('wait'),
queue.toKey('jobs')
];

var args = [
queue.client,
'retryJob',
script,
5,
keys[0],
keys[1],
keys[2],
keys[3],
keys[4],
job.jobId
];

return execScript.apply(scripts, args);
}
};
Expand Down