Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix NPE when ACK grouping tracker checks duplicated message id #10586

Merged

Conversation

BewareMyPower
Copy link
Contributor

@BewareMyPower BewareMyPower commented May 14, 2021

Motivation

Recently I encountered the following NPE but it's hard to reproduce.

01:40:14.630 [pulsar-client-io-54-1] WARN  org.apache.pulsar.client.impl.ClientCnx - [10.0.0.44/10.0.0.44:6650] Got exception java.lang.NullPointerException
    at org.apache.pulsar.client.impl.MessageIdImpl.compareTo(MessageIdImpl.java:213)
    at org.apache.pulsar.client.impl.MessageIdImpl.compareTo(MessageIdImpl.java:37)
    at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.isDuplicate(PersistentAcknowledgmentsGroupingTracker.java:117)
    at org.apache.pulsar.client.impl.ConsumerImpl.messageReceived(ConsumerImpl.java:1013)

From the stack we can see NPE was thrown when an ACK grouping tracker checks duplicated message id. The tracker maintains a LastCumulativeAck field that has a MessageIdImpl type field messageId. However, messageId could be null after recycle() or just was created with a null MessageIdImpl. We should check null here. Anyway, it may be introduced from #8996

Modifications

  • Check null in PersistentAcknowledgmentsGroupingTracker#isDuplicate and returns false if it's null, then the tracker will do nothing in ConsumerImpl#messageReceived.
  • Check null in MessageIdImpl#compareTo and throw UnsupportedOperationException if the compared object is null.

Verifying this change

  • Make sure that the change passes the CI checks.

This change is a trivial rework / code cleanup without any test coverage.

@BewareMyPower
Copy link
Contributor Author

Now all tests passed, PTAL again @eolivelli

Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@eolivelli eolivelli merged commit 993ea62 into apache:master May 14, 2021
@codelipenghui codelipenghui added this to the 2.8.0 milestone May 15, 2021
@codelipenghui codelipenghui added the type/bug The PR fixed a bug or issue reported a bug label May 15, 2021
@BewareMyPower BewareMyPower deleted the bewaremypower/fix-npe-msgid-compare branch May 16, 2021 16:31
BewareMyPower added a commit to BewareMyPower/pulsar that referenced this pull request Jun 15, 2022
### Motivation

There were several issues caused by the thread safe issue of
`LastCumulativeAck`, see:
- apache#10586
- apache#12343

The root cause is that `LastCumulativeAck` could be accessed by
different threads, especially in `flushAsync` method. But the fields are
accessed directly and no thread safety can be guaranteed.

In addition, the current `LastCumulativeAck` class  was added in
apache#8996 to hold two object
references, but this modification is wrong.

Before apache#8996, there are two CAS operations in `doCumulativeAck` method
in case it's called concurretly. Though the composite CAS operation is
not atomic.

However, after apache#8996, only CAS operation was performed but it's compared
with a `LastCumulativeAck` object, not the two fields (`messageId` and
`bitSetRecyclable`).

### Modifications

To solve the thread safety issue, this PR move the `LastCumulativeAck`
out of the `PersistentAcknowledgmentsGroupingTracker` to disable
directly access to the internal fields. Then, two synchronized methods
were added to guarantee the thread safety:
- `update`: Guarantee the safe write operations. It also recycles the
  `BitSetRecyclable` object before assigning new values.
- `moveOwnershipTo`: This method moves the ownership to another
  `LastCumulativeAck` object, which will be responsible to recycle the
  `BitSetRecyclable` field after that.

With the methods above, each time `flushAsync` is called, move the
ownership of `lastCumulativeAck` field to another thread local field to
send the ACK command and recycle the `BitSetRecyclable` field.

- `lastCumulativeAck` updates the latest message ID and bit set, the
  update operations can be performed by multiple threads and
  `lastCumulativeAck` saves the latest value.
- `threadLocalLastCumulativeAckToFlush` only acts as a temporary cache
  to the latest value in `flushAsync`.
BewareMyPower added a commit to BewareMyPower/pulsar that referenced this pull request Jun 15, 2022
### Motivation

There were several issues caused by the thread safe issue of
`LastCumulativeAck`, see:
- apache#10586
- apache#12343

The root cause is that `LastCumulativeAck` could be accessed by
different threads, especially in `flushAsync` method. But the fields are
accessed directly and no thread safety can be guaranteed.

