From 12d242ab3fe431ffa9bf2f6a88e3a209b0d269a2 Mon Sep 17 00:00:00 2001 From: Sailesh Mukil Date: Fri, 17 Jan 2020 14:50:41 -0800 Subject: [PATCH] Add atomicProcessUnacks() and getAllMessages() --- .../com/netflix/dyno/queues/DynoQueue.java | 10 ++ .../dyno/queues/redis/RedisDynoQueue.java | 120 +++++++++++++++++- .../dyno/queues/redis/RedisQueues.java | 6 +- .../dyno/queues/redis/v2/MultiRedisQueue.java | 10 ++ .../queues/redis/v2/RedisPipelineQueue.java | 10 ++ 5 files changed, 149 insertions(+), 7 deletions(-) diff --git a/dyno-queues-core/src/main/java/com/netflix/dyno/queues/DynoQueue.java b/dyno-queues-core/src/main/java/com/netflix/dyno/queues/DynoQueue.java index ce448e7..1db74f8 100644 --- a/dyno-queues-core/src/main/java/com/netflix/dyno/queues/DynoQueue.java +++ b/dyno-queues-core/src/main/java/com/netflix/dyno/queues/DynoQueue.java @@ -188,6 +188,15 @@ public interface DynoQueue extends Closeable { */ public Message get(String messageId); + /** + * + * Attempts to return all the messages found in the hashmap. It's a best-effort return of all payloads, i.e. it may + * not 100% match with what's in the queue metadata at any given time and is read with a non-quorum connection. + * + * @return Returns a list of all messages found in the message hashmap. + */ + public List getAllMessages(); + /** * * Same as get(), but uses the non quorum connection. @@ -222,6 +231,7 @@ public interface DynoQueue extends Closeable { * Process un-acknowledged messages. The messages which are polled by the client but not ack'ed are moved back to queue */ public void processUnacks(); + public void atomicProcessUnacks(); /* * <=== Begin unsafe* functions. ===> diff --git a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisDynoQueue.java b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisDynoQueue.java index fb0825e..fca9c96 100644 --- a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisDynoQueue.java +++ b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisDynoQueue.java @@ -87,6 +87,8 @@ public class RedisDynoQueue implements DynoQueue { private final ShardingStrategy shardingStrategy; + private final boolean singleRingTopology; + // Tracks the number of message IDs to prefetch based on the message counts requested by the caller via pop(). @VisibleForTesting AtomicInteger numIdsToPrefetch; @@ -96,15 +98,15 @@ public class RedisDynoQueue implements DynoQueue { @VisibleForTesting AtomicInteger unsafeNumIdsToPrefetchAllShards; - public RedisDynoQueue(String redisKeyPrefix, String queueName, Set allShards, String shardName, ShardingStrategy shardingStrategy) { - this(redisKeyPrefix, queueName, allShards, shardName, 60_000, shardingStrategy); + public RedisDynoQueue(String redisKeyPrefix, String queueName, Set allShards, String shardName, ShardingStrategy shardingStrategy, boolean singleRingTopology) { + this(redisKeyPrefix, queueName, allShards, shardName, 60_000, shardingStrategy, singleRingTopology); } - public RedisDynoQueue(String redisKeyPrefix, String queueName, Set allShards, String shardName, int unackScheduleInMS, ShardingStrategy shardingStrategy) { - this(Clock.systemDefaultZone(), redisKeyPrefix, queueName, allShards, shardName, unackScheduleInMS, shardingStrategy); + public RedisDynoQueue(String redisKeyPrefix, String queueName, Set allShards, String shardName, int unackScheduleInMS, ShardingStrategy shardingStrategy, boolean singleRingTopology) { + this(Clock.systemDefaultZone(), redisKeyPrefix, queueName, allShards, shardName, unackScheduleInMS, shardingStrategy, singleRingTopology); } - public RedisDynoQueue(Clock clock, String redisKeyPrefix, String queueName, Set allShards, String shardName, int unackScheduleInMS, ShardingStrategy shardingStrategy) { + public RedisDynoQueue(Clock clock, String redisKeyPrefix, String queueName, Set allShards, String shardName, int unackScheduleInMS, ShardingStrategy shardingStrategy, boolean singleRingTopology) { this.clock = clock; this.redisKeyPrefix = redisKeyPrefix; this.queueName = queueName; @@ -116,6 +118,7 @@ public RedisDynoQueue(Clock clock, String redisKeyPrefix, String queueName, Set< this.numIdsToPrefetch = new AtomicInteger(0); this.unsafeNumIdsToPrefetchAllShards = new AtomicInteger(0); + this.singleRingTopology = singleRingTopology; this.om = QueueUtils.constructObjectMapper(); this.monitor = new QueueMonitor(queueName, shardName); @@ -127,7 +130,11 @@ public RedisDynoQueue(Clock clock, String redisKeyPrefix, String queueName, Set< schedulerForUnacksProcessing = Executors.newScheduledThreadPool(1); - schedulerForUnacksProcessing.scheduleAtFixedRate(() -> processUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS); + if (this.singleRingTopology) { + schedulerForUnacksProcessing.scheduleAtFixedRate(() -> atomicProcessUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS); + } else { + schedulerForUnacksProcessing.scheduleAtFixedRate(() -> processUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS); + } logger.info(RedisDynoQueue.class.getName() + " is ready to serve " + queueName); } @@ -1245,6 +1252,18 @@ public Message get(String messageId) { } } + @Override + public List getAllMessages() { + Map allMsgs = nonQuorumConn.hgetAll(messageStoreKey); + List retList = new ArrayList<>(); + for (Map.Entry entry: allMsgs.entrySet()) { + Message msg = new Message(entry.getKey(), entry.getValue()); + retList.add(msg); + } + + return retList; + } + @Override public Message localGet(String messageId) { @@ -1330,6 +1349,7 @@ public void clear() { @Override public void processUnacks() { + logger.info("processUnacks() will NOT be atomic."); Stopwatch sw = monitor.processUnack.start(); try { @@ -1383,6 +1403,94 @@ public void processUnacks() { } + @Override + public void atomicProcessUnacks() { + + logger.info("processUnacks() will be atomic."); + Stopwatch sw = monitor.processUnack.start(); + try { + + long queueDepth = size(); + monitor.queueDepth.record(queueDepth); + + String keyName = getUnackKey(queueName, shardName); + execute("processUnacks", keyName, () -> { + + int batchSize = 1_000; + String unackShardName = getUnackKey(queueName, shardName); + + double now = Long.valueOf(clock.millis()).doubleValue(); + long num_moved_back = 0; + long num_stale = 0; + + Set unacks = nonQuorumConn.zrangeByScoreWithScores(unackShardName, 0, now, 0, batchSize); + + if (unacks.size() > 0) { + logger.info("processUnacks: Attempting to add " + unacks.size() + " messages back to shard of queue: " + unackShardName); + } else { + return null; + } + + String atomicProcessUnacksScript = "local hkey=KEYS[1]\n" + + "local unack_shard=ARGV[1]\n" + + "local queue_shard=ARGV[2]\n" + + "local num_unacks=ARGV[3]\n" + + "\n" + + "local unacks={}\n" + + "local unack_scores={}\n" + + "local unack_start_idx = 4\n" + + "for i=0,num_unacks-1 do\n" + + " unacks[i]=ARGV[4 + (i*2)]\n" + + " unack_scores[i]=ARGV[4+(i*2)+1]\n" + + "end\n" + + "\n" + + "local num_moved=0\n" + + "local num_stale=0\n" + + "for i=0,num_unacks-1 do\n" + + " local mem_val = redis.call('hget', hkey, unacks[i])\n" + + " if (mem_val) then\n" + + " redis.call('zadd', queue_shard, unack_scores[i], unacks[i])\n" + + " redis.call('zrem', unack_shard, unacks[i])\n" + + " num_moved=num_moved+1\n" + + " else\n" + + " redis.call('zrem', unack_shard, unacks[i])\n" + + " num_stale=num_stale+1\n" + + " end\n" + + "end\n" + + "\n" + + "return {num_moved, num_stale}\n"; + + ImmutableList.Builder builder = ImmutableList.builder(); + builder.add(unackShardName); + builder.add(localQueueShard); + builder.add(Integer.toString(unacks.size())); + for (Tuple unack : unacks) { + builder.add(unack.getElement()); + + // The script requires the scores as whole numbers + NumberFormat fmt = NumberFormat.getIntegerInstance(); + fmt.setGroupingUsed(false); + String unackScoreString = fmt.format(unack.getScore()); + builder.add(unackScoreString); + } + + ArrayList retval = (ArrayList) ((DynoJedisClient)quorumConn).eval(atomicProcessUnacksScript, Collections.singletonList(messageStoreKey), builder.build()); + num_moved_back = retval.get(0).longValue(); + num_stale = retval.get(1).longValue(); + if (num_moved_back > 0 || num_stale > 0) { + logger.info("processUnacks: Moved back " + num_moved_back + " items. Got rid of " + num_stale + " stale items."); + } + return null; + }); + + } catch (Exception e) { + logger.error("Error while processing unacks. " + e.getMessage()); + } finally { + sw.stop(); + } + + } + private String getQueueShardKey(String queueName, String shard) { return redisKeyPrefix + ".QUEUE." + queueName + "." + shard; } diff --git a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisQueues.java b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisQueues.java index 3cc2b93..08567f7 100644 --- a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisQueues.java +++ b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisQueues.java @@ -15,6 +15,7 @@ */ package com.netflix.dyno.queues.redis; +import com.netflix.dyno.jedis.DynoJedisClient; import com.netflix.dyno.queues.DynoQueue; import com.netflix.dyno.queues.ShardSupplier; import com.netflix.dyno.queues.redis.sharding.RoundRobinStrategy; @@ -56,6 +57,8 @@ public class RedisQueues implements Closeable { private final ShardingStrategy shardingStrategy; + private final boolean singleRingTopology; + /** * @param quorumConn Dyno connection with dc_quorum enabled * @param nonQuorumConn Dyno connection to local Redis @@ -103,6 +106,7 @@ public RedisQueues(Clock clock, JedisCommands quorumConn, JedisCommands nonQuoru this.unackHandlerIntervalInMS = unackHandlerIntervalInMS; this.queues = new ConcurrentHashMap<>(); this.shardingStrategy = shardingStrategy; + this.singleRingTopology = ((DynoJedisClient) quorumConn).getConnPool().getPools().size() == 3; } /** @@ -116,7 +120,7 @@ public DynoQueue get(String queueName) { String key = queueName.intern(); - return queues.computeIfAbsent(key, (keyToCompute) -> new RedisDynoQueue(clock, redisKeyPrefix, queueName, allShards, shardName, unackHandlerIntervalInMS, shardingStrategy) + return queues.computeIfAbsent(key, (keyToCompute) -> new RedisDynoQueue(clock, redisKeyPrefix, queueName, allShards, shardName, unackHandlerIntervalInMS, shardingStrategy, singleRingTopology) .withUnackTime(unackTime) .withNonQuorumConn(nonQuorumConn) .withQuorumConn(quorumConn)); diff --git a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/v2/MultiRedisQueue.java b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/v2/MultiRedisQueue.java index c94dc46..93e84d9 100644 --- a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/v2/MultiRedisQueue.java +++ b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/v2/MultiRedisQueue.java @@ -242,6 +242,11 @@ public void close() throws IOException { } } + @Override + public List getAllMessages() { + throw new UnsupportedOperationException(); + } + @Override public void processUnacks() { for (RedisPipelineQueue queue : queues.values()) { @@ -249,6 +254,11 @@ public void processUnacks() { } } + @Override + public void atomicProcessUnacks() { + throw new UnsupportedOperationException(); + } + private AtomicInteger nextShardIndex = new AtomicInteger(0); private String getNextShard() { diff --git a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/v2/RedisPipelineQueue.java b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/v2/RedisPipelineQueue.java index 903c426..7235b37 100644 --- a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/v2/RedisPipelineQueue.java +++ b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/v2/RedisPipelineQueue.java @@ -679,6 +679,16 @@ private void processUnacks(String unackShardKey) { } + @Override + public List getAllMessages() { + throw new UnsupportedOperationException(); + } + + @Override + public void atomicProcessUnacks() { + throw new UnsupportedOperationException(); + } + @Override public void close() throws IOException { schedulerForUnacksProcessing.shutdown();