Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/main/java/net/greghaines/jesque/worker/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/**
* A Worker polls for Jobs from a specified list of queues, executing them in
Expand Down Expand Up @@ -93,6 +94,12 @@ public interface Worker extends JobExecutor, Runnable {
* @param queues the queues to poll
*/
void setQueues(Collection<String> queues);

/**
* Clear any current queues and poll the given queues <b>in the specified order</b>.
* @param queues the queues to poll
*/
void setOrderedPriorityQueues(List<String> queues);

/**
* @return the worker event emitter for this worker
Expand Down
106 changes: 92 additions & 14 deletions src/main/java/net/greghaines/jesque/worker/WorkerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,22 @@
*/
package net.greghaines.jesque.worker;


import static net.greghaines.jesque.utils.ResqueConstants.*;
import static net.greghaines.jesque.worker.JobExecutor.State.*;
import static net.greghaines.jesque.worker.WorkerEvent.*;
import static net.greghaines.jesque.worker.WorkerImpl.NextQueueStrategy.RESET_TO_HIGHEST_PRIORITY;

import java.io.IOException;
import java.lang.Throwable;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingDeque;
Expand Down Expand Up @@ -65,9 +68,13 @@ public class WorkerImpl implements Worker {
protected static final long EMPTY_QUEUE_SLEEP_TIME = 500; // 500 ms
protected static final long RECONNECT_SLEEP_TIME = 5000; // 5 sec
protected static final int RECONNECT_ATTEMPTS = 120; // Total time: 10 min

private static final String LPOPLPUSH_LUA = "/workerScripts/jesque_lpoplpush.lua";
private static final String POP_LUA = "/workerScripts/jesque_pop.lua";
private static final String POP_FROM_MULTIPLE_PRIO_QUEUES = "/workerScripts/fromMultiplePriorityQueues.lua";

// Set the thread name to the message for debugging
private static volatile boolean threadNameChangingEnabled = false;
private final NextQueueStrategy nextQueueStrategy;

/**
* @return true if worker threads names will change during normal operation
Expand Down Expand Up @@ -113,6 +120,7 @@ protected static void checkQueues(final Iterable<String> queues) {
private final AtomicBoolean processingJob = new AtomicBoolean(false);
private final AtomicReference<String> popScriptHash = new AtomicReference<>(null);
private final AtomicReference<String> lpoplpushScriptHash = new AtomicReference<>(null);
private final AtomicReference<String> multiPriorityQueuesScriptHash = new AtomicReference<>(null);
private final long workerId = WORKER_COUNTER.getAndIncrement();
private final String threadNameBase = "Worker-" + this.workerId + " Jesque-" + VersionUtils.getVersion() + ": ";
private final AtomicReference<Thread> threadRef = new AtomicReference<Thread>(null);
Expand Down Expand Up @@ -144,6 +152,25 @@ public WorkerImpl(final Config config, final Collection<String> queues, final Jo
*/
public WorkerImpl(final Config config, final Collection<String> queues, final JobFactory jobFactory,
final Jedis jedis) {
this(config,
(queues == null ? Collections.EMPTY_LIST : new ArrayList<>(queues)),
jobFactory,
jedis,
NextQueueStrategy.DRAIN_WHILE_MESSAGES_EXISTS);
}

/**
* Creates a new WorkerImpl, with the given connection to Redis.<br>
* The worker will only listen to the supplied queues and execute jobs that are provided by the given job factory.
* @param config used to create a connection to Redis and the package prefix for incoming jobs
* @param queues the list of queues to poll
* @param jobFactory the job factory that materializes the jobs
* @param jedis the connection to Redis
* @param nextQueueStrategy defines worker behaviour once it has found messages in a queue
* @throws IllegalArgumentException if either config, queues, jobFactory or jedis is null
*/
public WorkerImpl(final Config config, final List<String> queues, final JobFactory jobFactory,
final Jedis jedis, NextQueueStrategy nextQueueStrategy) {
if (config == null) {
throw new IllegalArgumentException("config must not be null");
}
Expand All @@ -154,14 +181,15 @@ public WorkerImpl(final Config config, final Collection<String> queues, final Jo
throw new IllegalArgumentException("jedis must not be null");
}
checkQueues(queues);
this.nextQueueStrategy = nextQueueStrategy;
this.config = config;
this.jobFactory = jobFactory;
this.namespace = config.getNamespace();
this.jedis = jedis;
this.failQueueStrategyRef = new AtomicReference<FailQueueStrategy>(
new DefaultFailQueueStrategy(this.namespace));
authenticateAndSelectDB();
setQueues(queues);
setOrderedPriorityQueues(queues);
this.name = createName();
}

Expand All @@ -185,9 +213,9 @@ public void run() {
this.jedis.sadd(key(WORKERS), this.name);
this.jedis.set(key(WORKER, this.name, STARTED), new SimpleDateFormat(DATE_FORMAT).format(new Date()));
this.listenerDelegate.fireEvent(WORKER_START, this, null, null, null, null, null);
this.popScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript("/workerScripts/jesque_pop.lua")));
this.lpoplpushScriptHash.set(this.jedis.scriptLoad(
ScriptUtils.readScript("/workerScripts/jesque_lpoplpush.lua")));
this.popScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(POP_LUA)));
this.lpoplpushScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(LPOPLPUSH_LUA)));
this.multiPriorityQueuesScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(POP_FROM_MULTIPLE_PRIO_QUEUES)));
poll();
} catch (Exception ex) {
LOG.error("Uncaught exception in worker run-loop!", ex);
Expand Down Expand Up @@ -328,6 +356,14 @@ public void removeAllQueues() {
*/
@Override
public void setQueues(final Collection<String> queues) {
setOrderedPriorityQueues(new ArrayList<>(queues));
}

/**
* {@inheritDoc}
*/
@Override
public void setOrderedPriorityQueues(final List<String> queues) {
Copy link
Copy Markdown
Owner

@gresrun gresrun Oct 2, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I see the rationale behind making this a List argument. You aren't really using it as a list, just adding all the values into the BlockingDeque which all you need is a Collection for.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rational is that I join the list of queues to make a String param for the Lua script.
As the script is written, it requires a list of queues to fetch items from.
So the order is important - for priority queue behaviour only - and it's implemented via switching based on behaviour on getNextQueue()

checkQueues(queues);
this.queueNames.clear();
this.queueNames.addAll((queues == ALL_QUEUES) // Using object equality on purpose
Expand Down Expand Up @@ -408,9 +444,8 @@ protected void poll() {
if (threadNameChangingEnabled) {
renameThread("Waiting for " + JesqueUtils.join(",", this.queueNames));
}
curQueue = this.queueNames.poll(EMPTY_QUEUE_SLEEP_TIME, TimeUnit.MILLISECONDS);
curQueue = getNextQueue();
if (curQueue != null) {
this.queueNames.add(curQueue); // Rotate the queues
checkPaused();
// Might have been waiting in poll()/checkPaused() for a while
if (RUNNING.equals(this.state.get())) {
Expand All @@ -419,10 +454,13 @@ protected void poll() {
if (payload != null) {
process(ObjectMapperFactory.get().readValue(payload, Job.class), curQueue);
missCount = 0;
} else if (++missCount >= this.queueNames.size() && RUNNING.equals(this.state.get())) {
// Keeps worker from busy-spinning on empty queues
missCount = 0;
Thread.sleep(EMPTY_QUEUE_SLEEP_TIME);
} else {
missCount++;
if (shouldSleep(missCount) && RUNNING.equals(this.state.get())) {
// Keeps worker from busy-spinning on empty queues
missCount = 0;
Thread.sleep(EMPTY_QUEUE_SLEEP_TIME);
}
}
}
}
Expand All @@ -440,15 +478,43 @@ protected void poll() {
}
}

private boolean shouldSleep(int missCount) {
return nextQueueStrategy == RESET_TO_HIGHEST_PRIORITY ||
missCount >= this.queueNames.size();
}

protected String getNextQueue() throws InterruptedException {
switch (nextQueueStrategy) {
case DRAIN_WHILE_MESSAGES_EXISTS:
final String nextPollQueue = this.queueNames.poll(EMPTY_QUEUE_SLEEP_TIME, TimeUnit.MILLISECONDS);
if (nextPollQueue != null) {
// Rotate the queues
this.queueNames.add(nextPollQueue);
}
return nextPollQueue;
case RESET_TO_HIGHEST_PRIORITY:
return JesqueUtils.join(",", this.queueNames);
default:
throw new RuntimeException("Unimplemented 'nextQueueStrategy'");
}
}

/**
* Remove a job from the given queue.
* @param curQueue the queue to remove a job from
* @return a JSON string of a job or null if there was nothing to de-queue
*/
protected String pop(final String curQueue) {
final String key = key(QUEUE, curQueue);
return (String) this.jedis.evalsha(this.popScriptHash.get(), 3, key, key(INFLIGHT, this.name, curQueue),
JesqueUtils.createRecurringHashKey(key), Long.toString(System.currentTimeMillis()));
switch (nextQueueStrategy) {
case DRAIN_WHILE_MESSAGES_EXISTS:
return (String) this.jedis.evalsha(this.popScriptHash.get(), 3, key, key(INFLIGHT, this.name, curQueue),
JesqueUtils.createRecurringHashKey(key), Long.toString(System.currentTimeMillis()));
case RESET_TO_HIGHEST_PRIORITY:
return (String) this.jedis.evalsha(this.multiPriorityQueuesScriptHash.get(), 1, curQueue);
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't work as written. See my first comment.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree 👍 please see my updated PR after fixing the branch-mess

default:
throw new RuntimeException("Unimplemented 'nextQueueStrategy'");
}
}

/**
Expand Down Expand Up @@ -708,4 +774,16 @@ protected String lpoplpush(final String from, final String to) {
public String toString() {
return this.namespace + COLON + WORKER + COLON + this.name;
}

public enum NextQueueStrategy {
/**
* Would drain messages as long as current queue is not empty
*/
DRAIN_WHILE_MESSAGES_EXISTS,

/**
* Would reset to check {first queue, second queue, etc'} after each message
*/
RESET_TO_HIGHEST_PRIORITY
}
}
10 changes: 9 additions & 1 deletion src/main/java/net/greghaines/jesque/worker/WorkerPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,16 @@ public void removeAllQueues() {
*/
@Override
public void setQueues(final Collection<String> queues) {
setOrderedPriorityQueues(new ArrayList<>(queues));
}

/**
* {@inheritDoc}
*/
@Override
public void setOrderedPriorityQueues(final List<String> queues) {
for (final Worker worker : this.workers) {
worker.setQueues(queues);
worker.setOrderedPriorityQueues(queues);
}
}

Expand Down
10 changes: 6 additions & 4 deletions src/main/java/net/greghaines/jesque/worker/WorkerPoolImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Set;
import java.util.*;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingDeque;
Expand Down Expand Up @@ -352,6 +349,11 @@ public Set<String> doWork(final Jedis jedis) {
}
}

@Override
public void setOrderedPriorityQueues(List<String> queues) {
throw new UnsupportedOperationException("Not implemented, yet");
}

/**
* {@inheritDoc}
*/
Expand Down
34 changes: 34 additions & 0 deletions src/main/resources/workerScripts/fromMultiplePriorityQueues.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
-- new
-- User: Ofir Naor
-- Date: 3/27/16
-- Time: 2:47 PM
--
local queues = KEYS[1]
--local debug = {'Hooray!', "'"..queues.."'"}

local QUEUE_NAME_CAPTURING_REGEX = '([^,]+)'
local OPTIONAL_COMMA_SEPARATOR = ',?'
local OPTIONAL_SPACE_SEPARATOR = '%s*'
local NEXT_QUEUE_REGEX = QUEUE_NAME_CAPTURING_REGEX .. OPTIONAL_COMMA_SEPARATOR .. OPTIONAL_SPACE_SEPARATOR

for q in queues.gmatch(queues, NEXT_QUEUE_REGEX) do
local queueName = 'resque:queue:' .. q
local status, queueType = next(redis.call('TYPE', queueName))
local payload
if queueType == 'zset' then
local firstMsg = redis.call('ZRANGE', queueName, '0', '0')
if firstMsg ~= nil then
payload = firstMsg[1]
if payload ~= nil then
local removedItems = redis.call('ZREM', queueName, payload)
end
end
elseif queueType == 'list' then
payload = redis.call('LPOP', queueName)
end

if payload ~= nil then
return payload
end
end
return nil
31 changes: 29 additions & 2 deletions src/test/java/net/greghaines/jesque/worker/TestWorkerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,25 @@
import static net.greghaines.jesque.utils.JesqueUtils.entry;
import static net.greghaines.jesque.utils.JesqueUtils.map;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;

import net.greghaines.jesque.Config;
import net.greghaines.jesque.ConfigBuilder;
import net.greghaines.jesque.TestAction;

import org.jmock.Mockery;
import org.jmock.integration.junit4.JUnit4Mockery;
import org.jmock.internal.InvocationExpectation;
import org.jmock.lib.legacy.ClassImposteriser;
import org.junit.Assert;
import org.junit.Test;
import redis.clients.jedis.Jedis;

public class TestWorkerImpl {

private static final Config CONFIG = new ConfigBuilder().build();

@Test(expected=IllegalArgumentException.class)
public void testConstructor_NullConfig() {
new WorkerImpl(null, null, null, null);
Expand Down Expand Up @@ -58,4 +64,25 @@ public void testCheckQueues_EmptyQueue() {
public void testCheckQueues_OK() {
WorkerImpl.checkQueues(Arrays.asList("foo", "bar"));
}

@Test
public void verifyNoExceptionsForAllNextQueueStrategies() throws InterruptedException {
final MapBasedJobFactory jobFactory = new MapBasedJobFactory(Collections.EMPTY_MAP);
for (WorkerImpl.NextQueueStrategy nextQueueStrategy : WorkerImpl.NextQueueStrategy.values()) {
final WorkerImpl worker = new WorkerImpl(CONFIG,
new ArrayList<String>(),
jobFactory,
getJedis(),
nextQueueStrategy);
worker.pop(worker.getNextQueue());
}
}

private Jedis getJedis() {
final Mockery mockCtx = new JUnit4Mockery();
mockCtx.setImposteriser(ClassImposteriser.INSTANCE);
final Jedis jedis = mockCtx.mock(Jedis.class);
mockCtx.addExpectation(new InvocationExpectation());
return jedis;
}
}
Loading