MultiplePriorityQueues: Add ability to POP first message from a list of queues#107
Conversation
54ec037 to
7523e41
Compare
| import static net.greghaines.jesque.worker.WorkerEvent.WORKER_START; | ||
| import static net.greghaines.jesque.worker.WorkerEvent.WORKER_STOP; | ||
| import static net.greghaines.jesque.worker.WorkerImpl.NextQueueStrategy.DRAIN_WHILE_MESSAGES_EXISTS; | ||
| import static net.greghaines.jesque.worker.WorkerImpl.NextQueueStrategy.RESET_TO_HIGHEST_PRIORITY; |
There was a problem hiding this comment.
Please keep the import order that the rest of the project uses:
- static imports (If more than a few from the same class, import star)
- regular imports
There was a problem hiding this comment.
right, sorry about that 👍
723ca54 to
2f321c4
Compare
|
On latest changeset I've added |
|
@gresrun Can you have a look here ? |
|
Hey, we're just starting to use Jesque to replace our horrible mess of futures and threads (scala), and saw this pull request. Just curious if this functionality is going to make it in? Looks like exactly what we need. |
|
I'll try to take a look at this tonight. |
| ScriptUtils.readScript("/workerScripts/jesque_lpoplpush.lua"))); | ||
| this.popScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(POP_LUA))); | ||
| this.lpoplpushScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(LPOPLPUSH_LUA))); | ||
| this.multiPriorityQueuesScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(LPOPLPUSH_LUA))); |
There was a problem hiding this comment.
I'm not following this logic. Why have two script hashes for loading the same script, LPOPLPUSH_LUA? Is this second one supposed to load workerScripts/fromMultiplePriorityQueues.lua?
There was a problem hiding this comment.
You are absolutely correct! It seems that I've messed with 2 different branches and this PR is based on some old branch in my private repo. This is the latest branch and PR I've worked with: kenshoo#6 .
I'll add a commit of only the diff between the two PRs and will squash upon approval.
Sorry for the mess 🙏
| * {@inheritDoc} | ||
| */ | ||
| @Override | ||
| public void setOrderedPriorityQueues(final List<String> queues) { |
There was a problem hiding this comment.
Not sure I see the rationale behind making this a List argument. You aren't really using it as a list, just adding all the values into the BlockingDeque which all you need is a Collection for.
There was a problem hiding this comment.
The rational is that I join the list of queues to make a String param for the Lua script.
As the script is written, it requires a list of queues to fetch items from.
So the order is important - for priority queue behaviour only - and it's implemented via switching based on behaviour on getNextQueue()
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.concurrent.Callable; | ||
|
|
| import java.util.Collections; | ||
|
|
||
| import static net.greghaines.jesque.utils.JesqueUtils.entry; | ||
| import static net.greghaines.jesque.utils.JesqueUtils.map; |
There was a problem hiding this comment.
Please keep the import order consistent with the rest of the project.
| return (String) this.jedis.evalsha(this.popScriptHash.get(), 3, key, key(INFLIGHT, this.name, curQueue), | ||
| JesqueUtils.createRecurringHashKey(key), Long.toString(System.currentTimeMillis())); | ||
| case RESET_TO_HIGHEST_PRIORITY: | ||
| return (String) this.jedis.evalsha(this.multiPriorityQueuesScriptHash.get(), 1, curQueue); |
There was a problem hiding this comment.
This shouldn't work as written. See my first comment.
There was a problem hiding this comment.
Agree 👍 please see my updated PR after fixing the branch-mess
ofirnk
left a comment
There was a problem hiding this comment.
All comments are handled in next commit. Thanks for your review 👍
| ScriptUtils.readScript("/workerScripts/jesque_lpoplpush.lua"))); | ||
| this.popScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(POP_LUA))); | ||
| this.lpoplpushScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(LPOPLPUSH_LUA))); | ||
| this.multiPriorityQueuesScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(LPOPLPUSH_LUA))); |
There was a problem hiding this comment.
You are absolutely correct! It seems that I've messed with 2 different branches and this PR is based on some old branch in my private repo. This is the latest branch and PR I've worked with: kenshoo#6 .
I'll add a commit of only the diff between the two PRs and will squash upon approval.
Sorry for the mess 🙏
| * {@inheritDoc} | ||
| */ | ||
| @Override | ||
| public void setOrderedPriorityQueues(final List<String> queues) { |
There was a problem hiding this comment.
The rational is that I join the list of queues to make a String param for the Lua script.
As the script is written, it requires a list of queues to fetch items from.
So the order is important - for priority queue behaviour only - and it's implemented via switching based on behaviour on getNextQueue()
| return (String) this.jedis.evalsha(this.popScriptHash.get(), 3, key, key(INFLIGHT, this.name, curQueue), | ||
| JesqueUtils.createRecurringHashKey(key), Long.toString(System.currentTimeMillis())); | ||
| case RESET_TO_HIGHEST_PRIORITY: | ||
| return (String) this.jedis.evalsha(this.multiPriorityQueuesScriptHash.get(), 1, curQueue); |
There was a problem hiding this comment.
Agree 👍 please see my updated PR after fixing the branch-mess
| import java.util.Collections; | ||
|
|
||
| import static net.greghaines.jesque.utils.JesqueUtils.entry; | ||
| import static net.greghaines.jesque.utils.JesqueUtils.map; |
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.concurrent.Callable; | ||
|
|
481b5d5 to
20f1105
Compare
…ing to use WorkerPoolimpl with MultiplePriorityQueues
20f1105 to
f55e59b
Compare
|
Any reason why this feature has not been added to the WorkerPoolImpl? |
|
AFAICR we just didn't need it there, and this implementation was enough for our use case |
This PR adds ability to provide a list of queues
[first_priority, second_priority, third_priority, ...]and receive the first message available.
This is done via a single Redis command that iterates server-side on the provided list of queues.
Suggestions how to improve this PR are welcome 🎉