Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rejecting the whole queue when a task rejects #29

Open
kachkaev opened this issue Mar 20, 2018 · 25 comments
Open

Rejecting the whole queue when a task rejects #29

kachkaev opened this issue Mar 20, 2018 · 25 comments

Comments

@kachkaev
Copy link

kachkaev commented Mar 20, 2018

I'm trying p-queue after using promise-queue for some time. Given this background, I was expecting to see the whole queue failing if any of its tasks rejects. However, this does not seem to happen.

Here's an example:

// testPQueue.js

import * as PQueue from "p-queue";

async function processQueue() {
  const queue = new PQueue();
  queue.add(
    () =>
      new Promise((resolve, reject) =>
        setTimeout(() => {
          reject();
        }, 500),
      ),
  );

  await queue.onIdle();
  console.log("should not be here");
  return 42;
}

(async () => {
  try {
    const result = await processQueue();
    console.log(result);
  } catch (e) {
    console.log("could not process queue");
  }
})();
node ./testPQueue.js

Instead of seeing could not process queue in my stdout, I get this:

should not be here
42
(node:13679) UnhandledPromiseRejectionWarning: undefined
(node:13679) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 1)
(node:13679) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.

How can I stop and reject the whole queue once any its tasks has failed?

@sindresorhus
Copy link
Owner

queue.add() rejects if the task fails, you could add a catch handler.

@sindresorhus
Copy link
Owner

Given this background, I was expecting to see the whole queue failing if any of its tasks rejects. However, this does not seem to happen.

promise-queue also rejects in the queue.add().

How would you imagine the whole queue should be failing? I don't think the PQueue class should throw. We could maybe add a onFail method you could subscribe to. Happy to consider ideas.

@rclark
Copy link

rclark commented Jul 18, 2018

My expectation was that if one task fails, then the queue would stop processing additional tasks. I'm not quite sure whether or not this is in the same spirit as the original post.

const Queue = require('p-queue');

const util = require('util');
const sleep = util.promisify(setTimeout);
const queue = new Queue({ concurrency: 1 });

const tasks = [1, 2, 3, 4, 5].map((task) => {
  return queue.add(async () => {
    await sleep(3000);

    if (task === 2) throw new Error('foo');
    if (task === 4) throw new Error('bar');
    console.log(`complete ${task}`);
  });
});


(async () => {
  try {
    await Promise.all(tasks)
  } catch (err) {
    console.error(err);
  }
})();

Running this script demonstrates how the queue keeps running other tasks once one has failed, and also demonstrates some aspect of Promise.all() error handling that I don't understand -- where did the bar error go?

@Cellule
Copy link

Cellule commented Jul 28, 2018

Running this script demonstrates how the queue keeps running other tasks once one has failed, and also demonstrates some aspect of Promise.all() error handling that I don't understand -- where did the bar error go?

Promise.all rejects when the first Promise reject, you cannot get the other errors with it.

In general I like the idea of not stopping on the first error, when making a lot of independent requests you want to know if there was an error, but you don't want the promise to resolve until all the requests are completed.

On the other hand, an onFail() method that resolve (or rejects) on the first error seen in the queue like Promise.all would be nice.

Overall, I think the documentation of the module could use some work for error handling. I had to see this issue to figure out minimal information about error handling

@Cellule
Copy link

Cellule commented Jul 28, 2018

One thing I was hoping to see was onIdle to reject if there was an error.
That way I can queue up all my tasks and once they're all done, I can know if any had an error.
Now I have to do something like

let error;
queue.addAll(mytasks).catch(err => error = err);
queue.onIdle().then(() => {
  if (error) throw error;
});
// or with async await
async function test(){
  try {
    await queue.addAll(mytasks);
  } catch (e) {
    await queue.onIdle();
    // here I know the queue is really done I can throw the error
    throw e;
  }
}

@szmarczak
Copy link
Collaborator

I'll make a PR for that. New option: pauseOnError

@wujekbogdan
Copy link

wujekbogdan commented Oct 12, 2018

It would be great if there was a queue.onError method.

// Edit

And even better if onError and onSuccess callback were called with a context of a specific task, but it would also require a change in the add method, because at the moment it's not possible to pass any metadata as options.

Real life example:

class Example {

  public getCronTasks() {
    const tasks = [];
    // Do sth...
    return tasks;
  }

  public async processQueue() {
    const cronTasks = await this.getCronTasks();
    const queue = new PQueue({
      concurrency: 10,
    });

    cronTasks.forEach(({cronTask, taskSnapshot}) => {
     queue.add(async () => {
        try {
          await cronTask.run();
          this.handleSuccess(taskSnapshot);
        } catch (e) {
          this.handleError(taskSnapshot, e);
        }
      });
    });
  }

  private handleSuccess(taskSnapshot) {
    return taskSnapshot.ref.set({
      status: Status.COMPLETE,
    }, {merge: true});

  }

  private handleError(taskSnapshot, error) {
    return taskSnapshot.ref.set({
      error: error.message,
      status: Status.ERROR,
    }, {merge: true});
  }

}

It's not very nice because there's an additional logic within the .add() method. If thre was an onError method AND if it was possible to pass additional metadata to the .add() function the code could be written this way:

class Example {

  public getCronTasks() {
    const tasks = [];
    // Do sth...
    return tasks;
  }

  public async processQueue() {
    const cronTasks = await this.getCronTasks();
    const queue = new PQueue({
      concurrency: 10,
    });

    cronTasks.forEach(({cronTask, taskSnapshot}) => {
      queue.add(async () => {
        await cronTask.run();
      }, {
        metaData: {
          taskSnapshot,
        },
      });
    });

    queue.onSuccess(this.handleSuccess);
    queue.onError(this.handleError());
  }

  private handleSuccess({ taskSnapshot }) {
    return taskSnapshot.ref.set({
      status: Status.COMPLETE,
    }, {merge: true});

  }

  private handleError(error, { taskSnapshot }) {
    return taskSnapshot.ref.set({
      error: error.message,
      status: Status.ERROR,
    }, {merge: true});
  }

}

@wujekbogdan
Copy link

wujekbogdan commented Oct 12, 2018

Ignore my previous post onSucces and onError callbacks and the metadata features are not needed. Here's the solution:

  public async processQueue() {
    const cronTasks = await this.getCronTasks();
    const queue = new PQueue({
      concurrency: 10,
    });

    cronTasks.forEach(({cronTask, taskSnapshot}) => {
      const promise = queue.add(async () => {
        await cronTask.run();
      });

      promise
        .then(() => {
          this.handleSuccess(taskSnapshot);
        })
        .catch((error) => {
          this.handleError(taskSnapshot, error);
        });
    });
  }

  private handleSuccess(taskSnapshot) {
    return taskSnapshot.ref.set({
      status: Status.COMPLETE,
    }, {merge: true});

  }

  private handleError(taskSnapshot, error) {
    console.error(error.message);
    return taskSnapshot.ref.set({
      error: error.message,
      status: Status.ERROR,
    }, {merge: true});
  }

@catamphetamine
Copy link

catamphetamine commented Feb 22, 2019

I agree with @Cellule : I expect the queue to throw when doing await onIdle() if there's an error, not ignore it.

My implementation:

import PQueue from 'p-queue';

import waitForPromises from './waitForPromises.js';

// `Queue` executes tasks in parallel within the limits of `concurrency`.
export default class Queue {
  constructor({
    concurrency = getDefaultConcurrency()
  } = {}) {
    this.promises = [];
    this.queue = new PQueue({ concurrency, autoStart: false });
  }

  add(task) {
    this.promises.push(this.queue.add(task));
  }

  addAll(tasks) {
    for (const task of tasks) {
      this.add(task);
    }
    // `p-queue.addAll()` returns a `Promise.all()` which is not a correct way
    // to wait for all those `Promise`s. Instead, it should've done something like
    // `Promise.allSettled()` and then check for any "rejected" ones like
    // `waitForPromises()` does.
    // this.promises.push(this.queue.addAll(tasks));
  }

  // Returns a `Promise`.
  _waitUntilFinished() {
    const promises = this.promises;
    this.promises = [];
    // `p-queue` doesn't properly do `try/catch` during `await p-queue.onIdle()`.
    // https://github.com/sindresorhus/p-queue/issues/26
    // https://github.com/sindresorhus/p-queue/issues/29
    // This `Queue` implementation fixes that by `await`ing on `this.promises`.
    return waitForPromises(promises).then(() => {
      // If there were any new tasks added to the queue since it has been started
      // then wait for those new tasks to end too.
      if (this.promises.length > 0) {
        return this._waitUntilFinished();
      }
    });
  }

  async run() {
    this.promises = [this.queue.onIdle()].concat(this.promises);
    this.queue.start();
    await this._waitUntilFinished();
  }
}

function getDefaultConcurrency() {
  if (typeof QUEUE_CONCURRENCY !== 'undefined') {
    return QUEUE_CONCURRENCY;
  }
  return 10;
}
export default async function waitForPromises(promises, { printErrors = defaultPrintErrors } = {}) {
  const errors = [];
  const results = await Promise.allSettled(promises);
  for (const result of results) {
    if (result.status === 'fulfilled') {
      // The promise has finished.
    } else {
      // The promise has errored.
      const error = result.reason;
      errors.push(error);
    }
  }
  if (errors.length > 0) {
    if (printErrors) {
      printErrors(errors);
    }
    const error = new Error(errors[0].message);
    error.errors = errors;
    throw error;
  }
}

function defaultPrintErrors(errors) {
  console.error(`* ${errors.length} Promise${errors.length === 1 ? '' : 's'} Errored *`)
  for (const error of errors) {
    console.error(error.stack || error);
    // There may be deeply nested properties in an error
    // if it was thrown from a stack of `waitForPromises()` calls.
    // Example: `error.errors[0].errors[0].errors[]`.
    console.error(JSON.stringify(error, null, 2));
  }
}
  errorHandlingQueue.addAll(...)
  errorHandlingQueue.addAll(...)
  await errorHandlingQueue.run()

@benliu723
Copy link

what is the current status?

