-
-
Notifications
You must be signed in to change notification settings - Fork 187
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Rejecting the whole queue when a task rejects #29
Comments
|
How would you imagine the whole queue should be failing? I don't think the |
My expectation was that if one task fails, then the queue would stop processing additional tasks. I'm not quite sure whether or not this is in the same spirit as the original post. const Queue = require('p-queue');
const util = require('util');
const sleep = util.promisify(setTimeout);
const queue = new Queue({ concurrency: 1 });
const tasks = [1, 2, 3, 4, 5].map((task) => {
return queue.add(async () => {
await sleep(3000);
if (task === 2) throw new Error('foo');
if (task === 4) throw new Error('bar');
console.log(`complete ${task}`);
});
});
(async () => {
try {
await Promise.all(tasks)
} catch (err) {
console.error(err);
}
})(); Running this script demonstrates how the queue keeps running other tasks once one has failed, and also demonstrates some aspect of |
Promise.all rejects when the first Promise reject, you cannot get the other errors with it. In general I like the idea of not stopping on the first error, when making a lot of independent requests you want to know if there was an error, but you don't want the promise to resolve until all the requests are completed. On the other hand, an Overall, I think the documentation of the module could use some work for error handling. I had to see this issue to figure out minimal information about error handling |
One thing I was hoping to see was let error;
queue.addAll(mytasks).catch(err => error = err);
queue.onIdle().then(() => {
if (error) throw error;
});
// or with async await
async function test(){
try {
await queue.addAll(mytasks);
} catch (e) {
await queue.onIdle();
// here I know the queue is really done I can throw the error
throw e;
}
} |
I'll make a PR for that. New option: |
It would be great if there was a // Edit And even better if Real life example: class Example {
public getCronTasks() {
const tasks = [];
// Do sth...
return tasks;
}
public async processQueue() {
const cronTasks = await this.getCronTasks();
const queue = new PQueue({
concurrency: 10,
});
cronTasks.forEach(({cronTask, taskSnapshot}) => {
queue.add(async () => {
try {
await cronTask.run();
this.handleSuccess(taskSnapshot);
} catch (e) {
this.handleError(taskSnapshot, e);
}
});
});
}
private handleSuccess(taskSnapshot) {
return taskSnapshot.ref.set({
status: Status.COMPLETE,
}, {merge: true});
}
private handleError(taskSnapshot, error) {
return taskSnapshot.ref.set({
error: error.message,
status: Status.ERROR,
}, {merge: true});
}
} It's not very nice because there's an additional logic within the class Example {
public getCronTasks() {
const tasks = [];
// Do sth...
return tasks;
}
public async processQueue() {
const cronTasks = await this.getCronTasks();
const queue = new PQueue({
concurrency: 10,
});
cronTasks.forEach(({cronTask, taskSnapshot}) => {
queue.add(async () => {
await cronTask.run();
}, {
metaData: {
taskSnapshot,
},
});
});
queue.onSuccess(this.handleSuccess);
queue.onError(this.handleError());
}
private handleSuccess({ taskSnapshot }) {
return taskSnapshot.ref.set({
status: Status.COMPLETE,
}, {merge: true});
}
private handleError(error, { taskSnapshot }) {
return taskSnapshot.ref.set({
error: error.message,
status: Status.ERROR,
}, {merge: true});
}
} |
Ignore my previous post public async processQueue() {
const cronTasks = await this.getCronTasks();
const queue = new PQueue({
concurrency: 10,
});
cronTasks.forEach(({cronTask, taskSnapshot}) => {
const promise = queue.add(async () => {
await cronTask.run();
});
promise
.then(() => {
this.handleSuccess(taskSnapshot);
})
.catch((error) => {
this.handleError(taskSnapshot, error);
});
});
}
private handleSuccess(taskSnapshot) {
return taskSnapshot.ref.set({
status: Status.COMPLETE,
}, {merge: true});
}
private handleError(taskSnapshot, error) {
console.error(error.message);
return taskSnapshot.ref.set({
error: error.message,
status: Status.ERROR,
}, {merge: true});
} |
I agree with @Cellule : I expect the queue to throw when doing My implementation: import PQueue from 'p-queue';
import waitForPromises from './waitForPromises.js';
// `Queue` executes tasks in parallel within the limits of `concurrency`.
export default class Queue {
constructor({
concurrency = getDefaultConcurrency()
} = {}) {
this.promises = [];
this.queue = new PQueue({ concurrency, autoStart: false });
}
add(task) {
this.promises.push(this.queue.add(task));
}
addAll(tasks) {
for (const task of tasks) {
this.add(task);
}
// `p-queue.addAll()` returns a `Promise.all()` which is not a correct way
// to wait for all those `Promise`s. Instead, it should've done something like
// `Promise.allSettled()` and then check for any "rejected" ones like
// `waitForPromises()` does.
// this.promises.push(this.queue.addAll(tasks));
}
// Returns a `Promise`.
_waitUntilFinished() {
const promises = this.promises;
this.promises = [];
// `p-queue` doesn't properly do `try/catch` during `await p-queue.onIdle()`.
// https://github.com/sindresorhus/p-queue/issues/26
// https://github.com/sindresorhus/p-queue/issues/29
// This `Queue` implementation fixes that by `await`ing on `this.promises`.
return waitForPromises(promises).then(() => {
// If there were any new tasks added to the queue since it has been started
// then wait for those new tasks to end too.
if (this.promises.length > 0) {
return this._waitUntilFinished();
}
});
}
async run() {
this.promises = [this.queue.onIdle()].concat(this.promises);
this.queue.start();
await this._waitUntilFinished();
}
}
function getDefaultConcurrency() {
if (typeof QUEUE_CONCURRENCY !== 'undefined') {
return QUEUE_CONCURRENCY;
}
return 10;
} export default async function waitForPromises(promises, { printErrors = defaultPrintErrors } = {}) {
const errors = [];
const results = await Promise.allSettled(promises);
for (const result of results) {
if (result.status === 'fulfilled') {
// The promise has finished.
} else {
// The promise has errored.
const error = result.reason;
errors.push(error);
}
}
if (errors.length > 0) {
if (printErrors) {
printErrors(errors);
}
const error = new Error(errors[0].message);
error.errors = errors;
throw error;
}
}
function defaultPrintErrors(errors) {
console.error(`* ${errors.length} Promise${errors.length === 1 ? '' : 's'} Errored *`)
for (const error of errors) {
console.error(error.stack || error);
// There may be deeply nested properties in an error
// if it was thrown from a stack of `waitForPromises()` calls.
// Example: `error.errors[0].errors[0].errors[]`.
console.error(JSON.stringify(error, null, 2));
}
} errorHandlingQueue.addAll(...)
errorHandlingQueue.addAll(...)
await errorHandlingQueue.run() |
what is the current status? |
Hang on, I'll send a PR in 30 mins. Woah, it took me 10 mins to do it from scratch (git pull etc.). |
@szmarczak cool, thanks! |
@szmarczak Thanks for your made for this feature. |
Sorry, I don't understand quite you. Can you elaborate? |
@szmarczak Sorry, this is my fault. I thought your pr already merged. Please ignore my previous message. |
@sindresorhus will @szmarczak PR be considered for merge? I'm also interested in this change. Thanks. |
Does it make sense that |
To me it definitively makes sense that queue.add can reject. It's important to the caller to know which specific call to add has failed or not. This is especially true when adding abstractions on top of This way my user can safely call my I also am looking for a behaviour similar to OP and was surprised that I could not get this behaviour with the following: async function write(data) {
try {
return queue.add(() => nativeWrite(data))
} catch(e) {
// Stop further processing
queue.clear();
// Re-throw
throw e
}
} With the above if we queue a few items, after the first write throws, one further write will be attempted even if we clear the queue. Basically after we have resolved/rejected the promise from add, the next item is already queued and there is no way to clear it. I think that |
+1 queue.add() rejection seems right. I too want to be able to handle the failure at the individual queue item level.
So I guess there's no change then? Just need to add catch() error handlers on the individual invocations of add(). |
I bumped into this issue as well, where I'd like the whole queue to fail when there is an error.
I really like the p-queue API, up until I realized I'd have to resort to odd workarounds to get this to work. I understand there are other use cases where we'd like to ignore or handle individual errors, but that seems to be compatible with an I'd expect something like this to work:
|
Maybe I am forgetting something important, but since |
How come this isn't added yet? That's such an obvious use case |
@ssupinsky have you followed the whole thread? Have you submitted a PR to add the feature? |
@jsumners I have followed the thread and I was under the impression that people settled on the |
If anybody's looking for a workaround here's what I came up with: (Obviously no ts, it's too clunky to include here. You can figure out type annotations by yourself, or I don't know, ping me and I'll try to help) class AutoPausingQueue extends PQueue {
add(fn, options) {
const fnPausing = () => {
try {
const fnResult = fn();
return !(fnResult instanceof Promise)
? fnResult
: fnResult.catch(err => {
this.pause();
throw err;
});
} catch(e) {
this.pause();
throw e;
}
};
return PQueue.prototype.add.call(this, fnPausing, options);
}
} |
I'm trying
p-queue
after usingpromise-queue
for some time. Given this background, I was expecting to see the whole queue failing if any of its tasks rejects. However, this does not seem to happen.Here's an example:
Instead of seeing
could not process queue
in mystdout
, I get this:How can I stop and reject the whole queue once any its tasks has failed?
The text was updated successfully, but these errors were encountered: