Skip to content

MultiplePriorityQueues: Add ability to POP first message from a list of queues#107

Merged
gresrun merged 5 commits into
gresrun:masterfrom
ofirnk:ofirnk/priority-queues-list
Nov 24, 2016
Merged

MultiplePriorityQueues: Add ability to POP first message from a list of queues#107
gresrun merged 5 commits into
gresrun:masterfrom
ofirnk:ofirnk/priority-queues-list

Conversation

@ofirnk
Copy link
Copy Markdown
Contributor

@ofirnk ofirnk commented Mar 27, 2016

This PR adds ability to provide a list of queues
[first_priority, second_priority, third_priority, ...]
and receive the first message available.

This is done via a single Redis command that iterates server-side on the provided list of queues.

Suggestions how to improve this PR are welcome 🎉

@ofirnk ofirnk force-pushed the ofirnk/priority-queues-list branch from 54ec037 to 7523e41 Compare March 27, 2016 21:24
@coveralls
Copy link
Copy Markdown

Coverage Status

Coverage increased (+0.03%) to 80.623% when pulling 7523e41 on ofirnk:ofirnk/priority-queues-list into 5483d4e on gresrun:master.

import static net.greghaines.jesque.worker.WorkerEvent.WORKER_START;
import static net.greghaines.jesque.worker.WorkerEvent.WORKER_STOP;
import static net.greghaines.jesque.worker.WorkerImpl.NextQueueStrategy.DRAIN_WHILE_MESSAGES_EXISTS;
import static net.greghaines.jesque.worker.WorkerImpl.NextQueueStrategy.RESET_TO_HIGHEST_PRIORITY;
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please keep the import order that the rest of the project uses:

  1. static imports (If more than a few from the same class, import star)
  2. regular imports

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, sorry about that 👍

@ofirnk ofirnk force-pushed the ofirnk/priority-queues-list branch 2 times, most recently from 723ca54 to 2f321c4 Compare March 29, 2016 08:51
@coveralls
Copy link
Copy Markdown

Coverage Status

Coverage increased (+0.4%) to 80.948% when pulling 2f321c4 on ofirnk:ofirnk/priority-queues-list into 5483d4e on gresrun:master.

@coveralls
Copy link
Copy Markdown

Coverage Status

Coverage increased (+0.4%) to 80.948% when pulling 2f321c4 on ofirnk:ofirnk/priority-queues-list into 5483d4e on gresrun:master.

@ofirnk
Copy link
Copy Markdown
Contributor Author

ofirnk commented Mar 29, 2016

On latest changeset I've added setOrderedPriorityQueues(List queues) to Worker interface

@coveralls
Copy link
Copy Markdown

Coverage Status

Coverage increased (+0.4%) to 80.948% when pulling 2f321c4 on ofirnk:ofirnk/priority-queues-list into 5483d4e on gresrun:master.

@shlomisut
Copy link
Copy Markdown

@gresrun Can you have a look here ?

@rathboma
Copy link
Copy Markdown

rathboma commented Sep 29, 2016

Hey, we're just starting to use Jesque to replace our horrible mess of futures and threads (scala), and saw this pull request. Just curious if this functionality is going to make it in? Looks like exactly what we need.

@gresrun
Copy link
Copy Markdown
Owner

gresrun commented Sep 29, 2016

I'll try to take a look at this tonight.

@gresrun gresrun self-assigned this Sep 29, 2016
@shlomisut
Copy link
Copy Markdown

@rathboma from our end we can say we've been using this in production for the last few months, and it has been working great.

@gresrun thanks, will be happy to see this as part of the main artifact!

ScriptUtils.readScript("/workerScripts/jesque_lpoplpush.lua")));
this.popScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(POP_LUA)));
this.lpoplpushScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(LPOPLPUSH_LUA)));
this.multiPriorityQueuesScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(LPOPLPUSH_LUA)));
Copy link
Copy Markdown
Owner

@gresrun gresrun Oct 2, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not following this logic. Why have two script hashes for loading the same script, LPOPLPUSH_LUA? Is this second one supposed to load workerScripts/fromMultiplePriorityQueues.lua?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are absolutely correct! It seems that I've messed with 2 different branches and this PR is based on some old branch in my private repo. This is the latest branch and PR I've worked with: kenshoo#6 .
I'll add a commit of only the diff between the two PRs and will squash upon approval.
Sorry for the mess 🙏