@szmarczak
Copy link
Collaborator

szmarczak commented Apr 29, 2019

Hang on, I'll send a PR in 30 mins.

Woah, it took me 10 mins to do it from scratch (git pull etc.).

@benliu723
Copy link

@szmarczak cool, thanks!

@benliu723
Copy link

@szmarczak Thanks for your made for this feature.
I still wondering this option pauseOnError works for typescript.

@szmarczak
Copy link
Collaborator

Sorry, I don't understand quite you. Can you elaborate?

@benliu723
Copy link

@szmarczak Sorry, this is my fault. I thought your pr already merged. Please ignore my previous message.

@clockworked247
Copy link

@sindresorhus will @szmarczak PR be considered for merge? I'm also interested in this change. Thanks.

@sindresorhus
Copy link
Owner

Does it make sense that queue.add() can reject? Or should we leave that up to a queue method like queue#onIdle() or queue#onError()? I don't think we should have to handle the same error in multiple places. So I would personally prefer have one single place to handle failure. Where should it be?

@jyboudreau
Copy link

To me it definitively makes sense that queue.add can reject. It's important to the caller to know which specific call to add has failed or not. This is especially true when adding abstractions on top of queue.add. In my usage I need to serialize writes over a custom native (Node.js Addon) channel. To simplify the native side, I decided to do all serialization in JavaScript and am attempting to use p-queue with a concurrency of 1 for this.

This way my user can safely call my write function (which will call queue.add) in different parts of his app, and be able to know which writes failed.

I also am looking for a behaviour similar to OP and was surprised that I could not get this behaviour with the following:

async function write(data) {
  try {
    return queue.add(() => nativeWrite(data))
  } catch(e) {
    // Stop further processing
    queue.clear();
    // Re-throw
    throw e        
  }
}

With the above if we queue a few items, after the first write throws, one further write will be attempted even if we clear the queue. Basically after we have resolved/rejected the promise from add, the next item is already queued and there is no way to clear it.

I think that pauseOnError makes sense to me. The other thing I can think of is to enable (via config?) p-queue to return full control to the caller before queuing the next item.

@clockworked247
Copy link

clockworked247 commented Aug 12, 2019

+1 queue.add() rejection seems right. I too want to be able to handle the failure at the individual queue item level.

queue.add() rejects if the task fails, you could add a catch handler.

So I guess there's no change then? Just need to add catch() error handlers on the individual invocations of add().

@LaurensRietveld
Copy link

I bumped into this issue as well, where I'd like the whole queue to fail when there is an error.
My usecase:

  • iterating through millions of records
  • doing the same processing for each record
  • any exception is bad news, and should stop the whole queue

I really like the p-queue API, up until I realized I'd have to resort to odd workarounds to get this to work.

I understand there are other use cases where we'd like to ignore or handle individual errors, but that seems to be compatible with an pauseOnError option right?
I.e., adding a pauseOnError would not break existing implementations. It would still support handling the errors individually. But it would also make the life easier of those where the error should 'break' the queue.

I'd expect something like this to work:

const delay = require('delay');
const {default: PQueue} = require('p-queue');


const queue = new PQueue({concurrency: 2, pauseOnError: true});

//Adding catch clause to the `add` function to avoid unhandled rejections
//As I've turned `pauseOnError` on 
queue.add(() => Promise.resolve()).catch(() => {});
queue.add(() => delay(2000)).catch(() => {});
queue.add(() => Promise.reject()).catch(() => {});
queue.add(() => Promise.resolve()).catch(() => {});

queue.onIdle()
  .then(() => {
     //We throw an error in the 3rd promise. We should not end up here
   })
  .catch((e) => {
    //I'd expect the first rejected promise to end up in this catch clause
  })

@openreply-dleinhaeuser
Copy link

Maybe I am forgetting something important, but since PQueue is already an EventEmitter, couldn't it just emit error and idle events? Then one could use p-event to wait for idle while checking for errors (and in that case presumably stopping processing)

@ssupinsky
Copy link

How come this isn't added yet? That's such an obvious use case

@jsumners
Copy link

@ssupinsky have you followed the whole thread? Have you submitted a PR to add the feature?

@ssupinsky
Copy link

ssupinsky commented Feb 15, 2021

@jsumners I have followed the thread and I was under the impression that people settled on the pauseOnError option, which has a PR. I'm sorry if I come across as rude, I was genuinely surprised that it's been 3 years since that was brought up and no progress was made to this day.

@ssupinsky
Copy link

ssupinsky commented Feb 15, 2021

If anybody's looking for a workaround here's what I came up with:

(Obviously no ts, it's too clunky to include here. You can figure out type annotations by yourself, or I don't know, ping me and I'll try to help)

class AutoPausingQueue extends PQueue {
  add(fn, options) {
    const fnPausing = () => {
      try {
        const fnResult = fn();
        return !(fnResult instanceof Promise)
          ? fnResult
          : fnResult.catch(err => {
            this.pause();
            throw err;
          });
      } catch(e) {
        this.pause();
        throw e;
      }
    };
    return PQueue.prototype.add.call(this, fnPausing, options);
  }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.