Skip to content

Commit

Permalink
[fix][test]Flaky test testMaxPendingChunkMessages (#21103)
Browse files Browse the repository at this point in the history
  • Loading branch information
liangyepianzhou authored Sep 7, 2023
1 parent d890432 commit 3cb7926
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Commands.ChecksumType;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand Down Expand Up @@ -319,15 +320,29 @@ private void sendSingleChunk(Producer<String> producer, String uuid, int chunkId
msg.send();
}

/**
* This test used to test the consumer configuration of maxPendingChunkedMessage.
* If we set maxPendingChunkedMessage is 1 that means only one incomplete chunk message can be store in this
* consumer.
* For example:
* ChunkMessage1 chunk-1: uuid = 0, chunkId = 0, totalChunk = 2;
* ChunkMessage2 chunk-1: uuid = 1, chunkId = 0, totalChunk = 2;
* ChunkMessage2 chunk-2: uuid = 1, chunkId = 1, totalChunk = 2;
* ChunkMessage1 chunk-2: uuid = 0, chunkId = 1, totalChunk = 2;
* The chunk-1 in the ChunkMessage1 and ChunkMessage2 all is incomplete.
* chunk-1 in the ChunkMessage1 will be discarded and acked when receive the chunk-1 in the ChunkMessage2.
* If ack ChunkMessage2 and redeliver unacknowledged messages, the consumer can not receive any message again.
* @throws Exception
*/
@Test
public void testMaxPendingChunkMessages() throws Exception {
log.info("-- Starting {} test --", methodName);
final String topicName = "persistent://my-property/my-ns/maxPending";

final String subName = "my-subscriber-name";
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("my-subscriber-name")
.subscriptionName(subName)
.maxPendingChunkedMessage(1)
.autoAckOldestChunkedMessageOnQueueFull(true)
.subscribe();
Expand All @@ -348,6 +363,8 @@ public void testMaxPendingChunkMessages() throws Exception {
assertEquals(receivedMsg.getValue(), "chunk-1-0|chunk-1-1|");

consumer.acknowledge(receivedMsg);
Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getStats(topicName)
.getSubscriptions().get(subName).getNonContiguousDeletedMessagesRanges(), 0));
consumer.redeliverUnacknowledgedMessages();

sendSingleChunk(producer, "0", 1, 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1520,9 +1520,10 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
return null;
}
// means we lost the first chunk: should never happen
log.info("[{}] [{}] Received unexpected chunk messageId {}, last-chunk-id = {}, chunkId = {}", topic,
subscription, msgId,
(chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId());
log.info("[{}] [{}] Received unexpected chunk messageId {}, last-chunk-id = {}, chunkId = {}, uuid = {}",
topic, subscription, msgId,
(chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId(),
msgMetadata.getUuid());
if (chunkedMsgCtx != null) {
if (chunkedMsgCtx.chunkedMsgBuffer != null) {
ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
Expand Down

0 comments on commit 3cb7926

Please sign in to comment.