-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix NPE when ACK grouping tracker checks duplicated message id #10586
Merged
eolivelli
merged 2 commits into
apache:master
from
BewareMyPower:bewaremypower/fix-npe-msgid-compare
May 14, 2021
Merged
Fix NPE when ACK grouping tracker checks duplicated message id #10586
eolivelli
merged 2 commits into
apache:master
from
BewareMyPower:bewaremypower/fix-npe-msgid-compare
May 14, 2021
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
dockerzhang
approved these changes
May 14, 2021
eolivelli
requested changes
May 14, 2021
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
Outdated
Show resolved
Hide resolved
Now all tests passed, PTAL again @eolivelli |
eolivelli
approved these changes
May 14, 2021
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
1 task
Merged
2 tasks
BewareMyPower
added a commit
to BewareMyPower/pulsar
that referenced
this pull request
Jun 15, 2022
### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - apache#10586 - apache#12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in apache#8996 to hold two object references, but this modification is wrong. Before apache#8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after apache#8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, two synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values. - `moveOwnershipTo`: This method moves the ownership to another `LastCumulativeAck` object, which will be responsible to recycle the `BitSetRecyclable` field after that. With the methods above, each time `flushAsync` is called, move the ownership of `lastCumulativeAck` field to another thread local field to send the ACK command and recycle the `BitSetRecyclable` field. - `lastCumulativeAck` updates the latest message ID and bit set, the update operations can be performed by multiple threads and `lastCumulativeAck` saves the latest value. - `threadLocalLastCumulativeAckToFlush` only acts as a temporary cache to the latest value in `flushAsync`.
BewareMyPower
added a commit
to BewareMyPower/pulsar
that referenced
this pull request
Jun 15, 2022
### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - apache#10586 - apache#12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in apache#8996 to hold two object references, but this modification is wrong. Before apache#8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after apache#8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, two synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values. - `moveOwnershipTo`: This method moves the ownership to another `LastCumulativeAck` object, which will be responsible to recycle the `BitSetRecyclable` field after that. With the methods above, each time `flushAsync` is called, move the ownership of `lastCumulativeAck` field to another thread local field to send the ACK command and recycle the `BitSetRecyclable` field. - `lastCumulativeAck` updates the latest message ID and bit set, the update operations can be performed by multiple threads and `lastCumulativeAck` saves the latest value. - `threadLocalLastCumulativeAckToFlush` only acts as a temporary cache to the latest value in `flushAsync`.
4 tasks
BewareMyPower
added a commit
to BewareMyPower/pulsar
that referenced
this pull request
Jun 16, 2022
### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - apache#10586 - apache#12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in apache#8996 to hold two object references, but this modification is wrong. Before apache#8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after apache#8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, the following synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values. - `moveOwnershipTo`: This method moves the ownership to another `LastCumulativeAck` object. After that, the `update` operation on this object won't recycle the `BitSetRecyclable` field. - `restoreOwnershipIfEmpty`: Restore the ownership from another `LastCumulativeAck` object. With the methods above, each time `flushAsync` is called, move the ownership of `lastCumulativeAck` field to another thread local field to send the ACK command. After that, restore the ownership to `lastCumulativeAck` unless it has been updated in other threads.
BewareMyPower
added a commit
to BewareMyPower/pulsar
that referenced
this pull request
Jun 21, 2022
### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - apache#10586 - apache#12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in apache#8996 to hold two object references, but this modification is wrong. Before apache#8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after apache#8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). There is another issue that it uses a flag `cumulativeAckFlushRequired` to mark if `lastCumulativeAck` should flush. However, if `flushAsync` was called concurrently, both would send ACK commands to broker. ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, the following synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values and indicates itself can be flushed. - `flush`: If it can be flushed, return a thread local `LastCumulativeAck` instance that contains the message ID and the bit set. Then mark it as no need to flush. In addition, since the `messageId` field is volatile, the `getMessageId` method can always retrieve the latest reference. Based on the new design, we can only maintain a `LastCumulativeAck` field in `PersistentAcknowledgmentsGroupingTracker` and call the related methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem that two concurrent `flushAsync` calls might send the same ACK command twice.
BewareMyPower
added a commit
to BewareMyPower/pulsar
that referenced
this pull request
Jun 21, 2022
### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - apache#10586 - apache#12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in apache#8996 to hold two object references, but this modification is wrong. Before apache#8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after apache#8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). There is another issue that it uses a flag `cumulativeAckFlushRequired` to mark if `lastCumulativeAck` should flush. However, if `flushAsync` was called concurrently, both would send ACK commands to broker. ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, the following synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values and indicates itself can be flushed. - `flush`: If it can be flushed, return a thread local `LastCumulativeAck` instance that contains the message ID and the bit set. The bit set is deep copied to avoid the original reference being recycled in another `update` call. In addition, since the `messageId` field is volatile, the `getMessageId` method can always retrieve the latest reference. `LastCumulativeAckTest` is added to verify the sematics above. Based on the new design, we can only maintain a `LastCumulativeAck` field in `PersistentAcknowledgmentsGroupingTracker` and call the related methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem that two concurrent `flushAsync` calls might send the same ACK command twice. Remove unused field Don't reset in LastCumulativeAck#flush
BewareMyPower
added a commit
that referenced
this pull request
Jun 22, 2022
…6072) ### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - #10586 - #12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in #8996 to hold two object references, but this modification is wrong. Before #8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after #8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). There is another issue that it uses a flag `cumulativeAckFlushRequired` to mark if `lastCumulativeAck` should flush. However, if `flushAsync` was called concurrently, both would send ACK commands to broker. ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, the following synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values and indicates itself can be flushed. - `flush`: If it can be flushed, return a thread local `LastCumulativeAck` instance that contains the message ID and the bit set. The bit set is deep copied to avoid the original reference being recycled in another `update` call. In addition, since the `messageId` field is volatile, the `getMessageId` method can always retrieve the latest reference. `LastCumulativeAckTest` is added to verify the sematics above. Based on the new design, we can only maintain a `LastCumulativeAck` field in `PersistentAcknowledgmentsGroupingTracker` and call the related methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem that two concurrent `flushAsync` calls might send the same ACK command twice.
codelipenghui
pushed a commit
that referenced
this pull request
Jun 28, 2022
…6072) ### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - #10586 - #12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in #8996 to hold two object references, but this modification is wrong. Before #8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after #8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). There is another issue that it uses a flag `cumulativeAckFlushRequired` to mark if `lastCumulativeAck` should flush. However, if `flushAsync` was called concurrently, both would send ACK commands to broker. ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, the following synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values and indicates itself can be flushed. - `flush`: If it can be flushed, return a thread local `LastCumulativeAck` instance that contains the message ID and the bit set. The bit set is deep copied to avoid the original reference being recycled in another `update` call. In addition, since the `messageId` field is volatile, the `getMessageId` method can always retrieve the latest reference. `LastCumulativeAckTest` is added to verify the sematics above. Based on the new design, we can only maintain a `LastCumulativeAck` field in `PersistentAcknowledgmentsGroupingTracker` and call the related methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem that two concurrent `flushAsync` calls might send the same ACK command twice. (cherry picked from commit 936d6fd)
mattisonchao
pushed a commit
that referenced
this pull request
Jul 2, 2022
…6072) ### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - #10586 - #12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in #8996 to hold two object references, but this modification is wrong. Before #8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after #8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). There is another issue that it uses a flag `cumulativeAckFlushRequired` to mark if `lastCumulativeAck` should flush. However, if `flushAsync` was called concurrently, both would send ACK commands to broker. ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, the following synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values and indicates itself can be flushed. - `flush`: If it can be flushed, return a thread local `LastCumulativeAck` instance that contains the message ID and the bit set. The bit set is deep copied to avoid the original reference being recycled in another `update` call. In addition, since the `messageId` field is volatile, the `getMessageId` method can always retrieve the latest reference. `LastCumulativeAckTest` is added to verify the sematics above. Based on the new design, we can only maintain a `LastCumulativeAck` field in `PersistentAcknowledgmentsGroupingTracker` and call the related methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem that two concurrent `flushAsync` calls might send the same ACK command twice. (cherry picked from commit 936d6fd)
nicoloboschi
pushed a commit
to datastax/pulsar
that referenced
this pull request
Jul 4, 2022
…ache#16072) ### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - apache#10586 - apache#12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in apache#8996 to hold two object references, but this modification is wrong. Before apache#8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after apache#8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). There is another issue that it uses a flag `cumulativeAckFlushRequired` to mark if `lastCumulativeAck` should flush. However, if `flushAsync` was called concurrently, both would send ACK commands to broker. ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, the following synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values and indicates itself can be flushed. - `flush`: If it can be flushed, return a thread local `LastCumulativeAck` instance that contains the message ID and the bit set. The bit set is deep copied to avoid the original reference being recycled in another `update` call. In addition, since the `messageId` field is volatile, the `getMessageId` method can always retrieve the latest reference. `LastCumulativeAckTest` is added to verify the sematics above. Based on the new design, we can only maintain a `LastCumulativeAck` field in `PersistentAcknowledgmentsGroupingTracker` and call the related methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem that two concurrent `flushAsync` calls might send the same ACK command twice. (cherry picked from commit 936d6fd) (cherry picked from commit 5eefdf1)
BewareMyPower
added a commit
that referenced
this pull request
Jul 29, 2022
…6072) ### Motivation There were several issues caused by the thread safe issue of `LastCumulativeAck`, see: - #10586 - #12343 The root cause is that `LastCumulativeAck` could be accessed by different threads, especially in `flushAsync` method. But the fields are accessed directly and no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in #8996 to hold two object references, but this modification is wrong. Before #8996, there are two CAS operations in `doCumulativeAck` method in case it's called concurretly. Though the composite CAS operation is not atomic. However, after #8996, only CAS operation was performed but it's compared with a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). There is another issue that it uses a flag `cumulativeAckFlushRequired` to mark if `lastCumulativeAck` should flush. However, if `flushAsync` was called concurrently, both would send ACK commands to broker. ### Modifications To solve the thread safety issue, this PR move the `LastCumulativeAck` out of the `PersistentAcknowledgmentsGroupingTracker` to disable directly access to the internal fields. Then, the following synchronized methods were added to guarantee the thread safety: - `update`: Guarantee the safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values and indicates itself can be flushed. - `flush`: If it can be flushed, return a thread local `LastCumulativeAck` instance that contains the message ID and the bit set. The bit set is deep copied to avoid the original reference being recycled in another `update` call. In addition, since the `messageId` field is volatile, the `getMessageId` method can always retrieve the latest reference. `LastCumulativeAckTest` is added to verify the sematics above. Based on the new design, we can only maintain a `LastCumulativeAck` field in `PersistentAcknowledgmentsGroupingTracker` and call the related methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem that two concurrent `flushAsync` calls might send the same ACK command twice. (cherry picked from commit 936d6fd)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Motivation
Recently I encountered the following NPE but it's hard to reproduce.
From the stack we can see NPE was thrown when an ACK grouping tracker checks duplicated message id. The tracker maintains a
LastCumulativeAck
field that has aMessageIdImpl
type fieldmessageId
. However,messageId
could be null afterrecycle()
or just was created with a nullMessageIdImpl
. We should check null here. Anyway, it may be introduced from #8996Modifications
PersistentAcknowledgmentsGroupingTracker#isDuplicate
and returns false if it's null, then the tracker will do nothing inConsumerImpl#messageReceived
.MessageIdImpl#compareTo
and throwUnsupportedOperationException
if the compared object is null.Verifying this change
This change is a trivial rework / code cleanup without any test coverage.