Skip to content

Commit

Permalink
Make findStaleMessages() return stale messages from all shards
Browse files Browse the repository at this point in the history
Note: All items returned MUST be checked at the app level if they've
already been processed before acting on them (eg: removing them)
  • Loading branch information
smukil committed Jan 25, 2020
1 parent 5b25b69 commit 03af6f5
Showing 1 changed file with 48 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1467,60 +1467,64 @@ public List<Message> findStaleMessages() {

List<Message> stale_msgs = new ArrayList<>();

int batchSize = 1_000;
int batchSize = 10;

double now = Long.valueOf(clock.millis()).doubleValue();
long num_stale = 0;

Set<String> elems = nonQuorumConn.zrangeByScore(localQueueShard, 0, now, 0, batchSize);
for (String shard : allShards) {
String queueShardName = getQueueShardKey(queueName, shard);
Set<String> elems = nonQuorumConn.zrangeByScore(queueShardName, 0, now, 0, batchSize);

if (elems.size() == 0) {
return stale_msgs;
}
if (elems.size() == 0) {
continue;
}

String findStaleMsgsScript = "local hkey=KEYS[1]\n" +
"local queue_shard=ARGV[1]\n" +
"local unack_shard=ARGV[2]\n" +
"local num_msgs=ARGV[3]\n" +
"\n" +
"local stale_msgs={}\n" +
"local num_stale_idx = 1\n" +
"for i=0,num_msgs-1 do\n" +
" local msg_id=ARGV[4+i]\n" +
"\n" +
" local exists_hash = redis.call('hget', hkey, msg_id)\n" +
" local exists_queue = redis.call('zscore', queue_shard, msg_id)\n" +
" local exists_unack = redis.call('zscore', unack_shard, msg_id)\n" +
"\n" +
" if (exists_hash and exists_queue) then\n" +
" elseif (not (exists_unack)) then\n" +
" stale_msgs[num_stale_idx] = msg_id\n" +
" num_stale_idx = num_stale_idx + 1\n" +
" end\n" +
"end\n" +
"\n" +
"return stale_msgs\n";
String findStaleMsgsScript = "local hkey=KEYS[1]\n" +
"local queue_shard=ARGV[1]\n" +
"local unack_shard=ARGV[2]\n" +
"local num_msgs=ARGV[3]\n" +
"\n" +
"local stale_msgs={}\n" +
"local num_stale_idx = 1\n" +
"for i=0,num_msgs-1 do\n" +
" local msg_id=ARGV[4+i]\n" +
"\n" +
" local exists_hash = redis.call('hget', hkey, msg_id)\n" +
" local exists_queue = redis.call('zscore', queue_shard, msg_id)\n" +
" local exists_unack = redis.call('zscore', unack_shard, msg_id)\n" +
"\n" +
" if (exists_hash and exists_queue) then\n" +
" elseif (not (exists_unack)) then\n" +
" stale_msgs[num_stale_idx] = msg_id\n" +
" num_stale_idx = num_stale_idx + 1\n" +
" end\n" +
"end\n" +
"\n" +
"return stale_msgs\n";

String unackKey = getUnackKey(queueName, shardName);
ImmutableList.Builder builder = ImmutableList.builder();
builder.add(localQueueShard);
builder.add(unackKey);
builder.add(Integer.toString(elems.size()));
for (String msg : elems) {
builder.add(msg);
}
String unackKey = getUnackKey(queueName, shard);
ImmutableList.Builder builder = ImmutableList.builder();
builder.add(queueShardName);
builder.add(unackKey);
builder.add(Integer.toString(elems.size()));
for (String msg : elems) {
builder.add(msg);
}

ArrayList<String> stale_msg_ids = (ArrayList) ((DynoJedisClient)quorumConn).eval(findStaleMsgsScript, Collections.singletonList(messageStoreKey), builder.build());
num_stale = stale_msg_ids.size();
if (num_stale > 0) {
logger.info("findStaleMsgs(): Found " + num_stale + " messages present in queue but not in hashmap");
}
ArrayList<String> stale_msg_ids = (ArrayList) ((DynoJedisClient)quorumConn).eval(findStaleMsgsScript, Collections.singletonList(messageStoreKey), builder.build());
num_stale = stale_msg_ids.size();
if (num_stale > 0) {
logger.info("findStaleMsgs(): Found " + num_stale + " messages present in queue but not in hashmap");
}

for (String m : stale_msg_ids) {
Message msg = new Message();
msg.setId(m);
stale_msgs.add(msg);
for (String m : stale_msg_ids) {
Message msg = new Message();
msg.setId(m);
stale_msgs.add(msg);
}
}

return stale_msgs;
});
}
Expand Down

0 comments on commit 03af6f5

Please sign in to comment.