From 99cab24b3631437a7d143b3624959505f1e19929 Mon Sep 17 00:00:00 2001 From: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> Date: Wed, 13 Dec 2023 10:21:11 +0800 Subject: [PATCH] [fix][txn] Fix getting last message ID when there are ongoing transactions (#21466) --- .../pulsar/broker/service/ServerCnx.java | 37 ++--- .../service/persistent/PersistentTopic.java | 4 +- .../buffer/impl/InMemTransactionBuffer.java | 5 +- .../buffer/impl/TopicTransactionBuffer.java | 4 +- .../buffer/impl/TransactionBufferDisable.java | 8 +- .../buffer/TopicTransactionBufferTest.java | 131 ++++++++++++++++++ 6 files changed, 167 insertions(+), 22 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 4e61a3228fe59..cca53bf9d6aa3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2063,23 +2063,28 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) long requestId = getLastMessageId.getRequestId(); Topic topic = consumer.getSubscription().getTopic(); - Position lastPosition = topic.getLastPosition(); - int partitionIndex = TopicName.getPartitionIndex(topic.getName()); - - Position markDeletePosition = null; - if (consumer.getSubscription() instanceof PersistentSubscription) { - markDeletePosition = ((PersistentSubscription) consumer.getSubscription()).getCursor() - .getMarkDeletedPosition(); - } - - getLargestBatchIndexWhenPossible( - topic, - (PositionImpl) lastPosition, - (PositionImpl) markDeletePosition, - partitionIndex, - requestId, - consumer.getSubscription().getName()); + topic.checkIfTransactionBufferRecoverCompletely(true).thenRun(() -> { + Position lastPosition = ((PersistentTopic) topic).getMaxReadPosition(); + int partitionIndex = TopicName.getPartitionIndex(topic.getName()); + + Position markDeletePosition = null; + if (consumer.getSubscription() instanceof PersistentSubscription) { + markDeletePosition = ((PersistentSubscription) consumer.getSubscription()).getCursor() + .getMarkDeletedPosition(); + } + getLargestBatchIndexWhenPossible( + topic, + (PositionImpl) lastPosition, + (PositionImpl) markDeletePosition, + partitionIndex, + requestId, + consumer.getSubscription().getName()); + }).exceptionally(e -> { + writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), + ServerError.UnknownError, "Failed to recover Transaction Buffer.")); + return null; + }); } else { writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), ServerError.MetadataError, "Consumer not found")); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 1619c977d1f44..bbd4c31c25820 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -309,7 +309,7 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS this.transactionBuffer = brokerService.getPulsar() .getTransactionBufferProvider().newTransactionBuffer(this); } else { - this.transactionBuffer = new TransactionBufferDisable(); + this.transactionBuffer = new TransactionBufferDisable(this); } transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry()); if (ledger instanceof ShadowManagedLedgerImpl) { @@ -402,7 +402,7 @@ public CompletableFuture initialize() { this.transactionBuffer = brokerService.getPulsar() .getTransactionBufferProvider().newTransactionBuffer(this); } else { - this.transactionBuffer = new TransactionBufferDisable(); + this.transactionBuffer = new TransactionBufferDisable(this); } shadowSourceTopic = null; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java index bc2dd58a5812e..978536c5f4e36 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java @@ -212,9 +212,12 @@ public TransactionBufferReader newReader(long sequenceId) throws final ConcurrentMap buffers; final Map> txnIndex; + private final Topic topic; + public InMemTransactionBuffer(Topic topic) { this.buffers = new ConcurrentHashMap<>(); this.txnIndex = new HashMap<>(); + this.topic = topic; } @Override @@ -372,7 +375,7 @@ public void syncMaxReadPositionForNormalPublish(PositionImpl position) { @Override public PositionImpl getMaxReadPosition() { - return PositionImpl.LATEST; + return (PositionImpl) topic.getLastPosition(); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 3c13be220869f..f356921d6988e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -37,6 +37,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.collections4.map.LinkedMap; import org.apache.pulsar.broker.service.BrokerServiceException; @@ -446,8 +447,7 @@ void updateMaxReadPosition(TxnID txnID) { ongoingTxns.remove(txnID); if (!ongoingTxns.isEmpty()) { PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey()); - //max read position is less than first ongoing transaction message position, so entryId -1 - maxReadPosition = PositionImpl.get(position.getLedgerId(), position.getEntryId() - 1); + maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position); } else { maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java index 7c74b52951e28..9de0888ae5b0b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java @@ -25,6 +25,7 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor; import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader; @@ -40,6 +41,11 @@ @Slf4j public class TransactionBufferDisable implements TransactionBuffer { + private final Topic topic; + public TransactionBufferDisable(Topic topic) { + this.topic = topic; + } + @Override public CompletableFuture getTransactionMeta(TxnID txnID) { return CompletableFuture.completedFuture(null); @@ -91,7 +97,7 @@ public void syncMaxReadPositionForNormalPublish(PositionImpl position) { @Override public PositionImpl getMaxReadPosition() { - return PositionImpl.LATEST; + return (PositionImpl) topic.getLastPosition(); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index aa98fc7d70106..6ab56a613c500 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -18,9 +18,18 @@ */ package org.apache.pulsar.broker.transaction.buffer; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.when; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertTrue; +import static org.testng.AssertJUnit.fail; +import java.util.List; +import lombok.Cleanup; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerService; @@ -30,8 +39,13 @@ import org.apache.pulsar.broker.transaction.TransactionTestBase; import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.TopicMessageId; import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; @@ -179,4 +193,121 @@ public void testCloseTransactionBufferWhenTimeout() throws Exception { Assert.assertTrue(f.isCompletedExceptionally()); } + /** + * This test mainly test the following two point: + * 1. `getLastMessageIds` will get max read position. + * Send two message |1:0|1:1|; mock max read position as |1:0|; `getLastMessageIds` will get |1:0|. + * 2. `getLastMessageIds` will wait Transaction buffer recover completely. + * Mock `checkIfTBRecoverCompletely` return an exception, `getLastMessageIds` will fail too. + * Mock `checkIfTBRecoverCompletely` return null, `getLastMessageIds` will get correct result. + */ + @Test + public void testGetMaxPositionAfterTBReady() throws Exception { + // 1. Prepare test environment. + String topic = "persistent://" + NAMESPACE1 + "/testGetMaxReadyPositionAfterTBReady"; + // 1.1 Mock component. + TransactionBuffer transactionBuffer = Mockito.spy(TransactionBuffer.class); + when(transactionBuffer.checkIfTBRecoverCompletely(anyBoolean())) + // Handle producer will check transaction buffer recover completely. + .thenReturn(CompletableFuture.completedFuture(null)) + // If the Transaction buffer failed to recover, we can not get the correct last max read id. + .thenReturn(CompletableFuture.failedFuture(new Throwable("Mock fail"))) + // If the transaction buffer recover successfully, the max read position can be acquired successfully. + .thenReturn(CompletableFuture.completedFuture(null)); + TransactionBufferProvider transactionBufferProvider = Mockito.spy(TransactionBufferProvider.class); + Mockito.doReturn(transactionBuffer).when(transactionBufferProvider).newTransactionBuffer(any()); + TransactionBufferProvider originalTBProvider = getPulsarServiceList().get(0).getTransactionBufferProvider(); + Mockito.doReturn(transactionBufferProvider).when(getPulsarServiceList().get(0)).getTransactionBufferProvider(); + // 2. Building producer and consumer. + admin.topics().createNonPartitionedTopic(topic); + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("sub") + .subscribe(); + @Cleanup + Producer producer = pulsarClient.newProducer() + .topic(topic) + .create(); + // 3. Send message and test the exception can be handled as expected. + MessageIdImpl messageId = (MessageIdImpl) producer.newMessage().send(); + producer.newMessage().send(); + Mockito.doReturn(new PositionImpl(messageId.getLedgerId(), messageId.getEntryId())) + .when(transactionBuffer).getMaxReadPosition(); + try { + consumer.getLastMessageIds(); + fail(); + } catch (PulsarClientException exception) { + assertTrue(exception.getMessage().contains("Failed to recover Transaction Buffer.")); + } + List messageIdList = consumer.getLastMessageIds(); + assertEquals(messageIdList.size(), 1); + TopicMessageIdImpl actualMessageID = (TopicMessageIdImpl) messageIdList.get(0); + assertEquals(messageId.getLedgerId(), actualMessageID.getLedgerId()); + assertEquals(messageId.getEntryId(), actualMessageID.getEntryId()); + // 4. Clean resource + Mockito.doReturn(originalTBProvider).when(getPulsarServiceList().get(0)).getTransactionBufferProvider(); + } + + /** + * Add a E2E test for the get last message ID. It tests 4 cases. + *

+ * 1. Only normal messages in the topic. + * 2. There are ongoing transactions, last message ID will not be updated until transaction end. + * 3. Aborted transaction will make the last message ID be updated as expected. + * 4. Committed transaction will make the last message ID be updated as expected. + *

+ */ + @Test + public void testGetLastMessageIdsWithOngoingTransactions() throws Exception { + // 1. Prepare environment + String topic = "persistent://" + NAMESPACE1 + "/testGetLastMessageIdsWithOngoingTransactions"; + String subName = "my-subscription"; + @Cleanup + Producer producer = pulsarClient.newProducer() + .topic(topic) + .create(); + Consumer consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(subName) + .subscribe(); + + // 2. Test last max read position can be required correctly. + // 2.1 Case1: send 3 original messages. |1:0|1:1|1:2| + MessageIdImpl expectedLastMessageID = null; + for (int i = 0; i < 3; i++) { + expectedLastMessageID = (MessageIdImpl) producer.newMessage().send(); + } + assertMessageId(consumer, expectedLastMessageID, 0); + // 2.2 Case2: send 2 ongoing transactional messages and 2 original messages. + // |1:0|1:1|1:2|txn1->1:3|1:4|txn2->1:5|1:6|. + Transaction txn1 = pulsarClient.newTransaction() + .withTransactionTimeout(5, TimeUnit.HOURS) + .build() + .get(); + Transaction txn2 = pulsarClient.newTransaction() + .withTransactionTimeout(5, TimeUnit.HOURS) + .build() + .get(); + producer.newMessage(txn1).send(); + MessageIdImpl expectedLastMessageID1 = (MessageIdImpl) producer.newMessage().send(); + producer.newMessage(txn2).send(); + MessageIdImpl expectedLastMessageID2 = (MessageIdImpl) producer.newMessage().send(); + // 2.2.1 Last message ID will not change when txn1 and txn2 do not end. + assertMessageId(consumer, expectedLastMessageID, 0); + // 2.2.2 Last message ID will update to 1:4 when txn1 committed. + txn1.commit().get(5, TimeUnit.SECONDS); + assertMessageId(consumer, expectedLastMessageID1, 0); + // 2.2.3 Last message ID will update to 1:6 when txn2 aborted. + txn2.abort().get(5, TimeUnit.SECONDS); + // Todo: We can not ignore the marker's position in this fix. + assertMessageId(consumer, expectedLastMessageID2, 2); + } + + private void assertMessageId(Consumer consumer, MessageIdImpl expected, int entryOffset) throws Exception { + TopicMessageIdImpl actual = (TopicMessageIdImpl) consumer.getLastMessageIds().get(0); + assertEquals(expected.getEntryId(), actual.getEntryId() - entryOffset); + assertEquals(expected.getLedgerId(), actual.getLedgerId()); + } + }