* {@inheritDoc}
*/
@Override
public void setOrderedPriorityQueues(final List<String> queues) {
Copy link
Copy Markdown
Owner

@gresrun gresrun Oct 2, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I see the rationale behind making this a List argument. You aren't really using it as a list, just adding all the values into the BlockingDeque which all you need is a Collection for.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rational is that I join the list of queues to make a String param for the Lua script.
As the script is written, it requires a list of queues to fetch items from.
So the order is important - for priority queue behaviour only - and it's implemented via switching based on behaviour on getNextQueue()

import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

Copy link
Copy Markdown
Owner

@gresrun gresrun Oct 2, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here about the imports.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will change also

import java.util.Collections;

import static net.greghaines.jesque.utils.JesqueUtils.entry;
import static net.greghaines.jesque.utils.JesqueUtils.map;
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please keep the import order consistent with the rest of the project.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right - will change

return (String) this.jedis.evalsha(this.popScriptHash.get(), 3, key, key(INFLIGHT, this.name, curQueue),
JesqueUtils.createRecurringHashKey(key), Long.toString(System.currentTimeMillis()));
case RESET_TO_HIGHEST_PRIORITY:
return (String) this.jedis.evalsha(this.multiPriorityQueuesScriptHash.get(), 1, curQueue);
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't work as written. See my first comment.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree 👍 please see my updated PR after fixing the branch-mess

Copy link
Copy Markdown
Contributor Author

@ofirnk ofirnk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All comments are handled in next commit. Thanks for your review 👍

ScriptUtils.readScript("/workerScripts/jesque_lpoplpush.lua")));
this.popScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(POP_LUA)));
this.lpoplpushScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(LPOPLPUSH_LUA)));
this.multiPriorityQueuesScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(LPOPLPUSH_LUA)));
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are absolutely correct! It seems that I've messed with 2 different branches and this PR is based on some old branch in my private repo. This is the latest branch and PR I've worked with: kenshoo#6 .
I'll add a commit of only the diff between the two PRs and will squash upon approval.
Sorry for the mess 🙏

* {@inheritDoc}
*/
@Override
public void setOrderedPriorityQueues(final List<String> queues) {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rational is that I join the list of queues to make a String param for the Lua script.
As the script is written, it requires a list of queues to fetch items from.
So the order is important - for priority queue behaviour only - and it's implemented via switching based on behaviour on getNextQueue()

return (String) this.jedis.evalsha(this.popScriptHash.get(), 3, key, key(INFLIGHT, this.name, curQueue),
JesqueUtils.createRecurringHashKey(key), Long.toString(System.currentTimeMillis()));
case RESET_TO_HIGHEST_PRIORITY:
return (String) this.jedis.evalsha(this.multiPriorityQueuesScriptHash.get(), 1, curQueue);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree 👍 please see my updated PR after fixing the branch-mess

import java.util.Collections;

import static net.greghaines.jesque.utils.JesqueUtils.entry;
import static net.greghaines.jesque.utils.JesqueUtils.map;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right - will change

import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will change also

@ofirnk ofirnk force-pushed the ofirnk/priority-queues-list branch from 481b5d5 to 20f1105 Compare November 1, 2016 13:33
…ing to use WorkerPoolimpl with MultiplePriorityQueues
@ofirnk ofirnk force-pushed the ofirnk/priority-queues-list branch from 20f1105 to f55e59b Compare November 1, 2016 13:36
@gresrun gresrun merged commit fda4afa into gresrun:master Nov 24, 2016
@bp-FLN
Copy link
Copy Markdown
Contributor

bp-FLN commented Dec 27, 2017

Any reason why this feature has not been added to the WorkerPoolImpl?
Are there any technical issues that prevent this from being done?
I'd like to provide a PR but need to make sure first if it's worth implementing.

@ofirnk
Copy link
Copy Markdown
Contributor Author

ofirnk commented Dec 27, 2017

AFAICR we just didn't need it there, and this implementation was enough for our use case

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants