-
Notifications
You must be signed in to change notification settings - Fork 132
MultiplePriorityQueues: Add ability to POP first message from a list of queues #107
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
8261b7d
Merge pull request #2 from gresrun/master
c3a88ea
MultiplePriorityQueues: Add ability to POP first message from a list …
e9fb8d7
MultiplePriorityQueues: Fix comments
f12f23d
Syncing with kenshoo/jesque/pull/6
f55e59b
Make Jesque compile and throw unsupported operation exception for try…
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,19 +15,22 @@ | |
| */ | ||
| package net.greghaines.jesque.worker; | ||
|
|
||
|
|
||
| import static net.greghaines.jesque.utils.ResqueConstants.*; | ||
| import static net.greghaines.jesque.worker.JobExecutor.State.*; | ||
| import static net.greghaines.jesque.worker.WorkerEvent.*; | ||
| import static net.greghaines.jesque.worker.WorkerImpl.NextQueueStrategy.RESET_TO_HIGHEST_PRIORITY; | ||
|
|
||
| import java.io.IOException; | ||
| import java.lang.Throwable; | ||
| import java.lang.management.ManagementFactory; | ||
| import java.net.InetAddress; | ||
| import java.net.UnknownHostException; | ||
| import java.text.SimpleDateFormat; | ||
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
| import java.util.Collections; | ||
| import java.util.Date; | ||
| import java.util.List; | ||
| import java.util.concurrent.BlockingDeque; | ||
| import java.util.concurrent.Callable; | ||
| import java.util.concurrent.LinkedBlockingDeque; | ||
|
|
@@ -65,9 +68,13 @@ public class WorkerImpl implements Worker { | |
| protected static final long EMPTY_QUEUE_SLEEP_TIME = 500; // 500 ms | ||
| protected static final long RECONNECT_SLEEP_TIME = 5000; // 5 sec | ||
| protected static final int RECONNECT_ATTEMPTS = 120; // Total time: 10 min | ||
|
|
||
| private static final String LPOPLPUSH_LUA = "/workerScripts/jesque_lpoplpush.lua"; | ||
| private static final String POP_LUA = "/workerScripts/jesque_pop.lua"; | ||
| private static final String POP_FROM_MULTIPLE_PRIO_QUEUES = "/workerScripts/fromMultiplePriorityQueues.lua"; | ||
|
|
||
| // Set the thread name to the message for debugging | ||
| private static volatile boolean threadNameChangingEnabled = false; | ||
| private final NextQueueStrategy nextQueueStrategy; | ||
|
|
||
| /** | ||
| * @return true if worker threads names will change during normal operation | ||
|
|
@@ -113,6 +120,7 @@ protected static void checkQueues(final Iterable<String> queues) { | |
| private final AtomicBoolean processingJob = new AtomicBoolean(false); | ||
| private final AtomicReference<String> popScriptHash = new AtomicReference<>(null); | ||
| private final AtomicReference<String> lpoplpushScriptHash = new AtomicReference<>(null); | ||
| private final AtomicReference<String> multiPriorityQueuesScriptHash = new AtomicReference<>(null); | ||
| private final long workerId = WORKER_COUNTER.getAndIncrement(); | ||
| private final String threadNameBase = "Worker-" + this.workerId + " Jesque-" + VersionUtils.getVersion() + ": "; | ||
| private final AtomicReference<Thread> threadRef = new AtomicReference<Thread>(null); | ||
|
|
@@ -144,6 +152,25 @@ public WorkerImpl(final Config config, final Collection<String> queues, final Jo | |
| */ | ||
| public WorkerImpl(final Config config, final Collection<String> queues, final JobFactory jobFactory, | ||
| final Jedis jedis) { | ||
| this(config, | ||
| (queues == null ? Collections.EMPTY_LIST : new ArrayList<>(queues)), | ||
| jobFactory, | ||
| jedis, | ||
| NextQueueStrategy.DRAIN_WHILE_MESSAGES_EXISTS); | ||
| } | ||
|
|
||
| /** | ||
| * Creates a new WorkerImpl, with the given connection to Redis.<br> | ||
| * The worker will only listen to the supplied queues and execute jobs that are provided by the given job factory. | ||
| * @param config used to create a connection to Redis and the package prefix for incoming jobs | ||
| * @param queues the list of queues to poll | ||
| * @param jobFactory the job factory that materializes the jobs | ||
| * @param jedis the connection to Redis | ||
| * @param nextQueueStrategy defines worker behaviour once it has found messages in a queue | ||
| * @throws IllegalArgumentException if either config, queues, jobFactory or jedis is null | ||
| */ | ||
| public WorkerImpl(final Config config, final List<String> queues, final JobFactory jobFactory, | ||
| final Jedis jedis, NextQueueStrategy nextQueueStrategy) { | ||
| if (config == null) { | ||
| throw new IllegalArgumentException("config must not be null"); | ||
| } | ||
|
|
@@ -154,14 +181,15 @@ public WorkerImpl(final Config config, final Collection<String> queues, final Jo | |
| throw new IllegalArgumentException("jedis must not be null"); | ||
| } | ||
| checkQueues(queues); | ||
| this.nextQueueStrategy = nextQueueStrategy; | ||
| this.config = config; | ||
| this.jobFactory = jobFactory; | ||
| this.namespace = config.getNamespace(); | ||
| this.jedis = jedis; | ||
| this.failQueueStrategyRef = new AtomicReference<FailQueueStrategy>( | ||
| new DefaultFailQueueStrategy(this.namespace)); | ||
| authenticateAndSelectDB(); | ||
| setQueues(queues); | ||
| setOrderedPriorityQueues(queues); | ||
| this.name = createName(); | ||
| } | ||
|
|
||
|
|
@@ -185,9 +213,9 @@ public void run() { | |
| this.jedis.sadd(key(WORKERS), this.name); | ||
| this.jedis.set(key(WORKER, this.name, STARTED), new SimpleDateFormat(DATE_FORMAT).format(new Date())); | ||
| this.listenerDelegate.fireEvent(WORKER_START, this, null, null, null, null, null); | ||
| this.popScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript("/workerScripts/jesque_pop.lua"))); | ||
| this.lpoplpushScriptHash.set(this.jedis.scriptLoad( | ||
| ScriptUtils.readScript("/workerScripts/jesque_lpoplpush.lua"))); | ||
| this.popScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(POP_LUA))); | ||
| this.lpoplpushScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(LPOPLPUSH_LUA))); | ||
| this.multiPriorityQueuesScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(POP_FROM_MULTIPLE_PRIO_QUEUES))); | ||
| poll(); | ||
| } catch (Exception ex) { | ||
| LOG.error("Uncaught exception in worker run-loop!", ex); | ||
|
|
@@ -328,6 +356,14 @@ public void removeAllQueues() { | |
| */ | ||
| @Override | ||
| public void setQueues(final Collection<String> queues) { | ||
| setOrderedPriorityQueues(new ArrayList<>(queues)); | ||
| } | ||
|
|
||
| /** | ||
| * {@inheritDoc} | ||
| */ | ||
| @Override | ||
| public void setOrderedPriorityQueues(final List<String> queues) { | ||
| checkQueues(queues); | ||
| this.queueNames.clear(); | ||
| this.queueNames.addAll((queues == ALL_QUEUES) // Using object equality on purpose | ||
|
|
@@ -408,9 +444,8 @@ protected void poll() { | |
| if (threadNameChangingEnabled) { | ||
| renameThread("Waiting for " + JesqueUtils.join(",", this.queueNames)); | ||
| } | ||
| curQueue = this.queueNames.poll(EMPTY_QUEUE_SLEEP_TIME, TimeUnit.MILLISECONDS); | ||
| curQueue = getNextQueue(); | ||
| if (curQueue != null) { | ||
| this.queueNames.add(curQueue); // Rotate the queues | ||
| checkPaused(); | ||
| // Might have been waiting in poll()/checkPaused() for a while | ||
| if (RUNNING.equals(this.state.get())) { | ||
|
|
@@ -419,10 +454,13 @@ protected void poll() { | |
| if (payload != null) { | ||
| process(ObjectMapperFactory.get().readValue(payload, Job.class), curQueue); | ||
| missCount = 0; | ||
| } else if (++missCount >= this.queueNames.size() && RUNNING.equals(this.state.get())) { | ||
| // Keeps worker from busy-spinning on empty queues | ||
| missCount = 0; | ||
| Thread.sleep(EMPTY_QUEUE_SLEEP_TIME); | ||
| } else { | ||
| missCount++; | ||
| if (shouldSleep(missCount) && RUNNING.equals(this.state.get())) { | ||
| // Keeps worker from busy-spinning on empty queues | ||
| missCount = 0; | ||
| Thread.sleep(EMPTY_QUEUE_SLEEP_TIME); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -440,15 +478,43 @@ protected void poll() { | |
| } | ||
| } | ||
|
|
||
| private boolean shouldSleep(int missCount) { | ||
| return nextQueueStrategy == RESET_TO_HIGHEST_PRIORITY || | ||
| missCount >= this.queueNames.size(); | ||
| } | ||
|
|
||
| protected String getNextQueue() throws InterruptedException { | ||
| switch (nextQueueStrategy) { | ||
| case DRAIN_WHILE_MESSAGES_EXISTS: | ||
| final String nextPollQueue = this.queueNames.poll(EMPTY_QUEUE_SLEEP_TIME, TimeUnit.MILLISECONDS); | ||
| if (nextPollQueue != null) { | ||
| // Rotate the queues | ||
| this.queueNames.add(nextPollQueue); | ||
| } | ||
| return nextPollQueue; | ||
| case RESET_TO_HIGHEST_PRIORITY: | ||
| return JesqueUtils.join(",", this.queueNames); | ||
| default: | ||
| throw new RuntimeException("Unimplemented 'nextQueueStrategy'"); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Remove a job from the given queue. | ||
| * @param curQueue the queue to remove a job from | ||
| * @return a JSON string of a job or null if there was nothing to de-queue | ||
| */ | ||
| protected String pop(final String curQueue) { | ||
| final String key = key(QUEUE, curQueue); | ||
| return (String) this.jedis.evalsha(this.popScriptHash.get(), 3, key, key(INFLIGHT, this.name, curQueue), | ||
| JesqueUtils.createRecurringHashKey(key), Long.toString(System.currentTimeMillis())); | ||
| switch (nextQueueStrategy) { | ||
| case DRAIN_WHILE_MESSAGES_EXISTS: | ||
| return (String) this.jedis.evalsha(this.popScriptHash.get(), 3, key, key(INFLIGHT, this.name, curQueue), | ||
| JesqueUtils.createRecurringHashKey(key), Long.toString(System.currentTimeMillis())); | ||
| case RESET_TO_HIGHEST_PRIORITY: | ||
| return (String) this.jedis.evalsha(this.multiPriorityQueuesScriptHash.get(), 1, curQueue); | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This shouldn't work as written. See my first comment.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree 👍 please see my updated PR after fixing the branch-mess |
||
| default: | ||
| throw new RuntimeException("Unimplemented 'nextQueueStrategy'"); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -708,4 +774,16 @@ protected String lpoplpush(final String from, final String to) { | |
| public String toString() { | ||
| return this.namespace + COLON + WORKER + COLON + this.name; | ||
| } | ||
|
|
||
| public enum NextQueueStrategy { | ||
| /** | ||
| * Would drain messages as long as current queue is not empty | ||
| */ | ||
| DRAIN_WHILE_MESSAGES_EXISTS, | ||
|
|
||
| /** | ||
| * Would reset to check {first queue, second queue, etc'} after each message | ||
| */ | ||
| RESET_TO_HIGHEST_PRIORITY | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
34 changes: 34 additions & 0 deletions
34
src/main/resources/workerScripts/fromMultiplePriorityQueues.lua
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| -- new | ||
| -- User: Ofir Naor | ||
| -- Date: 3/27/16 | ||
| -- Time: 2:47 PM | ||
| -- | ||
| local queues = KEYS[1] | ||
| --local debug = {'Hooray!', "'"..queues.."'"} | ||
|
|
||
| local QUEUE_NAME_CAPTURING_REGEX = '([^,]+)' | ||
| local OPTIONAL_COMMA_SEPARATOR = ',?' | ||
| local OPTIONAL_SPACE_SEPARATOR = '%s*' | ||
| local NEXT_QUEUE_REGEX = QUEUE_NAME_CAPTURING_REGEX .. OPTIONAL_COMMA_SEPARATOR .. OPTIONAL_SPACE_SEPARATOR | ||
|
|
||
| for q in queues.gmatch(queues, NEXT_QUEUE_REGEX) do | ||
| local queueName = 'resque:queue:' .. q | ||
| local status, queueType = next(redis.call('TYPE', queueName)) | ||
| local payload | ||
| if queueType == 'zset' then | ||
| local firstMsg = redis.call('ZRANGE', queueName, '0', '0') | ||
| if firstMsg ~= nil then | ||
| payload = firstMsg[1] | ||
| if payload ~= nil then | ||
| local removedItems = redis.call('ZREM', queueName, payload) | ||
| end | ||
| end | ||
| elseif queueType == 'list' then | ||
| payload = redis.call('LPOP', queueName) | ||
| end | ||
|
|
||
| if payload ~= nil then | ||
| return payload | ||
| end | ||
| end | ||
| return nil |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I see the rationale behind making this a List argument. You aren't really using it as a list, just adding all the values into the BlockingDeque which all you need is a Collection for.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The rational is that I join the list of queues to make a String param for the Lua script.
As the script is written, it requires a list of queues to fetch items from.
So the order is important - for priority queue behaviour only - and it's implemented via switching based on behaviour on
getNextQueue()