Skip to content

Commit

Permalink
Fix race condition in consumer redelivery (#14687)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shawyeok authored Mar 15, 2022
1 parent a7786e1 commit dd9bcbe
Showing 1 changed file with 9 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1916,20 +1916,20 @@ private CompletableFuture<List<MessageIdData>> getRedeliveryMessageIdData(List<M
if (messageIds == null || messageIds.isEmpty()) {
return CompletableFuture.completedFuture(Collections.emptyList());
}
List<MessageIdData> data = new ArrayList<>(messageIds.size());
List<CompletableFuture<Void>> futures = new ArrayList<>(messageIds.size());
messageIds.forEach(messageId -> {
List<CompletableFuture<MessageIdData>> futures = messageIds.stream().map(messageId -> {
CompletableFuture<Boolean> future = processPossibleToDLQ(messageId);
futures.add(future.thenAccept(sendToDLQ -> {
return future.thenApply(sendToDLQ -> {
if (!sendToDLQ) {
data.add(new MessageIdData()
return new MessageIdData()
.setPartition(messageId.getPartitionIndex())
.setLedgerId(messageId.getLedgerId())
.setEntryId(messageId.getEntryId()));
.setEntryId(messageId.getEntryId());
}
}));
});
return FutureUtil.waitForAll(futures).thenCompose(v -> CompletableFuture.completedFuture(data));
return null;
});
}).collect(Collectors.toList());
return FutureUtil.waitForAll(futures).thenApply(v ->
futures.stream().map(CompletableFuture::join).filter(Objects::nonNull).collect(Collectors.toList()));
}

private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdImpl messageId) {
Expand Down

0 comments on commit dd9bcbe

Please sign in to comment.