Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix getMessageById throws 500 #21919

Merged
merged 1 commit into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
[fix][broker] Fix getMessageById throws 500
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
  • Loading branch information
nodece committed Jan 18, 2024
commit 3efd4a65f6c2c89afb1a5a1765789e5ffee651bb
Original file line number Diff line number Diff line change
Expand Up @@ -2826,6 +2826,9 @@ protected CompletableFuture<Response> internalGetMessageById(long ledgerId, long
@Override
public void readEntryFailed(ManagedLedgerException exception,
Object ctx) {
if (exception instanceof ManagedLedgerException.LedgerNotExistException) {
throw new RestException(Status.NOT_FOUND, "Message id not found");
}
throw new RestException(exception);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1364,21 +1364,12 @@ public void testGetMessageById() throws Exception {
Message<byte[]> message2 = admin.topics().getMessageById(topicName2, id2.getLedgerId(), id2.getEntryId());
Assert.assertEquals(message2.getData(), data2.getBytes());

Message<byte[]> message3 = null;
try {
message3 = admin.topics().getMessageById(topicName2, id1.getLedgerId(), id1.getEntryId());
Assert.fail();
} catch (Exception e) {
Assert.assertNull(message3);
}

Message<byte[]> message4 = null;
try {
message4 = admin.topics().getMessageById(topicName1, id2.getLedgerId(), id2.getEntryId());
Assert.fail();
} catch (Exception e) {
Assert.assertNull(message4);
}
Assert.expectThrows(PulsarAdminException.NotFoundException.class, () -> {
admin.topics().getMessageById(topicName2, id1.getLedgerId(), id1.getEntryId());
});
Assert.expectThrows(PulsarAdminException.NotFoundException.class, () -> {
admin.topics().getMessageById(topicName1, id2.getLedgerId(), id2.getEntryId());
});
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -987,21 +987,7 @@ public CompletableFuture<Void> truncateAsync(String topic) {

@Override
public CompletableFuture<Message<byte[]>> getMessageByIdAsync(String topic, long ledgerId, long entryId) {
CompletableFuture<Message<byte[]>> future = new CompletableFuture<>();
getRemoteMessageById(topic, ledgerId, entryId).handle((r, ex) -> {
if (ex != null) {
if (ex instanceof NotFoundException) {
log.warn("Exception '{}' occurred while trying to get message.", ex.getMessage());
future.complete(r);
} else {
future.completeExceptionally(ex);
}
return null;
}
future.complete(r);
return null;
});
return future;
return getRemoteMessageById(topic, ledgerId, entryId);
nodece marked this conversation as resolved.
Show resolved Hide resolved
}

private CompletableFuture<Message<byte[]>> getRemoteMessageById(String topic, long ledgerId, long entryId) {
Expand Down
Loading