Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [broker] Let the pending ack handler retry initialization when it encounters a metadata store error #23153

Merged
merged 1 commit into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.api.BKException;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
Expand Down Expand Up @@ -69,6 +70,7 @@
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RecoverTimeRecord;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;

/**
Expand Down Expand Up @@ -989,7 +991,9 @@ private static boolean isRetryableException(Throwable ex) {
&& !(realCause instanceof ManagedLedgerException.NonRecoverableLedgerException))
|| realCause instanceof PulsarClientException.BrokerPersistenceException
|| realCause instanceof PulsarClientException.LookupException
|| realCause instanceof PulsarClientException.ConnectException;
|| realCause instanceof PulsarClientException.ConnectException
|| realCause instanceof MetadataStoreException
|| realCause instanceof BKException;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,13 @@
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.api.BKException;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.pulsar.PrometheusMetricsTestUtil;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
Expand Down Expand Up @@ -81,11 +83,13 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
Expand All @@ -109,16 +113,26 @@ protected void cleanup() {
super.internalCleanup();
}


/**
 * Data provider named {@code retryableErrors}.
 *
 * <p>Each row carries exactly one exception instance, which is handed to the
 * consuming test as its single argument. The rows cover a managed-ledger error,
 * a metadata-store error and a BookKeeper error (constructed with code {@code -1}).
 *
 * @return one single-element row per mocked exception
 */
@DataProvider(name = "retryableErrors")
public Object[][] retryableErrors() {
    final String mockMessage = "mock retryable error";
    final Exception managedLedgerError = new ManagedLedgerException(mockMessage);
    final Exception metadataStoreError = new MetadataStoreException(mockMessage);
    final Exception bookKeeperError = new BKException(-1);
    return new Object[][] {
        {managedLedgerError},
        {metadataStoreError},
        {bookKeeperError},
    };
}

/**
* Test that the consumer can be built successfully after a retryable exception,
* and that it gets the correct error after a non-retryable exception.
* @throws Exception
*/
@Test(timeOut = 60000)
public void testBuildConsumerEncounterPendingAckInitFailure() throws Exception {
@Test(timeOut = 60000, dataProvider = "retryableErrors")
public void testBuildConsumerEncounterPendingAckInitFailure(Exception retryableError) throws Exception {
// 1. Prepare and make sure the consumer can be built successfully.
String topic = NAMESPACE1 + "/testUnloadSubscriptionWhenFailedInitPendingAck";
@Cleanup
String topic = BrokerTestUtil.newUniqueName(NAMESPACE1 + "/tp");
admin.topics().createNonPartitionedTopic(topic);
Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.subscriptionName("subName1")
.topic(topic)
Expand All @@ -136,11 +150,10 @@ public void testBuildConsumerEncounterPendingAckInitFailure() throws Exception {
// The consumer will be built successfully after one time retry.
when(mockProvider.checkInitializedBefore(any()))
// First, the method checkInitializedBefore will fail with a retryable exception.
.thenReturn(FutureUtil.failedFuture(new ManagedLedgerException("mock fail initialize")))
.thenReturn(FutureUtil.failedFuture(retryableError))
// Then, the method will be executed successfully.
.thenReturn(CompletableFuture.completedFuture(false));
transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), mockProvider);
@Cleanup
Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.subscriptionName("subName2")
.topic(topic)
Expand All @@ -157,7 +170,6 @@ public void testBuildConsumerEncounterPendingAckInitFailure() throws Exception {
// Then, the method will be executed successfully.
.thenCallRealMethod();
transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), mockProvider);
@Cleanup
Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
.subscriptionName("subName3")
.topic(topic)
Expand All @@ -170,7 +182,7 @@ public void testBuildConsumerEncounterPendingAckInitFailure() throws Exception {
.thenReturn(FutureUtil.failedFuture(new ManagedLedgerException
.NonRecoverableLedgerException("mock fail")))
.thenReturn(CompletableFuture.completedFuture(false));
@Cleanup PulsarClient pulsarClient = PulsarClient.builder()
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
.operationTimeout(3, TimeUnit.SECONDS)
.build();
Expand All @@ -184,6 +196,13 @@ public void testBuildConsumerEncounterPendingAckInitFailure() throws Exception {
} catch (Exception exception) {
assertTrue(exception.getMessage().contains("Failed to init transaction pending ack."));
}

// cleanup.
consumer1.close();
consumer2.close();
consumer3.close();
pulsarClient.close();
admin.topics().delete(topic, false);
}

@Test
Expand Down
Loading