Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.SafeCollectionUtils;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentBitSet;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -3187,7 +3187,7 @@ private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId me
} else {
if (Commands.peerSupportsMultiMessageAcknowledgment(
getClientCnx().getRemoteEndpointProtocolVersion())) {
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck =
List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck =
new ArrayList<>(chunkMsgIds.length);
for (MessageIdImpl cMsgId : chunkMsgIds) {
if (cMsgId != null && chunkMsgIds.length > 1) {
Expand Down Expand Up @@ -3225,7 +3225,7 @@ private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId me
}

private ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID txnID,
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries,
List<Triple<Long, Long, ConcurrentBitSet>> entries,
long requestID) {
BaseCommand cmd = newMultiMessageAckCommon(entries);
cmd.getAck()
Expand All @@ -3244,7 +3244,7 @@ protected BaseCommand initialValue() throws Exception {
}
};

private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries) {
private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, ConcurrentBitSet>> entries) {
BaseCommand cmd = LOCAL_BASE_COMMAND.get()
.clear()
.setType(BaseCommand.Type.ACK);
Expand All @@ -3253,7 +3253,7 @@ private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, Conc
for (int i = 0; i < entriesCount; i++) {
long ledgerId = entries.get(i).getLeft();
long entryId = entries.get(i).getMiddle();
ConcurrentBitSetRecyclable bitSet = entries.get(i).getRight();
ConcurrentBitSet bitSet = entries.get(i).getRight();
MessageIdData msgId = ack.addMessageId()
.setLedgerId(ledgerId)
.setEntryId(entryId);
Expand All @@ -3262,7 +3262,6 @@ private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, Conc
for (int j = 0; j < ackSet.length; j++) {
msgId.addAckSet(ackSet[j]);
}
bitSet.recycle();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentBitSet;
import org.jspecify.annotations.Nullable;

/**
Expand Down Expand Up @@ -83,7 +83,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
*/
private final ConcurrentSkipListSet<MessageIdAdv> pendingIndividualAcks;
@VisibleForTesting
final ConcurrentSkipListMap<MessageIdAdv, ConcurrentBitSetRecyclable> pendingIndividualBatchIndexAcks;
final ConcurrentSkipListMap<MessageIdAdv, ConcurrentBitSet> pendingIndividualBatchIndexAcks;

private final ScheduledFuture<?> scheduledTask;
private final boolean batchIndexAckEnabled;
Expand Down Expand Up @@ -133,7 +133,7 @@ public boolean isDuplicate(MessageId messageId) {
return true;
}
if (messageIdAdv.getBatchIndex() >= 0) {
ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.get(key);
ConcurrentBitSet bitSet = pendingIndividualBatchIndexAcks.get(key);
return bitSet != null && !bitSet.get(messageIdAdv.getBatchIndex());
}
return false;
Expand Down Expand Up @@ -327,21 +327,22 @@ private CompletableFuture<Void> doCumulativeAck(MessageIdAdv messageId, Map<Stri

@VisibleForTesting
CompletableFuture<Void> doIndividualBatchAckAsync(MessageIdAdv msgId) {
ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent(
ConcurrentBitSet bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent(
MessageIdAdvUtils.discardBatch(msgId), __ -> {
final BitSet ackSet = msgId.getAckSet();
final ConcurrentBitSetRecyclable value;
final ConcurrentBitSet value;
if (ackSet != null) {
synchronized (ackSet) {
if (!ackSet.isEmpty()) {
value = ConcurrentBitSetRecyclable.create(ackSet);
value = new ConcurrentBitSet();
value.or(ackSet);
} else {
value = ConcurrentBitSetRecyclable.create();
value = new ConcurrentBitSet();
value.set(0, msgId.getBatchSize());
}
}
} else {
value = ConcurrentBitSetRecyclable.create();
value = new ConcurrentBitSet();
value.set(0, msgId.getBatchSize());
}
return value;
Expand Down Expand Up @@ -445,7 +446,7 @@ private void flushAsync(ClientCnx cnx) {
}

// Flush all individual acks
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck =
List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck =
new ArrayList<>(pendingIndividualAcks.size() + pendingIndividualBatchIndexAcks.size());
if (!pendingIndividualAcks.isEmpty()) {
if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) {
Expand Down Expand Up @@ -487,7 +488,7 @@ private void flushAsync(ClientCnx cnx) {
}

while (true) {
Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable> entry =
Map.Entry<MessageIdAdv, ConcurrentBitSet> entry =
pendingIndividualBatchIndexAcks.pollFirstEntry();
if (entry == null) {
// The entry has been removed in a different thread
Expand Down Expand Up @@ -539,7 +540,7 @@ private CompletableFuture<Void> newImmediateAckAndFlush(long consumerId, Message
// cumulative ack chunk by the last messageId
if (chunkMsgIds != null && ackType != AckType.Cumulative) {
if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) {
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = new ArrayList<>(chunkMsgIds.length);
List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck = new ArrayList<>(chunkMsgIds.length);
for (MessageIdImpl cMsgId : chunkMsgIds) {
if (cMsgId != null && chunkMsgIds.length > 1) {
entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
Expand Down Expand Up @@ -568,7 +569,7 @@ private CompletableFuture<Void> newMessageAckCommandAndWrite(
long entryId, BitSetRecyclable ackSet, AckType ackType,
Map<String, Long> properties, boolean flush,
TimedCompletableFuture<Void> timedCompletableFuture,
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck) {
List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck) {
if (consumer.isAckReceiptEnabled()) {
final long requestId = consumer.getClient().newRequestId();
final ByteBuf cmd;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -405,6 +407,54 @@ public void testDoIndividualBatchAckAsync() {
tracker.close();
}

@Test
public void testDoIndividualBatchAckNeverAffectIsDuplicate() throws Exception {
ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
conf.setMaxAcknowledgmentGroupSize(1);
PersistentAcknowledgmentsGroupingTracker tracker =
new PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup);

BatchMessageIdImpl batchMessageId0 = new BatchMessageIdImpl(5, 1, 0, 0, 10, null);
BatchMessageIdImpl batchMessageId1 = new BatchMessageIdImpl(5, 1, 0, 1, 10, null);

int loops = 10000;
int addAcknowledgmentThreadCount = 10;
List<Thread> addAcknowledgmentThreads = new ArrayList<>(addAcknowledgmentThreadCount);
for (int i = 0; i < addAcknowledgmentThreadCount; i++) {
Thread addAcknowledgmentThread = new Thread(() -> {
for (int j = 0; j < loops; j++) {
tracker.addAcknowledgment(batchMessageId0, AckType.Individual, Collections.emptyMap());
}
}, "doIndividualBatchAck-thread-" + i);
addAcknowledgmentThread.start();
addAcknowledgmentThreads.add(addAcknowledgmentThread);
}

int isDuplicateThreadCount = 10;
AtomicBoolean assertResult = new AtomicBoolean();
List<Thread> isDuplicateThreads = new ArrayList<>(isDuplicateThreadCount);
for (int i = 0; i < isDuplicateThreadCount; i++) {
Thread isDuplicateThread = new Thread(() -> {
for (int j = 0; j < loops; j++) {
boolean duplicate = tracker.isDuplicate(batchMessageId1);
assertResult.set(assertResult.get() || duplicate);
}
}, "isDuplicate-thread-" + i);
isDuplicateThread.start();
isDuplicateThreads.add(isDuplicateThread);
}

for (Thread addAcknowledgmentThread : addAcknowledgmentThreads) {
addAcknowledgmentThread.join();
}

for (Thread isDuplicateThread : isDuplicateThreads) {
isDuplicateThread.join();
}

assertFalse(assertResult.get());
}

public class ClientCnxTest extends ClientCnx {

public ClientCnxTest(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentBitSet;

@UtilityClass
@Slf4j
Expand Down Expand Up @@ -1035,7 +1035,7 @@ public static ByteBuf newLookupErrorResponse(ServerError error, String errorMsg,
}

public static ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID txnID,
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries) {
List<Triple<Long, Long, ConcurrentBitSet>> entries) {
BaseCommand cmd = newMultiMessageAckCommon(entries);
cmd.getAck()
.setConsumerId(consumerId)
Expand All @@ -1045,14 +1045,14 @@ public static ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID txnID
return serializeWithSize(cmd);
}

private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries) {
private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, ConcurrentBitSet>> entries) {
BaseCommand cmd = localCmd(Type.ACK);
CommandAck ack = cmd.setAck();
int entriesCount = entries.size();
for (int i = 0; i < entriesCount; i++) {
long ledgerId = entries.get(i).getLeft();
long entryId = entries.get(i).getMiddle();
ConcurrentBitSetRecyclable bitSet = entries.get(i).getRight();
ConcurrentBitSet bitSet = entries.get(i).getRight();
MessageIdData msgId = ack.addMessageId()
.setLedgerId(ledgerId)
.setEntryId(entryId);
Expand All @@ -1061,15 +1061,14 @@ private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, Conc
for (int j = 0; j < ackSet.length; j++) {
msgId.addAckSet(ackSet[j]);
}
bitSet.recycle();
}
}

return cmd;
}

public static ByteBuf newMultiMessageAck(long consumerId,
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries,
List<Triple<Long, Long, ConcurrentBitSet>> entries,
long requestId) {
BaseCommand cmd = newMultiMessageAckCommon(entries);
cmd.getAck()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
/**
* Safe multithreaded version of {@code BitSet} and leverage netty recycler.
*/
@Deprecated
@EqualsAndHashCode(callSuper = true)
public class ConcurrentBitSetRecyclable extends ConcurrentBitSet {

Expand Down
Loading