diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index f0f0cfd754852..9829babece74b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -21,7 +21,7 @@ import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; import io.netty.buffer.ByteBuf; import io.netty.channel.EventLoopGroup; -import io.netty.util.Recycler; +import io.netty.util.concurrent.FastThreadLocal; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -34,9 +34,8 @@ import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.locks.ReentrantReadWriteLock; -import lombok.NonNull; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Triple; import org.apache.pulsar.client.api.MessageId; @@ -68,18 +67,11 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments private volatile TimedCompletableFuture currentIndividualAckFuture; private volatile TimedCompletableFuture currentCumulativeAckFuture; - private volatile LastCumulativeAck lastCumulativeAck = - LastCumulativeAck.create((MessageIdImpl) MessageIdImpl.earliest, null); - - private volatile boolean cumulativeAckFlushRequired = false; + private final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck(); // When we flush the command, we should ensure current ack request will send correct private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - private static final AtomicReferenceFieldUpdater - LAST_CUMULATIVE_ACK_UPDATER = AtomicReferenceFieldUpdater.newUpdater( - PersistentAcknowledgmentsGroupingTracker.class, LastCumulativeAck.class, "lastCumulativeAck"); - /** * This is a set of all the individual acks that the application has issued and that were not already sent to * broker. @@ -116,13 +108,13 @@ public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl consumer, Consum * resent after a disconnection and for which the user has already sent an acknowledgement. */ @Override - public boolean isDuplicate(@NonNull MessageId messageId) { - final MessageId messageIdOfLastAck = lastCumulativeAck.messageId; + public boolean isDuplicate(MessageId messageId) { + final MessageIdImpl messageIdOfLastAck = lastCumulativeAck.getMessageId(); if (messageIdOfLastAck != null && messageId.compareTo(messageIdOfLastAck) <= 0) { // Already included in a cumulative ack return true; } else { - return pendingIndividualAcks.contains(messageId); + return pendingIndividualAcks.contains((MessageIdImpl) messageId); } } @@ -370,30 +362,7 @@ private void doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) { private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable bitSet) { // Handle concurrent updates from different threads - LastCumulativeAck currentCumulativeAck = LastCumulativeAck.create(msgId, bitSet); - while (true) { - LastCumulativeAck lastCumulativeAck = this.lastCumulativeAck; - if (msgId.compareTo(lastCumulativeAck.messageId) > 0) { - if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, this.lastCumulativeAck, currentCumulativeAck)) { - if (lastCumulativeAck.bitSetRecyclable != null) { - try { - lastCumulativeAck.bitSetRecyclable.recycle(); - } catch (Exception ignore) { - // no-op - } - lastCumulativeAck.bitSetRecyclable = null; - } - lastCumulativeAck.recycle(); - // Successfully updated the last cumulative ack. Next flush iteration will send this to broker. - cumulativeAckFlushRequired = true; - return; - } - } else { - currentCumulativeAck.recycle(); - // message id acknowledging an before the current last cumulative ack - return; - } - } + lastCumulativeAck.update(msgId, bitSet); } private CompletableFuture doCumulativeBatchIndexAck(BatchMessageIdImpl batchMessageId, @@ -474,15 +443,15 @@ public void flush() { } private void flushAsync(ClientCnx cnx) { + final LastCumulativeAck lastCumulativeAckToFlush = lastCumulativeAck.flush(); boolean shouldFlush = false; - if (cumulativeAckFlushRequired) { - newMessageAckCommandAndWrite(cnx, consumer.consumerId, lastCumulativeAck.messageId.ledgerId, - lastCumulativeAck.messageId.getEntryId(), lastCumulativeAck.bitSetRecyclable, - AckType.Cumulative, null, Collections.emptyMap(), false, - this.currentCumulativeAckFuture, null); - this.consumer.unAckedChunkedMessageIdSequenceMap.remove(lastCumulativeAck.messageId); + if (lastCumulativeAckToFlush != null) { shouldFlush = true; - cumulativeAckFlushRequired = false; + final MessageIdImpl messageId = lastCumulativeAckToFlush.getMessageId(); + newMessageAckCommandAndWrite(cnx, consumer.consumerId, messageId.getLedgerId(), messageId.getEntryId(), + lastCumulativeAckToFlush.getBitSetRecyclable(), AckType.Cumulative, null, + Collections.emptyMap(), false, this.currentCumulativeAckFuture, null); + this.consumer.unAckedChunkedMessageIdSequenceMap.remove(messageId); } // Flush all individual acks @@ -560,7 +529,7 @@ private void flushAsync(ClientCnx cnx) { @Override public void flushAndClean() { flush(); - lastCumulativeAck = LastCumulativeAck.create((MessageIdImpl) MessageIdImpl.earliest, null); + lastCumulativeAck.reset(); pendingIndividualAcks.clear(); } @@ -664,36 +633,72 @@ private boolean isAckReceiptEnabled(ClientCnx cnx) { return ackReceiptEnabled && cnx != null && Commands.peerSupportsAckReceipt(cnx.getRemoteEndpointProtocolVersion()); } +} - private static class LastCumulativeAck { - private MessageIdImpl messageId; - private BitSetRecyclable bitSetRecyclable; +@Getter +class LastCumulativeAck { - static LastCumulativeAck create(MessageIdImpl messageId, BitSetRecyclable bitSetRecyclable) { - LastCumulativeAck op = RECYCLER.get(); - op.messageId = messageId; - op.bitSetRecyclable = bitSetRecyclable; - return op; - } + // It's used as a returned value by `flush()` to avoid creating a new instance each time `flush()` is called + public static final FastThreadLocal LOCAL_LAST_CUMULATIVE_ACK = + new FastThreadLocal() { - private LastCumulativeAck(Recycler.Handle recyclerHandle) { - this.recyclerHandle = recyclerHandle; - } + @Override + protected LastCumulativeAck initialValue() { + return new LastCumulativeAck(); + } + }; + public static final MessageIdImpl DEFAULT_MESSAGE_ID = (MessageIdImpl) MessageIdImpl.earliest; - void recycle() { - if (bitSetRecyclable != null) { + private volatile MessageIdImpl messageId = DEFAULT_MESSAGE_ID; + private BitSetRecyclable bitSetRecyclable = null; + private boolean flushRequired = false; + + public synchronized void update(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) { + if (messageId.compareTo(this.messageId) > 0) { + if (this.bitSetRecyclable != null && this.bitSetRecyclable != bitSetRecyclable) { this.bitSetRecyclable.recycle(); } - this.messageId = null; - recyclerHandle.recycle(this); + set(messageId, bitSetRecyclable); + flushRequired = true; } + } - private final Recycler.Handle recyclerHandle; - private static final Recycler RECYCLER = new Recycler() { - @Override - protected LastCumulativeAck newObject(Handle handle) { - return new LastCumulativeAck(handle); + public synchronized LastCumulativeAck flush() { + if (flushRequired) { + final LastCumulativeAck localLastCumulativeAck = LOCAL_LAST_CUMULATIVE_ACK.get(); + if (bitSetRecyclable != null) { + localLastCumulativeAck.set(messageId, BitSetRecyclable.valueOf(bitSetRecyclable.toLongArray())); + } else { + localLastCumulativeAck.set(this.messageId, null); } - }; + flushRequired = false; + return localLastCumulativeAck; + } else { + // Return null to indicate nothing to be flushed + return null; + } + } + + public synchronized void reset() { + if (bitSetRecyclable != null) { + bitSetRecyclable.recycle(); + } + messageId = DEFAULT_MESSAGE_ID; + bitSetRecyclable = null; + flushRequired = false; + } + + private synchronized void set(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) { + this.messageId = messageId; + this.bitSetRecyclable = bitSetRecyclable; + } + + @Override + public String toString() { + String s = messageId.toString(); + if (bitSetRecyclable != null) { + s += " (bit set: " + bitSetRecyclable + ")"; + } + return s; } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java new file mode 100644 index 0000000000000..102ccfc0e07a5 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotSame; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertSame; +import static org.testng.Assert.assertTrue; +import org.apache.pulsar.common.util.collections.BitSetRecyclable; +import org.testng.annotations.Test; + +public class LastCumulativeAckTest { + + @Test + public void testUpdate() { + final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck(); + assertFalse(lastCumulativeAck.isFlushRequired()); + assertEquals(lastCumulativeAck.getMessageId(), LastCumulativeAck.DEFAULT_MESSAGE_ID); + assertNull(lastCumulativeAck.getBitSetRecyclable()); + + final MessageIdImpl messageId1 = new MessageIdImpl(0L, 1L, 10); + final BitSetRecyclable bitSetRecyclable1 = BitSetRecyclable.create(); + bitSetRecyclable1.set(0, 3); + lastCumulativeAck.update(messageId1, bitSetRecyclable1); + assertTrue(lastCumulativeAck.isFlushRequired()); + assertSame(lastCumulativeAck.getMessageId(), messageId1); + assertSame(lastCumulativeAck.getBitSetRecyclable(), bitSetRecyclable1); + + final MessageIdImpl messageId2 = new MessageIdImpl(0L, 2L, 8); + lastCumulativeAck.update(messageId2, bitSetRecyclable1); + // bitSetRecyclable1 is not recycled + assertEquals(bitSetRecyclable1.toString(), "{0, 1, 2}"); + + final BitSetRecyclable bitSetRecyclable2 = BitSetRecyclable.create(); + bitSetRecyclable2.set(0, 2); + + // `update()` only accepts a newer message ID, so this call here has no side effect + lastCumulativeAck.update(messageId2, bitSetRecyclable2); + assertSame(lastCumulativeAck.getBitSetRecyclable(), bitSetRecyclable1); + + final MessageIdImpl messageId3 = new MessageIdImpl(0L, 3L, 9); + lastCumulativeAck.update(messageId3, bitSetRecyclable2); + // bitSetRecyclable1 is recycled because it's replaced in `update` + assertEquals(bitSetRecyclable1.toString(), "{}"); + assertSame(lastCumulativeAck.getMessageId(), messageId3); + assertSame(lastCumulativeAck.getBitSetRecyclable(), bitSetRecyclable2); + bitSetRecyclable2.recycle(); + } + + @Test + public void testFlush() { + final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck(); + assertNull(lastCumulativeAck.flush()); + + final MessageIdImpl messageId = new MessageIdImpl(0L, 1L, 3); + final BitSetRecyclable bitSetRecyclable = BitSetRecyclable.create(); + bitSetRecyclable.set(0, 3); + lastCumulativeAck.update(messageId, bitSetRecyclable); + assertTrue(lastCumulativeAck.isFlushRequired()); + + final LastCumulativeAck lastCumulativeAckToFlush = lastCumulativeAck.flush(); + assertFalse(lastCumulativeAck.isFlushRequired()); + assertSame(lastCumulativeAckToFlush.getMessageId(), messageId); + assertNotSame(lastCumulativeAckToFlush.getBitSetRecyclable(), bitSetRecyclable); + assertEquals(lastCumulativeAckToFlush.getBitSetRecyclable(), bitSetRecyclable); + } + +}