Skip to content

Commit

Permalink
Add atomicProcessUnacks() and getAllMessages()
Browse files Browse the repository at this point in the history
  • Loading branch information
smukil committed Jan 17, 2020
1 parent ad6c57f commit 12d242a
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,15 @@ public interface DynoQueue extends Closeable {
*/
public Message get(String messageId);

/**
*
* Attempts to return all the messages found in the hashmap. It's a best-effort return of all payloads, i.e. it may
* not 100% match with what's in the queue metadata at any given time and is read with a non-quorum connection.
*
* @return Returns a list of all messages found in the message hashmap.
*/
public List<Message> getAllMessages();

/**
*
* Same as get(), but uses the non quorum connection.
Expand Down Expand Up @@ -222,6 +231,7 @@ public interface DynoQueue extends Closeable {
* Process un-acknowledged messages. The messages which are polled by the client but not ack'ed are moved back to queue
*/
public void processUnacks();
public void atomicProcessUnacks();

/*
* <=== Begin unsafe* functions. ===>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public class RedisDynoQueue implements DynoQueue {

private final ShardingStrategy shardingStrategy;

private final boolean singleRingTopology;

// Tracks the number of message IDs to prefetch based on the message counts requested by the caller via pop().
@VisibleForTesting
AtomicInteger numIdsToPrefetch;
Expand All @@ -96,15 +98,15 @@ public class RedisDynoQueue implements DynoQueue {
@VisibleForTesting
AtomicInteger unsafeNumIdsToPrefetchAllShards;

public RedisDynoQueue(String redisKeyPrefix, String queueName, Set<String> allShards, String shardName, ShardingStrategy shardingStrategy) {
this(redisKeyPrefix, queueName, allShards, shardName, 60_000, shardingStrategy);
public RedisDynoQueue(String redisKeyPrefix, String queueName, Set<String> allShards, String shardName, ShardingStrategy shardingStrategy, boolean singleRingTopology) {
this(redisKeyPrefix, queueName, allShards, shardName, 60_000, shardingStrategy, singleRingTopology);
}

public RedisDynoQueue(String redisKeyPrefix, String queueName, Set<String> allShards, String shardName, int unackScheduleInMS, ShardingStrategy shardingStrategy) {
this(Clock.systemDefaultZone(), redisKeyPrefix, queueName, allShards, shardName, unackScheduleInMS, shardingStrategy);
public RedisDynoQueue(String redisKeyPrefix, String queueName, Set<String> allShards, String shardName, int unackScheduleInMS, ShardingStrategy shardingStrategy, boolean singleRingTopology) {
this(Clock.systemDefaultZone(), redisKeyPrefix, queueName, allShards, shardName, unackScheduleInMS, shardingStrategy, singleRingTopology);
}

public RedisDynoQueue(Clock clock, String redisKeyPrefix, String queueName, Set<String> allShards, String shardName, int unackScheduleInMS, ShardingStrategy shardingStrategy) {
public RedisDynoQueue(Clock clock, String redisKeyPrefix, String queueName, Set<String> allShards, String shardName, int unackScheduleInMS, ShardingStrategy shardingStrategy, boolean singleRingTopology) {
this.clock = clock;
this.redisKeyPrefix = redisKeyPrefix;
this.queueName = queueName;
Expand All @@ -116,6 +118,7 @@ public RedisDynoQueue(Clock clock, String redisKeyPrefix, String queueName, Set<

this.numIdsToPrefetch = new AtomicInteger(0);
this.unsafeNumIdsToPrefetchAllShards = new AtomicInteger(0);
this.singleRingTopology = singleRingTopology;

this.om = QueueUtils.constructObjectMapper();
this.monitor = new QueueMonitor(queueName, shardName);
Expand All @@ -127,7 +130,11 @@ public RedisDynoQueue(Clock clock, String redisKeyPrefix, String queueName, Set<

schedulerForUnacksProcessing = Executors.newScheduledThreadPool(1);

schedulerForUnacksProcessing.scheduleAtFixedRate(() -> processUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
if (this.singleRingTopology) {
schedulerForUnacksProcessing.scheduleAtFixedRate(() -> atomicProcessUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
} else {
schedulerForUnacksProcessing.scheduleAtFixedRate(() -> processUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
}

logger.info(RedisDynoQueue.class.getName() + " is ready to serve " + queueName);
}
Expand Down Expand Up @@ -1245,6 +1252,18 @@ public Message get(String messageId) {
}
}

@Override
public List<Message> getAllMessages() {
Map<String, String> allMsgs = nonQuorumConn.hgetAll(messageStoreKey);
List<Message> retList = new ArrayList<>();
for (Map.Entry<String,String> entry: allMsgs.entrySet()) {
Message msg = new Message(entry.getKey(), entry.getValue());
retList.add(msg);
}

return retList;
}

@Override
public Message localGet(String messageId) {

Expand Down Expand Up @@ -1330,6 +1349,7 @@ public void clear() {
@Override
public void processUnacks() {

logger.info("processUnacks() will NOT be atomic.");
Stopwatch sw = monitor.processUnack.start();
try {

Expand Down Expand Up @@ -1383,6 +1403,94 @@ public void processUnacks() {

}

@Override
public void atomicProcessUnacks() {

logger.info("processUnacks() will be atomic.");
Stopwatch sw = monitor.processUnack.start();
try {

long queueDepth = size();
monitor.queueDepth.record(queueDepth);

String keyName = getUnackKey(queueName, shardName);
execute("processUnacks", keyName, () -> {

int batchSize = 1_000;
String unackShardName = getUnackKey(queueName, shardName);

double now = Long.valueOf(clock.millis()).doubleValue();
long num_moved_back = 0;
long num_stale = 0;

Set<Tuple> unacks = nonQuorumConn.zrangeByScoreWithScores(unackShardName, 0, now, 0, batchSize);

if (unacks.size() > 0) {
logger.info("processUnacks: Attempting to add " + unacks.size() + " messages back to shard of queue: " + unackShardName);
} else {
return null;
}

String atomicProcessUnacksScript = "local hkey=KEYS[1]\n" +
"local unack_shard=ARGV[1]\n" +
"local queue_shard=ARGV[2]\n" +
"local num_unacks=ARGV[3]\n" +
"\n" +
"local unacks={}\n" +
"local unack_scores={}\n" +
"local unack_start_idx = 4\n" +
"for i=0,num_unacks-1 do\n" +
" unacks[i]=ARGV[4 + (i*2)]\n" +
" unack_scores[i]=ARGV[4+(i*2)+1]\n" +
"end\n" +
"\n" +
"local num_moved=0\n" +
"local num_stale=0\n" +
"for i=0,num_unacks-1 do\n" +
" local mem_val = redis.call('hget', hkey, unacks[i])\n" +
" if (mem_val) then\n" +
" redis.call('zadd', queue_shard, unack_scores[i], unacks[i])\n" +
" redis.call('zrem', unack_shard, unacks[i])\n" +
" num_moved=num_moved+1\n" +
" else\n" +
" redis.call('zrem', unack_shard, unacks[i])\n" +
" num_stale=num_stale+1\n" +
" end\n" +
"end\n" +
"\n" +
"return {num_moved, num_stale}\n";

ImmutableList.Builder builder = ImmutableList.builder();
builder.add(unackShardName);
builder.add(localQueueShard);
builder.add(Integer.toString(unacks.size()));
for (Tuple unack : unacks) {
builder.add(unack.getElement());

// The script requires the scores as whole numbers
NumberFormat fmt = NumberFormat.getIntegerInstance();
fmt.setGroupingUsed(false);
String unackScoreString = fmt.format(unack.getScore());
builder.add(unackScoreString);
}

ArrayList<Long> retval = (ArrayList) ((DynoJedisClient)quorumConn).eval(atomicProcessUnacksScript, Collections.singletonList(messageStoreKey), builder.build());
num_moved_back = retval.get(0).longValue();
num_stale = retval.get(1).longValue();
if (num_moved_back > 0 || num_stale > 0) {
logger.info("processUnacks: Moved back " + num_moved_back + " items. Got rid of " + num_stale + " stale items.");
}
return null;
});

} catch (Exception e) {
logger.error("Error while processing unacks. " + e.getMessage());
} finally {
sw.stop();
}

}

private String getQueueShardKey(String queueName, String shard) {
return redisKeyPrefix + ".QUEUE." + queueName + "." + shard;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.netflix.dyno.queues.redis;

import com.netflix.dyno.jedis.DynoJedisClient;
import com.netflix.dyno.queues.DynoQueue;
import com.netflix.dyno.queues.ShardSupplier;
import com.netflix.dyno.queues.redis.sharding.RoundRobinStrategy;
Expand Down Expand Up @@ -56,6 +57,8 @@ public class RedisQueues implements Closeable {

private final ShardingStrategy shardingStrategy;

private final boolean singleRingTopology;

/**
* @param quorumConn Dyno connection with dc_quorum enabled
* @param nonQuorumConn Dyno connection to local Redis
Expand Down Expand Up @@ -103,6 +106,7 @@ public RedisQueues(Clock clock, JedisCommands quorumConn, JedisCommands nonQuoru
this.unackHandlerIntervalInMS = unackHandlerIntervalInMS;
this.queues = new ConcurrentHashMap<>();
this.shardingStrategy = shardingStrategy;
this.singleRingTopology = ((DynoJedisClient) quorumConn).getConnPool().getPools().size() == 3;
}

/**
Expand All @@ -116,7 +120,7 @@ public DynoQueue get(String queueName) {

String key = queueName.intern();

return queues.computeIfAbsent(key, (keyToCompute) -> new RedisDynoQueue(clock, redisKeyPrefix, queueName, allShards, shardName, unackHandlerIntervalInMS, shardingStrategy)
return queues.computeIfAbsent(key, (keyToCompute) -> new RedisDynoQueue(clock, redisKeyPrefix, queueName, allShards, shardName, unackHandlerIntervalInMS, shardingStrategy, singleRingTopology)
.withUnackTime(unackTime)
.withNonQuorumConn(nonQuorumConn)
.withQuorumConn(quorumConn));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,13 +242,23 @@ public void close() throws IOException {
}
}

@Override
public List<Message> getAllMessages() {
throw new UnsupportedOperationException();
}

@Override
public void processUnacks() {
for (RedisPipelineQueue queue : queues.values()) {
queue.processUnacks();
}
}

@Override
public void atomicProcessUnacks() {
throw new UnsupportedOperationException();
}

private AtomicInteger nextShardIndex = new AtomicInteger(0);

private String getNextShard() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,16 @@ private void processUnacks(String unackShardKey) {

}

@Override
public List<Message> getAllMessages() {
throw new UnsupportedOperationException();
}

@Override
public void atomicProcessUnacks() {
throw new UnsupportedOperationException();
}

@Override
public void close() throws IOException {
schedulerForUnacksProcessing.shutdown();
Expand Down

0 comments on commit 12d242a

Please sign in to comment.