In addition, the current `LastCumulativeAck` class  was added in
apache#8996 to hold two object
references, but this modification is wrong.

Before apache#8996, there are two CAS operations in `doCumulativeAck` method
in case it's called concurretly. Though the composite CAS operation is
not atomic.

However, after apache#8996, only CAS operation was performed but it's compared
with a `LastCumulativeAck` object, not the two fields (`messageId` and
`bitSetRecyclable`).

### Modifications

To solve the thread safety issue, this PR move the `LastCumulativeAck`
out of the `PersistentAcknowledgmentsGroupingTracker` to disable
directly access to the internal fields. Then, two synchronized methods
were added to guarantee the thread safety:
- `update`: Guarantee the safe write operations. It also recycles the
  `BitSetRecyclable` object before assigning new values.
- `moveOwnershipTo`: This method moves the ownership to another
  `LastCumulativeAck` object, which will be responsible to recycle the
  `BitSetRecyclable` field after that.

With the methods above, each time `flushAsync` is called, move the
ownership of `lastCumulativeAck` field to another thread local field to
send the ACK command and recycle the `BitSetRecyclable` field.

- `lastCumulativeAck` updates the latest message ID and bit set, the
  update operations can be performed by multiple threads and
  `lastCumulativeAck` saves the latest value.
- `threadLocalLastCumulativeAckToFlush` only acts as a temporary cache
  to the latest value in `flushAsync`.
BewareMyPower added a commit to BewareMyPower/pulsar that referenced this pull request Jun 16, 2022
### Motivation

There were several issues caused by the thread safe issue of
`LastCumulativeAck`, see:
- apache#10586
- apache#12343

The root cause is that `LastCumulativeAck` could be accessed by
different threads, especially in `flushAsync` method. But the fields are
accessed directly and no thread safety can be guaranteed.

In addition, the current `LastCumulativeAck` class  was added in
apache#8996 to hold two object
references, but this modification is wrong.

Before apache#8996, there are two CAS operations in `doCumulativeAck` method
in case it's called concurretly. Though the composite CAS operation is
not atomic.

However, after apache#8996, only CAS operation was performed but it's compared
with a `LastCumulativeAck` object, not the two fields (`messageId` and
`bitSetRecyclable`).

### Modifications

To solve the thread safety issue, this PR move the `LastCumulativeAck`
out of the `PersistentAcknowledgmentsGroupingTracker` to disable
directly access to the internal fields. Then, the following synchronized
methods were added to guarantee the thread safety:
- `update`: Guarantee the safe write operations. It also recycles the
  `BitSetRecyclable` object before assigning new values.
- `moveOwnershipTo`: This method moves the ownership to another
  `LastCumulativeAck` object. After that, the `update` operation on this
  object won't recycle the `BitSetRecyclable` field.
- `restoreOwnershipIfEmpty`: Restore the ownership from another
  `LastCumulativeAck` object.

With the methods above, each time `flushAsync` is called, move the
ownership of `lastCumulativeAck` field to another thread local field to
send the ACK command. After that, restore the ownership to
`lastCumulativeAck` unless it has been updated in other threads.
BewareMyPower added a commit to BewareMyPower/pulsar that referenced this pull request Jun 21, 2022
### Motivation

There were several issues caused by the thread safe issue of
`LastCumulativeAck`, see:
- apache#10586
- apache#12343

The root cause is that `LastCumulativeAck` could be accessed by
different threads, especially in `flushAsync` method. But the fields are
accessed directly and no thread safety can be guaranteed.

In addition, the current `LastCumulativeAck` class  was added in
apache#8996 to hold two object
references, but this modification is wrong.

Before apache#8996, there are two CAS operations in `doCumulativeAck` method
in case it's called concurretly. Though the composite CAS operation is
not atomic.

However, after apache#8996, only CAS operation was performed but it's compared
with a `LastCumulativeAck` object, not the two fields (`messageId` and
`bitSetRecyclable`).

There is another issue that it uses a flag `cumulativeAckFlushRequired`
to mark if `lastCumulativeAck` should flush. However, if `flushAsync`
was called concurrently, both would send ACK commands to broker.

### Modifications

