From 92b47085b2ca5e472d647dd418f7e07e09256578 Mon Sep 17 00:00:00 2001 From: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> Date: Wed, 12 Oct 2022 19:57:12 +0800 Subject: [PATCH] [fix][txn] optimize the ack/send future in TransactionImpl (#17777) ### Motivation TransactionImpl keeps a separate future for every send and ack operation. This uses a lot of memory; the two future lists can be replaced by a single future. ### Modification Replace the send/ack future lists with a single future. --- .../broker/transaction/TransactionTest.java | 82 +++++++++++++++++++ .../client/api/PulsarClientException.java | 24 +++++- .../impl/transaction/TransactionImpl.java | 81 ++++++++++++------ 3 files changed, 160 insertions(+), 27 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 6a078fc19a73d..b50d4e6a00d80 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -51,8 +51,10 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -319,6 +321,86 @@ public Consumer getConsumer(String topicName, String subName) throws Pul .subscribe(); } + @Test + public void testAsyncSendOrAckForSingleFuture() throws Exception { + String topic = NAMESPACE1 + "/testSingleFuture"; + int totalMessage = 10; + int threadSize = 30; + String topicName = "subscription"; + getPulsarServiceList().get(0).getConfig().setBrokerDeduplicationEnabled(false); + ExecutorService 
executorService = Executors.newFixedThreadPool(threadSize); + + //build producer/consumer + Producer producer = pulsarClient.newProducer() + .topic(topic) + .producerName("producer") + .sendTimeout(0, TimeUnit.SECONDS) + .create(); + + Consumer consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionName(topicName) + .subscribe(); + //store the send/ack result futures + CopyOnWriteArrayList> sendFutures = new CopyOnWriteArrayList<>(); + CopyOnWriteArrayList> ackFutures = new CopyOnWriteArrayList<>(); + + //send and ack messages with transaction + Transaction transaction1 = pulsarClient.newTransaction() + .withTransactionTimeout(10, TimeUnit.SECONDS) + .build() + .get(); + + for (int i = 0; i < totalMessage * threadSize; i++) { + producer.newMessage().send(); + } + + CountDownLatch countDownLatch = new CountDownLatch(threadSize); + for (int i = 0; i < threadSize; i++) { + executorService.submit(() -> { + try { + for (int j = 0; j < totalMessage; j++) { + CompletableFuture sendFuture = producer.newMessage(transaction1).sendAsync(); + sendFutures.add(sendFuture); + Message message = consumer.receive(); + CompletableFuture ackFuture = consumer.acknowledgeAsync(message.getMessageId(), + transaction1); + ackFutures.add(ackFuture); + } + countDownLatch.countDown(); + } catch (Exception e) { + log.error("Failed to send/ack messages with transaction.", e); + countDownLatch.countDown(); + } + }); + } + //wait until every send/ack op has been issued and its future stored in the lists. + countDownLatch.await(10, TimeUnit.SECONDS); + transaction1.commit().get(); + + //verify the final status is right. 
+ Field ackCountField = TransactionImpl.class.getDeclaredField("opCount"); + ackCountField.setAccessible(true); + long ackCount = (long) ackCountField.get(transaction1); + Assert.assertEquals(ackCount, 0L); + + for (int i = 0; i < totalMessage * threadSize; i++) { + Assert.assertTrue(sendFutures.get(i).isDone()); + Assert.assertTrue(ackFutures.get(i).isDone()); + } + + //verify opFuture without any operation. + Transaction transaction2 = pulsarClient.newTransaction() + .withTransactionTimeout(10, TimeUnit.SECONDS) + .build() + .get(); + Awaitility.await().until(() -> { + transaction2.commit().get(); + return true; + }); + } + @Test public void testGetTxnID() throws Exception { Transaction transaction = pulsarClient.newTransaction() diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java index b207387f96346..c68c575ec4f3b 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java @@ -910,6 +910,23 @@ public TransactionConflictException(String msg) { } } + public static class TransactionHasOperationFailedException extends PulsarClientException { + /** + * Constructs a {@code TransactionHasOperationFailedException}. + */ + public TransactionHasOperationFailedException() { + super("Not allowed to commit the transaction due to failed operations of producing or acknowledgment"); + } + + /** + * Constructs a {@code TransactionHasOperationFailedException} with the specified detail message. + * @param msg The detail message. + */ + public TransactionHasOperationFailedException(String msg) { + super(msg); + } + } + // wrap an exception to enriching more info messages. 
public static Throwable wrap(Throwable t, String msg) { msg += "\n" + t.getMessage(); @@ -972,6 +989,8 @@ public static Throwable wrap(Throwable t, String msg) { return new MessageAcknowledgeException(msg); } else if (t instanceof TransactionConflictException) { return new TransactionConflictException(msg); + } else if (t instanceof TransactionHasOperationFailedException) { + return new TransactionHasOperationFailedException(msg); } else if (t instanceof PulsarClientException) { return new PulsarClientException(msg); } else if (t instanceof CompletionException) { @@ -1070,6 +1089,8 @@ public static PulsarClientException unwrap(Throwable t) { newException = new MemoryBufferIsFullError(msg); } else if (cause instanceof NotFoundException) { newException = new NotFoundException(msg); + } else if (cause instanceof TransactionHasOperationFailedException) { + newException = new TransactionHasOperationFailedException(msg); } else { newException = new PulsarClientException(t); } @@ -1133,7 +1154,8 @@ public static boolean isRetriableError(Throwable t) { || t instanceof MessageAcknowledgeException || t instanceof TransactionConflictException || t instanceof ProducerBusyException - || t instanceof ConsumerBusyException) { + || t instanceof ConsumerBusyException + || t instanceof TransactionHasOperationFailedException) { return false; } return true; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java index 833b0957d1c8a..c5d15e246870c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java @@ -21,17 +21,17 @@ import com.google.common.collect.Lists; import io.netty.util.Timeout; import io.netty.util.TimerTask; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import 
java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.InvalidTxnStatusException; import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionNotFoundException; @@ -61,11 +61,18 @@ public class TransactionImpl implements Transaction , TimerTask { private final Map, CompletableFuture> registerSubscriptionMap; private final TransactionCoordinatorClientImpl tcClient; - private final ArrayList> sendFutureList; - private final ArrayList> ackFutureList; + private CompletableFuture opFuture; + + private volatile long opCount = 0L; + private static final AtomicLongFieldUpdater OP_COUNT_UPDATE = + AtomicLongFieldUpdater.newUpdater(TransactionImpl.class, "opCount"); + + private volatile State state; private static final AtomicReferenceFieldUpdater STATE_UPDATE = AtomicReferenceFieldUpdater.newUpdater(TransactionImpl.class, State.class, "state"); + + private volatile boolean hasOpsFailed = false; private final Timeout timeout; @Override @@ -86,9 +93,7 @@ public void run(Timeout timeout) throws Exception { this.registerPartitionMap = new ConcurrentHashMap<>(); this.registerSubscriptionMap = new ConcurrentHashMap<>(); this.tcClient = client.getTcClient(); - - this.sendFutureList = new ArrayList<>(); - this.ackFutureList = new ArrayList<>(); + this.opFuture = CompletableFuture.completedFuture(null); this.timeout = client.getTimer().newTimeout(this, transactionTimeoutMs, TimeUnit.MILLISECONDS); } @@ -113,8 
+118,25 @@ public CompletableFuture registerProducedTopic(String topic) { return completableFuture; } - public synchronized void registerSendOp(CompletableFuture sendFuture) { - sendFutureList.add(sendFuture); + public void registerSendOp(CompletableFuture newSendFuture) { + if (OP_COUNT_UPDATE.getAndIncrement(this) == 0) { + opFuture = new CompletableFuture<>(); + } + // every completed op decrements opCount, even a failed one (failures are logged and set hasOpsFailed); + // the op that brings opCount back to 0 completes the opFuture it read just before decrementing. + newSendFuture.whenComplete((messageId, e) -> { + if (e != null) { + log.error("The transaction [{}:{}] get an exception when send messages.", + txnIdMostBits, txnIdLeastBits, e); + if (!hasOpsFailed) { + hasOpsFailed = true; + } + } + CompletableFuture future = opFuture; + if (OP_COUNT_UPDATE.decrementAndGet(this) == 0) { + future.complete(null); + } + }); } // register the topics that will be modified by this transaction @@ -137,8 +159,25 @@ public CompletableFuture registerAckedTopic(String topic, String subscript return completableFuture; } - public synchronized void registerAckOp(CompletableFuture ackFuture) { - ackFutureList.add(ackFuture); + public void registerAckOp(CompletableFuture newAckFuture) { + if (OP_COUNT_UPDATE.getAndIncrement(this) == 0) { + opFuture = new CompletableFuture<>(); + } + // every completed op decrements opCount, even a failed one (failures are logged and set hasOpsFailed); + // the op that brings opCount back to 0 completes the opFuture it read just before decrementing. 
+ newAckFuture.whenComplete((ignore, e) -> { + if (e != null) { + log.error("The transaction [{}:{}] get an exception when ack messages.", + txnIdMostBits, txnIdLeastBits, e); + if (!hasOpsFailed) { + hasOpsFailed = true; + } + } + CompletableFuture future = opFuture; + if (OP_COUNT_UPDATE.decrementAndGet(this) == 0) { + future.complete(null); + } + }); } @Override @@ -147,9 +186,10 @@ public CompletableFuture commit() { return checkIfOpenOrCommitting().thenCompose((value) -> { CompletableFuture commitFuture = new CompletableFuture<>(); this.state = State.COMMITTING; - allOpComplete().whenComplete((v, e) -> { - if (e != null) { - abort().whenComplete((vx, ex) -> commitFuture.completeExceptionally(e)); + opFuture.whenComplete((v, e) -> { + if (hasOpsFailed) { + abort().whenComplete((vx, ex) -> commitFuture.completeExceptionally(new PulsarClientException + .TransactionHasOperationFailedException())); } else { tcClient.commitAsync(new TxnID(txnIdMostBits, txnIdLeastBits)) .whenComplete((vx, ex) -> { @@ -176,10 +216,7 @@ public CompletableFuture abort() { return checkIfOpenOrAborting().thenCompose(value -> { CompletableFuture abortFuture = new CompletableFuture<>(); this.state = State.ABORTING; - allOpComplete().whenComplete((v, e) -> { - if (e != null) { - log.error(e.getMessage()); - } + opFuture.whenComplete((v, e) -> { tcClient.abortAsync(new TxnID(txnIdMostBits, txnIdLeastBits)).whenComplete((vx, ex) -> { if (ex != null) { @@ -242,12 +279,4 @@ private CompletableFuture invalidTxnStatusFuture() { + txnIdLeastBits + "] with unexpected state : " + state.name() + ", expect " + State.OPEN + " state!")); } - - - private CompletableFuture allOpComplete() { - List> futureList = new ArrayList<>(); - futureList.addAll(sendFutureList); - futureList.addAll(ackFutureList); - return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])); - } }