Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(job): add exclusive execution option #2465

Open
wants to merge 16 commits into
base: master
Choose a base branch
from

Conversation

roggervalf
Copy link
Collaborator

No description provided.

@manast
Copy link
Contributor

manast commented Mar 10, 2024

Ok, interesting use of rate-limiter for achieving this effect. A couple of things:

  • I am not sure the name of the option conveys the actual function, for me it is more like "sequential execution", although the queue already executes sequentially, except when we are doing retries. I think we need to think more about the name here.
  • This option only makes sense with concurrency is 1, as otherwise we have no guarantee that the execution is "sequential", so maybe if concurrency > 1 and this option is chosen we should rise an error.

@manast
Copy link
Contributor

manast commented Mar 10, 2024

Another thing to think about, what happens if there are delayed jobs already in the queue, should we care about them?
Maybe not actually... as they are delayed it means they should be executed as soon as possible after they expire. Also if the user cares about order, then they probably will not have delayed jobs running side by side regular jobs...

@@ -38,6 +38,7 @@ import type { QueueEvents } from './queue-events';
const logger = debuglog('bull');

const optsDecodeMap = {
ee: 'exclusiveExecution',
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

preserveOrder must be used

@roggervalf roggervalf marked this pull request as ready for review March 26, 2024 04:35
const worker = new Worker('queueName', async (job: Job) => {
// do some work
}, {
concurrency: 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As concurrency is by default 1, I assume it is not required to specify any when using "preserveOrder".

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, no necessary, I added it just to be clear, I can add a comment about the default value

@manast
Copy link
Contributor

manast commented Mar 27, 2024

I think we need more tests for cover the scenario when you have several workers running in parallel. For instance, let's say you have 10 workers (assume all have concurrency 1 AND preserver order flag to true, if any has another set of options then it will not work as expected, we should also write this in the documentation). Now, I assume that as soon one job is fetched, rate limit kicks in and the other workers will just be idling. We need tests to verify that this is the case, also, what happens if the job stalls, it may go back to wait but we need to make sure that no other job waiting is taken by any of the other workers that are idling when this happens, so order is preserved. We also need to document that if you add delayed jobs, then the order cannot be guaranteed either...

@manast
Copy link
Contributor

manast commented Mar 27, 2024

It is a lot, but if we want to guarantee order there are several edge cases to consider, probably others we have not thought about yet.

...this.opts,
};

if (this.opts.stalledInterval <= 0) {
throw new Error('stalledInterval must be greater than 0');
}

if (this.opts.preserveOrder && this.opts.concurrency > 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to refactor this check as we are doing exactly the same on "set concurrency". In fact, couldn't we just do this.concurrency = ... here and let the checks from set concurrency take care of it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another thing, shouldn't we have this check on the python version as well?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was think on adding it later

* If true, it will rate limit the queue when moving this job into delayed.
* Will stop rate limiting the queue until this job is moved to completed or failed.
*/
ee?: boolean;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't this be better as "po" from Preserver Order?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that option comes from the old version of this pr, I'm going to delete it

@roggervalf
Copy link
Collaborator Author

I think we need more tests for cover the scenario when you have several workers running in parallel. For instance, let's say you have 10 workers (assume all have concurrency 1 AND preserver order flag to true, if any has another set of options then it will not work as expected, we should also write this in the documentation). Now, I assume that as soon one job is fetched, rate limit kicks in and the other workers will just be idling. We need tests to verify that this is the case, also, what happens if the job stalls, it may go back to wait but we need to make sure that no other job waiting is taken by any of the other workers that are idling when this happens, so order is preserved. We also need to document that if you add delayed jobs, then the order cannot be guaranteed either...

there is no global concurrency, so having more than 1 worker will break preserve order behavior, also need to document it.

github-actions bot pushed a commit that referenced this pull request Jul 15, 2024
# [5.9.0](v5.8.7...v5.9.0) (2024-07-15)

### Features

* **queue:** support global concurrency ([#2496](#2496)) ref [#2465](#2465) ([47ba055](47ba055))
```

{% hint style="warning" %}
This feature is only allowed when using concurrency 1, any greater value will throw an error:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add another hint, when using retries and backoffs, for instance, a failed job will keep the queue idle during the time the job is being backed off until it is picked again.

Copy link
Contributor

@manast manast left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just had a couple of comments on the documentation.

@@ -0,0 +1,19 @@
# Preserve Order

BullMQ supports preserving execution order in jobs. When _preserveOrder_ option is provided as *true*, jobs will be processed in the same order, independently of retry strategies. If the current job fails and has a retry strategy, queue will be in rate limit state until the delay is accomplish.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-> BullMQ supports preserving execution order in jobs. When the preserveOrder option is provided as true, jobs will be processed in the same order, independently of retry strategies. If the current job fails and has a retry strategy, the queue will be in rate limit state until the retry delay has expired.

{% endhint %}

{% hint style="warning" %}
This feature is only allowed when using concurrency 1, any greater value will throw an error. Make sure to also set a [global concurrency](https://docs.bullmq.io/guide/queues/global-concurrency)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-> This feature is only allowed when using concurrency 1, any greater value will throw an error. Make sure to also set a global concurrency if you want to preserve order having multiple running workers.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants