Skip to content

Commit

Permalink
Add a atomic bulkPop() API
Browse files Browse the repository at this point in the history
Using queues with DC_EACH_SAFE_QUORUM is quite expensive. The bulk
pop operation is meant to pop more within  a single round trip.
  • Loading branch information
smukil committed Nov 11, 2019
1 parent 749596c commit d4305d4
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ public interface DynoQueue extends Closeable {
*/
public Message get(String messageId);

public List<Message> bulkPop(int messageCount, int wait, TimeUnit unit);

/**
*
* @return Size of the queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,100 @@ public String getMsgWithPredicate(String predicate, boolean localShardOnly) {
});
}

@Override
public List<Message> bulkPop(int messageCount, int wait, TimeUnit unit) {

if (messageCount < 1) {
return Collections.emptyList();
}

Stopwatch sw = monitor.start(monitor.pop, messageCount);
try {
long start = clock.millis();
long waitFor = unit.toMillis(wait);
numIdsToPrefetch.addAndGet(messageCount);

prefetchIds();
while (prefetchedIds.size() < messageCount && ((clock.millis() - start) < waitFor)) {
Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
prefetchIds();
}
return atomicBulkPopHelper(shardName, messageCount, prefetchedIds);

} catch (Exception e) {
throw new RuntimeException(e);
} finally {
sw.stop();
}

}

private List<Message> atomicBulkPopHelper(String shard, int messageCount,
ConcurrentLinkedQueue<String> prefetchedIdQueue) {

double now = Long.valueOf(clock.millis() + 1).doubleValue();
double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue();

// The script requires the scores as whole numbers
NumberFormat fmt = NumberFormat.getIntegerInstance();
fmt.setGroupingUsed(false);
String nowScoreString = fmt.format(now);
String unackScoreString = fmt.format(unackScore);

List<String> messageIds = new ArrayList<>();
for (int i = 0; i < messageCount; ++i) {
messageIds.add(prefetchedIdQueue.poll());
}

String atomicBulkPopScript="local hkey=KEYS[1]\n" +
"local num_msgs=ARGV[1]\n" +
"local peek_until=ARGV[2]\n" +
"local unack_score=ARGV[3]\n" +
"local queue_shard_name=ARGV[4]\n" +
"local unack_shard_name=ARGV[5]\n" +
"local msg_start_idx = 6\n" +
"local idx = 1\n" +
"local return_vals={}\n" +
"for i=0,num_msgs-1 do\n" +
" local message_id=ARGV[msg_start_idx + i]\n" +
" local exists = redis.call('zscore', queue_shard_name, message_id)\n" +
" if (exists) then\n" +
" if (exists <=peek_until) then\n" +
" local value = redis.call('hget', hkey, message_id)\n" +
" if (value) then\n" +
" local zadd_ret = redis.call('zadd', unack_shard_name, 'NX', unack_score, message_id)\n" +
" if (zadd_ret) then\n" +
" redis.call('zrem', queue_shard_name, message_id)\n" +
" return_vals[idx]=value\n" +
" idx=idx+1\n" +
" end\n" +
" end\n" +
" end\n" +
" else\n" +
" return {}\n" +
" end\n" +
"end\n" +
"return return_vals";

String unackShardName = getUnackKey(queueName, shardName);

ImmutableList.Builder builder = ImmutableList.builder();
builder.add(Integer.toString(messageCount));
builder.add(nowScoreString);
builder.add(unackScoreString);
builder.add(localQueueShard);
builder.add(unackShardName);
for (int i = 0; i < messageCount; ++i) {
builder.add(messageIds.get(i));
}

List<Message> payloads;
// Cast from 'JedisCommands' to 'DynoJedisClient' here since the former does not expose 'eval()'.
payloads = (List) ((DynoJedisClient) quorumConn).eval(atomicBulkPopScript,
Collections.singletonList(messageStoreKey), builder.build());

return payloads;
}
/**
*
* Similar to popWithMsgId() but completes all the operations in one round trip.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ public Message popMsgWithPredicate(String predicate, boolean localShardOnly) {
throw new UnsupportedOperationException();
}

@Override
public List<Message> bulkPop(int messageCount, int wait, TimeUnit unit) {
throw new UnsupportedOperationException();
}

@Override
public Message get(String messageId) {
for (DynoQueue q : queues.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,11 @@ public Message popMsgWithPredicate(String predicate, boolean localShardOnly) {
throw new UnsupportedOperationException();
}

@Override
public List<Message> bulkPop(int messageCount, int wait, TimeUnit unit) {
throw new UnsupportedOperationException();
}

@Override
public Message get(String messageId) {

Expand Down

0 comments on commit d4305d4

Please sign in to comment.