Skip to content

Commit

Permalink
[fix] [broker] Let Pending ack handler can retry to init when encount…
Browse files Browse the repository at this point in the history
…ers a metadata store error (apache#23153)
  • Loading branch information
poorbarcode authored Aug 12, 2024
1 parent 38134bc commit 2dde403
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.api.BKException;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
Expand Down Expand Up @@ -69,6 +70,7 @@
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RecoverTimeRecord;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;

/**
Expand Down Expand Up @@ -989,7 +991,9 @@ private static boolean isRetryableException(Throwable ex) {
&& !(realCause instanceof ManagedLedgerException.NonRecoverableLedgerException))
|| realCause instanceof PulsarClientException.BrokerPersistenceException
|| realCause instanceof PulsarClientException.LookupException
|| realCause instanceof PulsarClientException.ConnectException;
|| realCause instanceof PulsarClientException.ConnectException
|| realCause instanceof MetadataStoreException
|| realCause instanceof BKException;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,13 @@
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.api.BKException;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.pulsar.PrometheusMetricsTestUtil;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
Expand Down Expand Up @@ -81,11 +83,13 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
Expand All @@ -109,16 +113,26 @@ protected void cleanup() {
super.internalCleanup();
}


@DataProvider(name = "retryableErrors")
public Object[][] retryableErrors() {
return new Object[][] {
{new ManagedLedgerException("mock retryable error")},
{new MetadataStoreException("mock retryable error")},
{new BKException(-1)},
};
}

/**
* Test consumer can be built successfully with retryable exception
* and get correct error with no-retryable exception.
* @throws Exception
*/
@Test(timeOut = 60000)
public void testBuildConsumerEncounterPendingAckInitFailure() throws Exception {
@Test(timeOut = 60000, dataProvider = "retryableErrors")
public void testBuildConsumerEncounterPendingAckInitFailure(Exception retryableError) throws Exception {
// 1. Prepare and make sure the consumer can be built successfully.
String topic = NAMESPACE1 + "/testUnloadSubscriptionWhenFailedInitPendingAck";
@Cleanup
String topic = BrokerTestUtil.newUniqueName(NAMESPACE1 + "/tp");
admin.topics().createNonPartitionedTopic(topic);
Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.subscriptionName("subName1")
.topic(topic)
Expand All @@ -136,11 +150,10 @@ public void testBuildConsumerEncounterPendingAckInitFailure() throws Exception {
// The consumer will be built successfully after one time retry.
when(mockProvider.checkInitializedBefore(any()))
// First, the method checkInitializedBefore will fail with a retryable exception.
.thenReturn(FutureUtil.failedFuture(new ManagedLedgerException("mock fail initialize")))
.thenReturn(FutureUtil.failedFuture(retryableError))
// Then, the method will be executed successfully.
.thenReturn(CompletableFuture.completedFuture(false));
transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), mockProvider);
@Cleanup
Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.subscriptionName("subName2")
.topic(topic)
Expand All @@ -157,7 +170,6 @@ public void testBuildConsumerEncounterPendingAckInitFailure() throws Exception {
// Then, the method will be executed successfully.
.thenCallRealMethod();
transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), mockProvider);
@Cleanup
Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
.subscriptionName("subName3")
.topic(topic)
Expand All @@ -170,7 +182,7 @@ public void testBuildConsumerEncounterPendingAckInitFailure() throws Exception {
.thenReturn(FutureUtil.failedFuture(new ManagedLedgerException
.NonRecoverableLedgerException("mock fail")))
.thenReturn(CompletableFuture.completedFuture(false));
@Cleanup PulsarClient pulsarClient = PulsarClient.builder()
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
.operationTimeout(3, TimeUnit.SECONDS)
.build();
Expand All @@ -184,6 +196,13 @@ public void testBuildConsumerEncounterPendingAckInitFailure() throws Exception {
} catch (Exception exception) {
assertTrue(exception.getMessage().contains("Failed to init transaction pending ack."));
}

// cleanup.
consumer1.close();
consumer2.close();
consumer3.close();
pulsarClient.close();
admin.topics().delete(topic, false);
}

@Test
Expand Down

0 comments on commit 2dde403

Please sign in to comment.