-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ack response implementation #8996
Ack response implementation #8996
Conversation
/pulsarbot run-failure-checks |
# Conflicts: # pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
/pulsarbot run-failure-checks |
...nt/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
Show resolved
Hide resolved
pendingIndividualTransactionAcks | ||
.add(Triple.of(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits(), msgId)); | ||
consumer.onAcknowledgeCumulative(msgId, null); | ||
if (((BatchMessageIdImpl) msgId).ackCumulative()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if we don't have this class? Should we add an instanceof test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line 183 is checked, I think @congbobo184 you can use batchMessageId
since you already convert to BatchMessageIdImpl
in 184
// when flush the ack, we should bind the this ack in the currentFuture, during this time we can't | ||
// change currentFuture. but we can lock by the read lock, because the currentFuture is not change | ||
// any ack operation is allowed. | ||
this.lock.readLock().lock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move this line out of the finally block
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
1 similar comment
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
@codelipenghui Can you review it? |
# Conflicts: # pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java # pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java # pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java # pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
# Conflicts: # pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to consider the compatibility that the new client connects the old version broker which does not support enable ack response. If a broker does not write the ack response to the client, the client might be stuck on the message acknowledgment?
And this should be covered by the compatibility test in the tests
modules.
/** | ||
* Consumer ack for response timeout. | ||
*/ | ||
public static class AckResponseTimeoutException extends PulsarClientException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the PulsarClientException.TimeoutException
works?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it can be deleted.
* @param ackResponseEnabled {@link Boolean} is enable ack for response | ||
* @return the consumer builder instance | ||
*/ | ||
ConsumerBuilder<T> enableAckResponse(boolean ackResponseEnabled); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ConsumerBuilder<T> enableAckResponse(boolean ackResponseEnabled); | |
ConsumerBuilder<T> isAckReceiptEnabled(boolean ackReceiptEnabled); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think response
here a little bit ambiguous since the client also can get the returned future without this feature.
...nt/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
Show resolved
Hide resolved
pendingIndividualTransactionAcks | ||
.add(Triple.of(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits(), msgId)); | ||
consumer.onAcknowledgeCumulative(msgId, null); | ||
if (((BatchMessageIdImpl) msgId).ackCumulative()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line 183 is checked, I think @congbobo184 you can use batchMessageId
since you already convert to BatchMessageIdImpl
in 184
private MessageIdImpl modifyBatchMessageIdAndStatusInConsumer(BatchMessageIdImpl batchMessageId) { | ||
MessageIdImpl messageId = new MessageIdImpl(batchMessageId.getLedgerId(), | ||
batchMessageId.getEntryId(), batchMessageId.getPartitionIndex()); | ||
consumer.getStats().incrementNumAcksSent(batchMessageId.getBatchSize()); | ||
modifyMessageIdStatusInConsumerCommon(messageId); | ||
return messageId; | ||
} | ||
|
||
private void modifyMessageIdStatusInConsumer(MessageIdImpl messageId) { | ||
consumer.getStats().incrementNumAcksSent(1); | ||
modifyMessageIdStatusInConsumerCommon(messageId); | ||
} | ||
|
||
private void modifyMessageIdStatusInConsumerCommon(MessageIdImpl messageId) { | ||
consumer.getUnAckedMessageTracker().remove(messageId); | ||
if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) { | ||
consumer.getPossibleSendToDeadLetterTopicMessages().remove(messageId); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These methods not clear here. Stats, not status. And should split the update stats method and cleanup consumer method stay independent, this will improve the code readability
ByteBuf cmd = Commands.newAck(consumer.consumerId, lastCumulativeAck.messageId.ledgerId, | ||
lastCumulativeAck.messageId.getEntryId(), lastCumulativeAck.bitSetRecyclable, | ||
AckType.Cumulative, null, Collections.emptyMap(), requestId); | ||
cnx.newAckForResponseWithFuture(cmd, requestId, currentCumulativeAckFuture); | ||
this.currentCumulativeAckFuture = new TimedCompletableFuture<>(); | ||
} else { | ||
ByteBuf cmd = Commands.newAck(consumer.consumerId, lastCumulativeAck.messageId.ledgerId, | ||
lastCumulativeAck.messageId.getEntryId(), lastCumulativeAck.bitSetRecyclable, | ||
AckType.Cumulative, null, Collections.emptyMap(), -1); | ||
cnx.ctx().write(cmd, cnx.ctx().voidPromise()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please optimize duplicate code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we optimize the code, we will judge the ackResponseEnabled twice, because get requestId then new cmd and then write. get requestId and write are different from normal ack and ack response. the middle operation newAckComand can't be optimized.
long requestId = consumer.getClient().newRequestId(); | ||
ByteBuf cmd = Commands.newMultiMessageAck(consumer.consumerId, entriesToAck, requestId); | ||
completableFuture = cnx.newAckForResponse(cmd, requestId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should write the command to the broker? And please consider reducing the duplicate code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
because newImmediateAckAndFlush so we should write the command to broker immediately.
// if don't support multi message ack, it also support ack response, so we should not think about the | ||
// ack response in this logic |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why only support for muti message ack. Single message acknowledge also can enable ack receipt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is the chunk message, so if want to use ack response, broker must support multi message ack, we don't need to support broker don't support multi message ack and then we return ack response.
if (ackResponseEnabled) { | ||
long requestId = consumer.getClient().newRequestId(); | ||
ByteBuf cmd = Commands.newAck(consumerId, msgId.getLedgerId(), msgId.getEntryId(), bitSet, | ||
ackType, null, map, requestId); | ||
completableFuture = cnx.newAckForResponse(cmd, requestId); | ||
} else { | ||
cnx.ctx().write(cmd, cnx.ctx().voidPromise()); | ||
ByteBuf cmd = Commands.newAck(consumerId, msgId.getLedgerId(), msgId.getEntryId(), bitSet, | ||
ackType, null, map, -1); | ||
cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise()); | ||
completableFuture = CompletableFuture.completedFuture(null); | ||
} | ||
} | ||
return completableFuture; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can use a method getRequestId()
and a method getCompletableFuture()
. I think this will make the logic simpler
private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf requestMessage, long requestId, | ||
RequestType requestType, boolean flush, | ||
TimedCompletableFuture<T> future) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to return the future again?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can use 2 methods to handle this case
private <T> void sendRequestAndHandleTimeout(ByteBuf requestMessage, long requestId,
RequestType requestType, boolean flush,
TimedCompletableFuture<T> future) {
}
private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf requestMessage, long requestId, RequestType requestType, boolean flush) {
TimedCompletableFuture<T> future = new TimedCompletableFuture<>();
sendRequestAndHandleTimeout(... future);
return future;
}
@congbobo184 @codelipenghui @Anonymitaet @merlimat Has the "ackReceipt" feature added by this PR been documented? The only documentation that I can find is in the javadoc of the isAckReceiptEnabled method "Ack will return receipt but does not mean that the message will not be resent after get receipt." . Would it make sense to document this change in the "Pulsar binary protocol specification" document, https://pulsar.apache.org/docs/en/develop-binary-protocol/ https://github.com/apache/pulsar/blob/master/site2/docs/developing-binary-protocol.md ? |
@lhotari hi, this pr only means server receive client ack request and then send response to client. |
Hi @Anonymitaet , could you help @congbobo184 in documenting this feature? Since this PR also changed the wire protocol (there were protobuf changes), I think that this requires creating a PIP document. This is a common rule in Pulsar development that there must be a PIP for wire protocol changes. @codelipenghui Could you help with a PIP document for the "ack receipt" feature? /cc @merlimat @sijie |
@lhotari The ack response proposal contained in the transaction proposal, you can find here https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#bookmark=id.r370pcotoarj For the documentation, you can just create an issue to track it. |
I will work with @congbobo184 on the docs and track the issue here: #11272 |
@congbobo184 I recently came across #7683 . Does this PR somehow help resolve that issue? |
Doc status update: I've discussed w/ @liangyepianzhou, he will add docs and @momo-jun will help review.
|
Doc is added #15497 |
### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - apache#10586 - apache#12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in apache#8996 to hold two object references, but this modification is wrong. Before apache#8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after apache#8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, two synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values. - `moveOwnershipTo`: This method moves the ownership to another `LastCumulativeAck` object, which will be responsible to recycle the `BitSetRecyclable` field after that. With the methods above, each time `flushAsync` is called, move the ownership of `lastCumulativeAck` field to another thread local field to send the ACK command and recycle the `BitSetRecyclable` field. - `lastCumulativeAck` updates the latest message ID and bit set, the update operations can be performed by multiple threads and `lastCumulativeAck` saves the latest value. - `threadLocalLastCumulativeAckToFlush` only acts as a temporary cache to the latest value in `flushAsync`.
### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - apache#10586 - apache#12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in apache#8996 to hold two object references, but this modification is wrong. Before apache#8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after apache#8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, two synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values. - `moveOwnershipTo`: This method moves the ownership to another `LastCumulativeAck` object, which will be responsible to recycle the `BitSetRecyclable` field after that. With the methods above, each time `flushAsync` is called, move the ownership of `lastCumulativeAck` field to another thread local field to send the ACK command and recycle the `BitSetRecyclable` field. - `lastCumulativeAck` updates the latest message ID and bit set, the update operations can be performed by multiple threads and `lastCumulativeAck` saves the latest value. - `threadLocalLastCumulativeAckToFlush` only acts as a temporary cache to the latest value in `flushAsync`.
### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - apache#10586 - apache#12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in apache#8996 to hold two object references, but this modification is wrong. Before apache#8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after apache#8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, the following synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values. - `moveOwnershipTo`: This method moves the ownership to another `LastCumulativeAck` object. After that, the `update` operation on this object won't recycle the `BitSetRecyclable` field. - `restoreOwnershipIfEmpty`: Restore the ownership from another `LastCumulativeAck` object. With the methods above, each time `flushAsync` is called, move the ownership of `lastCumulativeAck` field to another thread local field to send the ACK command. After that, restore the ownership to `lastCumulativeAck` unless it has been updated in other threads.
### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - apache#10586 - apache#12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in apache#8996 to hold two object references, but this modification is wrong. Before apache#8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after apache#8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). There is another issue that it uses a flag `cumulativeAckFlushRequired` to mark if `lastCumulativeAck` should flush. However, if `flushAsync` was called concurrently, both would send ACK commands to broker. ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, the following synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values and indicates itself can be flushed. - `flush`: If it can be flushed, return a thread local `LastCumulativeAck` instance that contains the message ID and the bit set. Then mark it as no need to flush. In addition, since the `messageId` field is volatile, the `getMessageId` method can always retrieve the latest reference. Based on the new design, we can only maintain a `LastCumulativeAck` field in `PersistentAcknowledgmentsGroupingTracker` and call the related methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem that two concurrent `flushAsync` calls might send the same ACK command twice.
### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - apache#10586 - apache#12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in apache#8996 to hold two object references, but this modification is wrong. Before apache#8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after apache#8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). There is another issue that it uses a flag `cumulativeAckFlushRequired` to mark if `lastCumulativeAck` should flush. However, if `flushAsync` was called concurrently, both would send ACK commands to broker. ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, the following synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values and indicates itself can be flushed. - `flush`: If it can be flushed, return a thread local `LastCumulativeAck` instance that contains the message ID and the bit set. The bit set is deep copied to avoid the original reference being recycled in another `update` call. In addition, since the `messageId` field is volatile, the `getMessageId` method can always retrieve the latest reference. `LastCumulativeAckTest` is added to verify the sematics above. Based on the new design, we can only maintain a `LastCumulativeAck` field in `PersistentAcknowledgmentsGroupingTracker` and call the related methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem that two concurrent `flushAsync` calls might send the same ACK command twice. Remove unused field Don't reset in LastCumulativeAck#flush
…6072) ### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - #10586 - #12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in #8996 to hold two object references, but this modification is wrong. Before #8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after #8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). There is another issue that it uses a flag `cumulativeAckFlushRequired` to mark if `lastCumulativeAck` should flush. However, if `flushAsync` was called concurrently, both would send ACK commands to broker. ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, the following synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values and indicates itself can be flushed. - `flush`: If it can be flushed, return a thread local `LastCumulativeAck` instance that contains the message ID and the bit set. The bit set is deep copied to avoid the original reference being recycled in another `update` call. In addition, since the `messageId` field is volatile, the `getMessageId` method can always retrieve the latest reference. `LastCumulativeAckTest` is added to verify the sematics above. Based on the new design, we can only maintain a `LastCumulativeAck` field in `PersistentAcknowledgmentsGroupingTracker` and call the related methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem that two concurrent `flushAsync` calls might send the same ACK command twice.
…6072) ### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - #10586 - #12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in #8996 to hold two object references, but this modification is wrong. Before #8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after #8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). There is another issue that it uses a flag `cumulativeAckFlushRequired` to mark if `lastCumulativeAck` should flush. However, if `flushAsync` was called concurrently, both would send ACK commands to broker. ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, the following synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values and indicates itself can be flushed. - `flush`: If it can be flushed, return a thread local `LastCumulativeAck` instance that contains the message ID and the bit set. The bit set is deep copied to avoid the original reference being recycled in another `update` call. In addition, since the `messageId` field is volatile, the `getMessageId` method can always retrieve the latest reference. `LastCumulativeAckTest` is added to verify the sematics above. Based on the new design, we can only maintain a `LastCumulativeAck` field in `PersistentAcknowledgmentsGroupingTracker` and call the related methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem that two concurrent `flushAsync` calls might send the same ACK command twice. (cherry picked from commit 936d6fd)
…6072) ### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - #10586 - #12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in #8996 to hold two object references, but this modification is wrong. Before #8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after #8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). There is another issue that it uses a flag `cumulativeAckFlushRequired` to mark if `lastCumulativeAck` should flush. However, if `flushAsync` was called concurrently, both would send ACK commands to broker. ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, the following synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values and indicates itself can be flushed. - `flush`: If it can be flushed, return a thread local `LastCumulativeAck` instance that contains the message ID and the bit set. The bit set is deep copied to avoid the original reference being recycled in another `update` call. In addition, since the `messageId` field is volatile, the `getMessageId` method can always retrieve the latest reference. `LastCumulativeAckTest` is added to verify the sematics above. Based on the new design, we can only maintain a `LastCumulativeAck` field in `PersistentAcknowledgmentsGroupingTracker` and call the related methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem that two concurrent `flushAsync` calls might send the same ACK command twice. (cherry picked from commit 936d6fd)
…ache#16072) ### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - apache#10586 - apache#12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in apache#8996 to hold two object references, but this modification is wrong. Before apache#8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after apache#8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). There is another issue that it uses a flag `cumulativeAckFlushRequired` to mark if `lastCumulativeAck` should flush. However, if `flushAsync` was called concurrently, both would send ACK commands to broker. ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, the following synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values and indicates itself can be flushed. - `flush`: If it can be flushed, return a thread local `LastCumulativeAck` instance that contains the message ID and the bit set. The bit set is deep copied to avoid the original reference being recycled in another `update` call. In addition, since the `messageId` field is volatile, the `getMessageId` method can always retrieve the latest reference. `LastCumulativeAckTest` is added to verify the sematics above. Based on the new design, we can only maintain a `LastCumulativeAck` field in `PersistentAcknowledgmentsGroupingTracker` and call the related methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem that two concurrent `flushAsync` calls might send the same ACK command twice. (cherry picked from commit 936d6fd) (cherry picked from commit 5eefdf1)
…6072) ### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - #10586 - #12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in #8996 to hold two object references, but this modification is wrong. Before #8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after #8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). There is another issue that it uses a flag `cumulativeAckFlushRequired` to mark if `lastCumulativeAck` should flush. However, if `flushAsync` was called concurrently, both would send ACK commands to broker. ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, the following synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values and indicates itself can be flushed. - `flush`: If it can be flushed, return a thread local `LastCumulativeAck` instance that contains the message ID and the bit set. The bit set is deep copied to avoid the original reference being recycled in another `update` call. In addition, since the `messageId` field is volatile, the `getMessageId` method can always retrieve the latest reference. `LastCumulativeAckTest` is added to verify the sematics above. Based on the new design, we can only maintain a `LastCumulativeAck` field in `PersistentAcknowledgmentsGroupingTracker` and call the related methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem that two concurrent `flushAsync` calls might send the same ACK command twice. (cherry picked from commit 936d6fd)
Motivation
in order to handle ack response implementation. When this PR commit, I will handle #8997.
implement
Verifying this change
Add the tests for it
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (no)
The default values of configurations: (no)
The wire protocol: (no)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (no)