To solve the thread safety issue, this PR move the `LastCumulativeAck`
out of the `PersistentAcknowledgmentsGroupingTracker` to disable
directly access to the internal fields. Then, the following synchronized
methods were added to guarantee the thread safety:
- `update`: Guarantee the safe write operations. It also recycles the
  `BitSetRecyclable` object before assigning new values and indicates
  itself can be flushed.
- `flush`: If it can be flushed, return a thread local
  `LastCumulativeAck` instance that contains the message ID and the bit
  set. Then mark it as no need to flush.

In addition, since the `messageId` field is volatile, the `getMessageId`
method can always retrieve the latest reference.

Based on the new design, we can only maintain a `LastCumulativeAck`
field in `PersistentAcknowledgmentsGroupingTracker` and call the related
methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem
that two concurrent `flushAsync` calls might send the same ACK command
twice.
BewareMyPower added a commit to BewareMyPower/pulsar that referenced this pull request Jun 21, 2022
### Motivation

There were several issues caused by the thread safe issue of
`LastCumulativeAck`, see:
- apache#10586
- apache#12343

The root cause is that `LastCumulativeAck` could be accessed by
different threads, especially in `flushAsync` method. But the fields are
accessed directly and no thread safety can be guaranteed.

In addition, the current `LastCumulativeAck` class  was added in
apache#8996 to hold two object
references, but this modification is wrong.

Before apache#8996, there are two CAS operations in `doCumulativeAck` method
in case it's called concurretly. Though the composite CAS operation is
not atomic.

However, after apache#8996, only CAS operation was performed but it's compared
with a `LastCumulativeAck` object, not the two fields (`messageId` and
`bitSetRecyclable`).

There is another issue that it uses a flag `cumulativeAckFlushRequired`
to mark if `lastCumulativeAck` should flush. However, if `flushAsync`
was called concurrently, both would send ACK commands to broker.

### Modifications

To solve the thread safety issue, this PR move the `LastCumulativeAck`
out of the `PersistentAcknowledgmentsGroupingTracker` to disable
directly access to the internal fields. Then, the following synchronized
methods were added to guarantee the thread safety:
- `update`: Guarantee the safe write operations. It also recycles the
  `BitSetRecyclable` object before assigning new values and indicates
  itself can be flushed.
- `flush`: If it can be flushed, return a thread local
  `LastCumulativeAck` instance that contains the message ID and the bit
  set. The bit set is deep copied to avoid the original reference being
  recycled in another `update` call.

In addition, since the `messageId` field is volatile, the `getMessageId`
method can always retrieve the latest reference.

`LastCumulativeAckTest` is added to verify the sematics above.

Based on the new design, we can only maintain a `LastCumulativeAck`
field in `PersistentAcknowledgmentsGroupingTracker` and call the related
methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem
that two concurrent `flushAsync` calls might send the same ACK command
twice.

Remove unused field

Don't reset in LastCumulativeAck#flush
BewareMyPower added a commit that referenced this pull request Jun 22, 2022
…6072)

### Motivation

There were several issues caused by the thread safe issue of
`LastCumulativeAck`, see:
- #10586
- #12343

The root cause is that `LastCumulativeAck` could be accessed by
different threads, especially in `flushAsync` method. But the fields are
accessed directly and no thread safety can be guaranteed.

In addition, the current `LastCumulativeAck` class  was added in
#8996 to hold two object
references, but this modification is wrong.

Before #8996, there are two CAS operations in `doCumulativeAck` method
in case it's called concurretly. Though the composite CAS operation is
not atomic.

However, after #8996, only CAS operation was performed but it's compared
with a `LastCumulativeAck` object, not the two fields (`messageId` and
`bitSetRecyclable`).

There is another issue that it uses a flag `cumulativeAckFlushRequired`
to mark if `lastCumulativeAck` should flush. However, if `flushAsync`
was called concurrently, both would send ACK commands to broker.

### Modifications

To solve the thread safety issue, this PR move the `LastCumulativeAck`
out of the `PersistentAcknowledgmentsGroupingTracker` to disable
directly access to the internal fields. Then, the following synchronized
methods were added to guarantee the thread safety:
- `update`: Guarantee the safe write operations. It also recycles the
  `BitSetRecyclable` object before assigning new values and indicates
  itself can be flushed.
