-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve][broker][PIP-195] Make bucket merge operation asynchronous #19873
[improve][broker][PIP-195] Make bucket merge operation asynchronous #19873
Conversation
b2f9f49
to
3cc918d
Compare
3cc918d
to
f854c59
Compare
private volatile List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> snapshotSegments; | ||
private List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> snapshotSegments; | ||
|
||
boolean merging = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the variable merging
should be modified with volatile.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because all the read and write of merging
is in synchronized
block.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you are right, the document of J2SE describes this feature like this:
Intrinsic Locks and Synchronization
Synchronization is built around an internal entity known as the intrinsic lock or monitor lock. (The API specification often refers to this entity simply as a "monitor.") Intrinsic locks play a role in both aspects of synchronization: enforcing exclusive access to an object's state and establishing happens-before relationships that are essential to visibility.
Synchronized Methods
Second, when a synchronized method exits, it automatically establishes a happens-before relationship with any subsequent invocation of a synchronized method for the same object. This guarantees that changes to the state of the object are visible to all threads.
@@ -341,18 +339,24 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver | |||
private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot() { | |||
List<ImmutableBucket> values = immutableBuckets.asMapOfRanges().values().stream().toList(); | |||
long minNumberMessages = Long.MAX_VALUE; | |||
long minScheduleTimestampSum = Long.MAX_VALUE; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is variable minScheduleTimestampSum
unnecessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is used in line-356
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, it helps to find the first segment, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, it is used to find the bucket which the min scheduled time of the next segment to merge, avoid preload indexes that have not expired for a long time.
...oker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
Show resolved
Hide resolved
541ec76
to
133887f
Compare
8f5006b
to
22c6b41
Compare
22c6b41
to
e2260cc
Compare
PIP: #16763
Motivation & Modifications
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: