Skip to content

Commit

Permalink
[fix] [broker] Let Pending ack handler can retry to init when encount…
Browse files Browse the repository at this point in the history
…ers a metadata store error (apache#23153)

(cherry picked from commit 2dde403)
(cherry picked from commit d406193)
  • Loading branch information
poorbarcode authored and srinath-ctds committed Aug 20, 2024
1 parent b3d47d2 commit 68db449
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.api.BKException;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
Expand Down Expand Up @@ -67,6 +68,7 @@
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RecoverTimeRecord;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;

/**
Expand Down Expand Up @@ -990,7 +992,9 @@ private static boolean isRetryableException(Throwable ex) {
&& !(realCause instanceof ManagedLedgerException.NonRecoverableLedgerException))
|| realCause instanceof PulsarClientException.BrokerPersistenceException
|| realCause instanceof PulsarClientException.LookupException
|| realCause instanceof PulsarClientException.ConnectException;
|| realCause instanceof PulsarClientException.ConnectException
|| realCause instanceof MetadataStoreException
|| realCause instanceof BKException;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.api.BKException;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.pulsar.PrometheusMetricsTestUtil;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
Expand Down Expand Up @@ -78,10 +80,12 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
Expand All @@ -105,16 +109,26 @@ protected void cleanup() {
super.internalCleanup();
}


@DataProvider(name = "retryableErrors")
public Object[][] retryableErrors() {
return new Object[][] {
{new ManagedLedgerException("mock retryable error")},
{new MetadataStoreException("mock retryable error")},
{new BKException(-1)},
};
}

/**
* Test consumer can be built successfully with retryable exception
* and get correct error with no-retryable exception.
* @throws Exception
*/
@Test(timeOut = 60000)
public void testBuildConsumerEncounterPendingAckInitFailure() throws Exception {
@Test(timeOut = 60000, dataProvider = "retryableErrors")
public void testBuildConsumerEncounterPendingAckInitFailure(Exception retryableError) throws Exception {
// 1. Prepare and make sure the consumer can be built successfully.
String topic = NAMESPACE1 + "/testUnloadSubscriptionWhenFailedInitPendingAck";
@Cleanup
String topic = BrokerTestUtil.newUniqueName(NAMESPACE1 + "/tp");
admin.topics().createNonPartitionedTopic(topic);
Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.subscriptionName("subName1")
.topic(topic)
Expand All @@ -132,11 +146,10 @@ public void testBuildConsumerEncounterPendingAckInitFailure() throws Exception {
// The consumer will be built successfully after one time retry.
when(mockProvider.checkInitializedBefore(any()))
// First, the method checkInitializedBefore will fail with a retryable exception.
.thenReturn(FutureUtil.failedFuture(new ManagedLedgerException("mock fail initialize")))
.thenReturn(FutureUtil.failedFuture(retryableError))
// Then, the method will be executed successfully.
.thenReturn(CompletableFuture.completedFuture(false));
transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), mockProvider);
@Cleanup
Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.subscriptionName("subName2")
.topic(topic)
Expand All @@ -153,7 +166,6 @@ public void testBuildConsumerEncounterPendingAckInitFailure() throws Exception {
// Then, the method will be executed successfully.
.thenCallRealMethod();
transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), mockProvider);
@Cleanup
Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
.subscriptionName("subName3")
.topic(topic)
Expand All @@ -166,7 +178,7 @@ public void testBuildConsumerEncounterPendingAckInitFailure() throws Exception {
.thenReturn(FutureUtil.failedFuture(new ManagedLedgerException
.NonRecoverableLedgerException("mock fail")))
.thenReturn(CompletableFuture.completedFuture(false));
@Cleanup PulsarClient pulsarClient = PulsarClient.builder()
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
.operationTimeout(3, TimeUnit.SECONDS)
.build();
Expand All @@ -180,6 +192,13 @@ public void testBuildConsumerEncounterPendingAckInitFailure() throws Exception {
} catch (Exception exception) {
assertTrue(exception.getMessage().contains("Failed to init transaction pending ack."));
}

// cleanup.
consumer1.close();
consumer2.close();
consumer3.close();
pulsarClient.close();
admin.topics().delete(topic, false);
}

@Test
Expand Down

0 comments on commit 68db449

Please sign in to comment.