- `flush`: If it can be flushed, return a thread local
  `LastCumulativeAck` instance that contains the message ID and the bit
  set. The bit set is deep copied to avoid the original reference being
  recycled in another `update` call.

In addition, since the `messageId` field is volatile, the `getMessageId`
method can always retrieve the latest reference.

`LastCumulativeAckTest` is added to verify the sematics above.

Based on the new design, we can only maintain a `LastCumulativeAck`
field in `PersistentAcknowledgmentsGroupingTracker` and call the related
methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem
that two concurrent `flushAsync` calls might send the same ACK command
twice.
codelipenghui pushed a commit that referenced this pull request Jun 28, 2022
…6072)

### Motivation

There were several issues caused by the thread safe issue of
`LastCumulativeAck`, see:
- #10586
- #12343

The root cause is that `LastCumulativeAck` could be accessed by
different threads, especially in `flushAsync` method. But the fields are
accessed directly and no thread safety can be guaranteed.

In addition, the current `LastCumulativeAck` class  was added in
#8996 to hold two object
references, but this modification is wrong.

Before #8996, there are two CAS operations in `doCumulativeAck` method
in case it's called concurretly. Though the composite CAS operation is
not atomic.

However, after #8996, only CAS operation was performed but it's compared
with a `LastCumulativeAck` object, not the two fields (`messageId` and
`bitSetRecyclable`).

There is another issue that it uses a flag `cumulativeAckFlushRequired`
to mark if `lastCumulativeAck` should flush. However, if `flushAsync`
was called concurrently, both would send ACK commands to broker.

### Modifications

To solve the thread safety issue, this PR move the `LastCumulativeAck`
out of the `PersistentAcknowledgmentsGroupingTracker` to disable
directly access to the internal fields. Then, the following synchronized
methods were added to guarantee the thread safety:
- `update`: Guarantee the safe write operations. It also recycles the
  `BitSetRecyclable` object before assigning new values and indicates
  itself can be flushed.
- `flush`: If it can be flushed, return a thread local
  `LastCumulativeAck` instance that contains the message ID and the bit
  set. The bit set is deep copied to avoid the original reference being
  recycled in another `update` call.

In addition, since the `messageId` field is volatile, the `getMessageId`
method can always retrieve the latest reference.

`LastCumulativeAckTest` is added to verify the sematics above.

Based on the new design, we can only maintain a `LastCumulativeAck`
field in `PersistentAcknowledgmentsGroupingTracker` and call the related
methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem
that two concurrent `flushAsync` calls might send the same ACK command
twice.

(cherry picked from commit 936d6fd)
mattisonchao pushed a commit that referenced this pull request Jul 2, 2022
…6072)

### Motivation

There were several issues caused by the thread safe issue of
`LastCumulativeAck`, see:
- #10586
- #12343

The root cause is that `LastCumulativeAck` could be accessed by
different threads, especially in `flushAsync` method. But the fields are
accessed directly and no thread safety can be guaranteed.

In addition, the current `LastCumulativeAck` class  was added in
#8996 to hold two object
references, but this modification is wrong.

Before #8996, there are two CAS operations in `doCumulativeAck` method
in case it's called concurretly. Though the composite CAS operation is
not atomic.

However, after #8996, only CAS operation was performed but it's compared
with a `LastCumulativeAck` object, not the two fields (`messageId` and
`bitSetRecyclable`).

There is another issue that it uses a flag `cumulativeAckFlushRequired`
to mark if `lastCumulativeAck` should flush. However, if `flushAsync`
was called concurrently, both would send ACK commands to broker.

### Modifications

To solve the thread safety issue, this PR move the `LastCumulativeAck`
out of the `PersistentAcknowledgmentsGroupingTracker` to disable
directly access to the internal fields. Then, the following synchronized
methods were added to guarantee the thread safety:
- `update`: Guarantee the safe write operations. It also recycles the
  `BitSetRecyclable` object before assigning new values and indicates
  itself can be flushed.
- `flush`: If it can be flushed, return a thread local
  `LastCumulativeAck` instance that contains the message ID and the bit
  set. The bit set is deep copied to avoid the original reference being
  recycled in another `update` call.

In addition, since the `messageId` field is volatile, the `getMessageId`
method can always retrieve the latest reference.

`LastCumulativeAckTest` is added to verify the sematics above.

Based on the new design, we can only maintain a `LastCumulativeAck`
field in `PersistentAcknowledgmentsGroupingTracker` and call the related
methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem
that two concurrent `flushAsync` calls might send the same ACK command
twice.

(cherry picked from commit 936d6fd)
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Jul 4, 2022
…ache#16072)

### Motivation

There were several issues caused by the thread safe issue of
`LastCumulativeAck`, see:
- apache#10586
- apache#12343

The root cause is that `LastCumulativeAck` could be accessed by
different threads, especially in `flushAsync` method. But the fields are
accessed directly and no thread safety can be guaranteed.

In addition, the current `LastCumulativeAck` class  was added in
apache#8996 to hold two object
references, but this modification is wrong.

Before apache#8996, there are two CAS operations in `doCumulativeAck` method
in case it's called concurretly. Though the composite CAS operation is
not atomic.

However, after apache#8996, only CAS operation was performed but it's compared
with a `LastCumulativeAck` object, not the two fields (`messageId` and
`bitSetRecyclable`).

There is another issue that it uses a flag `cumulativeAckFlushRequired`
to mark if `lastCumulativeAck` should flush. However, if `flushAsync`
was called concurrently, both would send ACK commands to broker.

### Modifications

To solve the thread safety issue, this PR move the `LastCumulativeAck`
out of the `PersistentAcknowledgmentsGroupingTracker` to disable
directly access to the internal fields. Then, the following synchronized
methods were added to guarantee the thread safety:
- `update`: Guarantee the safe write operations. It also recycles the
  `BitSetRecyclable` object before assigning new values and indicates
  itself can be flushed.
- `flush`: If it can be flushed, return a thread local
  `LastCumulativeAck` instance that contains the message ID and the bit
  set. The bit set is deep copied to avoid the original reference being
  recycled in another `update` call.

In addition, since the `messageId` field is volatile, the `getMessageId`
method can always retrieve the latest reference.

`LastCumulativeAckTest` is added to verify the sematics above.

Based on the new design, we can only maintain a `LastCumulativeAck`
field in `PersistentAcknowledgmentsGroupingTracker` and call the related
methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem
that two concurrent `flushAsync` calls might send the same ACK command
twice.

(cherry picked from commit 936d6fd)
(cherry picked from commit 5eefdf1)
BewareMyPower added a commit that referenced this pull request Jul 29, 2022
…6072)

### Motivation

There were several issues caused by the thread safe issue of
`LastCumulativeAck`, see:
- #10586
- #12343

The root cause is that `LastCumulativeAck` could be accessed by
different threads, especially in `flushAsync` method. But the fields are
accessed directly and no thread safety can be guaranteed.

In addition, the current `LastCumulativeAck` class  was added in
#8996 to hold two object
references, but this modification is wrong.

Before #8996, there are two CAS operations in `doCumulativeAck` method
in case it's called concurretly. Though the composite CAS operation is
not atomic.

However, after #8996, only CAS operation was performed but it's compared
with a `LastCumulativeAck` object, not the two fields (`messageId` and
`bitSetRecyclable`).

There is another issue that it uses a flag `cumulativeAckFlushRequired`
to mark if `lastCumulativeAck` should flush. However, if `flushAsync`
was called concurrently, both would send ACK commands to broker.

### Modifications

To solve the thread safety issue, this PR move the `LastCumulativeAck`
out of the `PersistentAcknowledgmentsGroupingTracker` to disable
directly access to the internal fields. Then, the following synchronized
methods were added to guarantee the thread safety:
- `update`: Guarantee the safe write operations. It also recycles the
  `BitSetRecyclable` object before assigning new values and indicates
  itself can be flushed.
- `flush`: If it can be flushed, return a thread local
  `LastCumulativeAck` instance that contains the message ID and the bit
  set. The bit set is deep copied to avoid the original reference being
  recycled in another `update` call.

In addition, since the `messageId` field is volatile, the `getMessageId`
method can always retrieve the latest reference.

`LastCumulativeAckTest` is added to verify the sematics above.

Based on the new design, we can only maintain a `LastCumulativeAck`
field in `PersistentAcknowledgmentsGroupingTracker` and call the related
methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem
that two concurrent `flushAsync` calls might send the same ACK command
twice.

(cherry picked from commit 936d6fd)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants