Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Checks that a producer can re-send a message from inside the completion callback of a send
 * that failed with a {@link PulsarClientException.TimeoutException}.
 */
@Test(groups = "broker-impl")
public class ProducerSyncRetryTest extends ProducerConsumerBase {

    @Override
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override
    @AfterMethod(alwaysRun = true)
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Sends a message while the broker is stopped and with a 1 ms send timeout, then, from the
     * callback of that failed send, restarts the broker and sends a second message which is
     * expected to complete with a non-null message id.
     */
    @Test(timeOut = 30000)
    public void testProducerSyncRetryAfterTimeout() throws Exception {
        final String topicName = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp");

        // A 1 ms send timeout so the first send cannot complete in time.
        @Cleanup
        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .sendTimeout(1, TimeUnit.MILLISECONDS)
                .create();

        // With the broker down the first message has nowhere to go and must time out.
        this.stopBroker();

        // Both the original message and its retry are built over the same (empty) payload buffer.
        ByteBuffer sharedPayload = ByteBuffer.wrap(new byte[0]);
        MessageImpl<byte[]> originalMessage = newMessage(topicName, sharedPayload);
        MessageImpl<byte[]> retryMessage = newMessage(topicName, sharedPayload);

        // The first attempt is expected to fail.
        CompletableFuture<MessageId> firstAttempt = producer.sendAsync(originalMessage);
        producer.triggerSendTimer();

        // Once the first attempt completes, verify it timed out and issue the retry from the callback.
        CompletableFuture<CompletableFuture<MessageId>> retryLaunched =
                firstAttempt.handle((ignoredId, failure) -> {
                    assertNotNull(failure, "First send must timeout");
                    assertTrue(failure instanceof PulsarClientException.TimeoutException);
                    try {
                        // Bring the broker back so the retry can succeed.
                        this.startBroker();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                    // Give the retry a realistic timeout before sending it.
                    producer.conf.setSendTimeoutMs(10000);
                    return producer.sendAsync(retryMessage);
                });
        CompletableFuture<MessageId> secondAttempt = retryLaunched.thenCompose(inner -> inner);

        // Block until the retry finishes; it must yield a message id.
        MessageId retryMessageId = secondAttempt.join();
        assertNotNull(retryMessageId);
    }

    /**
     * Builds a {@code byte[]} message for the given topic whose metadata reports an
     * uncompressed size of 1.
     *
     * @param topicName topic the message is created for
     * @param payload   buffer handed to {@code MessageImpl.create} as the message payload
     * @return the newly created message
     */
    private static MessageImpl<byte[]> newMessage(String topicName, ByteBuffer payload) {
        MessageMetadata metadata = new MessageMetadata();
        metadata.setUncompressedSize(1);
        return MessageImpl.create(metadata, payload, Schema.BYTES, topicName);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -1820,11 +1820,12 @@ public void clear() {
messagesCount.set(0);
}

public void remove() {
public OpSendMsg remove() {
OpSendMsg op = delegate.remove();
if (op != null) {
messagesCount.addAndGet(-op.numMessagesInBatch);
}
return op;
}

public OpSendMsg peek() {
Expand Down Expand Up @@ -2295,14 +2296,20 @@ public void run(Timeout timeout) throws Exception {
}

/**
* This fails and clears the pending messages with the given exception. This method should be called from within the
* ProducerImpl object mutex.
* This fails the pending messages at the start of the call, without dropping newly enqueued
* retry messages. This method should be called from within the ProducerImpl object mutex.
*/
private synchronized void failPendingMessages(ClientCnx cnx, PulsarClientException ex) {
@VisibleForTesting
synchronized void failPendingMessages(ClientCnx cnx, PulsarClientException ex) {
if (cnx == null) {
final AtomicInteger releaseCount = new AtomicInteger();
final boolean batchMessagingEnabled = isBatchMessagingEnabled();
pendingMessages.forEach(op -> {
// Snapshot how many messages must be failed, so that messages newly enqueued by synchronous
// retries (triggered from op.sendComplete(ex)) don't get removed
int pendingMessagesToFailCount = pendingMessages.size();

for (int i = 0; i < pendingMessagesToFailCount; i++) {
OpSendMsg op = pendingMessages.remove();
releaseCount.addAndGet(batchMessagingEnabled ? op.numMessagesInBatch : 1);
try {
// Need to protect ourselves from any exception being thrown in the future handler from the
Expand All @@ -2322,9 +2329,8 @@ private synchronized void failPendingMessages(ClientCnx cnx, PulsarClientExcepti
client.getMemoryLimitController().releaseMemory(op.uncompressedSize);
ReferenceCountUtil.safeRelease(op.cmd);
op.recycle();
});
}

pendingMessages.clear();
semaphoreRelease(releaseCount.get());
if (batchMessagingEnabled) {
failPendingBatchMessages(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import java.nio.ByteBuffer;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.mockito.Mockito;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -67,4 +70,78 @@ public void testPopulateMessageSchema() {
assertTrue(producer.populateMessageSchema(msg, null));
verify(msg).setSchemaState(MessageImpl.SchemaState.Ready);
}

/**
 * Seeds the pending queue with one op whose {@code sendComplete} enqueues a second op
 * (sequence id 2), calls {@code failPendingMessages(null, ...)} and asserts that exactly the
 * re-enqueued op is left in the queue afterwards.
 */
@Test
public void testFailPendingMessagesSyncRetry() throws Exception {
    // Producer mock that runs the real method bodies, with batching reported as disabled.
    ProducerImpl<byte[]> producerUnderTest = Mockito.mock(ProducerImpl.class, Mockito.CALLS_REAL_METHODS);
    Mockito.doReturn(false).when(producerUnderTest).isBatchMessagingEnabled();

    // Semaphore bookkeeping is not what this test verifies.
    Mockito.doNothing().when(producerUnderTest).semaphoreRelease(Mockito.anyInt());

    // The client is only needed to hand out a memory limit controller.
    MemoryLimitController memoryLimitController = Mockito.mock(MemoryLimitController.class);
    PulsarClientImpl mockedClient = Mockito.mock(PulsarClientImpl.class);
    Mockito.when(mockedClient.getMemoryLimitController()).thenReturn(memoryLimitController);
    FieldUtils.writeField(producerUnderTest, "client", mockedClient, true);

    // Install a real queue as the producer's pending-message store.
    ProducerImpl.OpSendMsgQueue queue = new ProducerImpl.OpSendMsgQueue();
    FieldUtils.writeField(producerUnderTest, "pendingMessages", queue, true);

    MessageImpl<?> firstMessage = Mockito.mock(MessageImpl.class);
    Mockito.when(firstMessage.getUncompressedSize()).thenReturn(10);
    ProducerImpl.OpSendMsg firstOp = newSingleChunkOp(firstMessage, 1L);

    MessageImpl<?> retryMessage = Mockito.mock(MessageImpl.class);
    Mockito.when(retryMessage.getUncompressedSize()).thenReturn(10);

    // Spy on the first op: completing it enqueues the retry op while the callback is running.
    ProducerImpl.OpSendMsg failingOp = Mockito.spy(firstOp);
    Mockito.doAnswer(invocation -> {
        queue.add(newSingleChunkOp(retryMessage, 2L));
        return null;
    }).when(failingOp).sendComplete(Mockito.any());
    Mockito.doNothing().when(failingOp).recycle();

    // Only the first op is pending when failPendingMessages starts.
    queue.add(failingOp);

    producerUnderTest.failPendingMessages(null, new PulsarClientException.TimeoutException("timeout"));

    assertEquals(producerUnderTest.getPendingQueueSize(), 1,
            "Retry Op should exist in the pending Queue");
    assertEquals(queue.peek().sequenceId, 2L,
            "Retry Op SequenceId should match with the one in pendingQueue");
}

/**
 * Creates an op for {@code message} with the given sequence id, backed by mocked
 * collaborators, and marks it as a single chunk carrying one message.
 */
private static ProducerImpl.OpSendMsg newSingleChunkOp(MessageImpl<?> message, long sequenceId) {
    ProducerImpl.OpSendMsg created = ProducerImpl.OpSendMsg.create(
            Mockito.mock(LatencyHistogram.class),
            message,
            Mockito.mock(ByteBufPair.class),
            sequenceId,
            Mockito.mock(SendCallback.class));
    created.totalChunks = 1;
    created.chunkId = 0;
    created.numMessagesInBatch = 1;
    return created;
}
}
Loading