-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3 #17677
Conversation
37de8d6
to
5c61216
Compare
The pr had no activity for 30 days, mark with Stale label. |
9a23f4f
to
4330cae
Compare
4330cae
to
1c5e771
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Haven't finished the review yet, left some comments
...r/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
Show resolved
Hide resolved
...r/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
Show resolved
Hide resolved
...r/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
Show resolved
Hide resolved
...ar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotException.java
Outdated
Show resolved
Hide resolved
...r/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By the way, I noticed that the method below is only used by BucketDelayedDeliveryTracker
. Should it be deleted from the interface DelayedDeliveryTracker
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
Line 73 in 8246e3b
boolean containsMessage(long ledgerId, long entryId); |
...r/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
Show resolved
Hide resolved
...r/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
Outdated
Show resolved
Hide resolved
@Override | ||
public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata, | ||
List<SnapshotSegment> bucketSnapshotSegments) { | ||
return createLedger() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if it would cause some problems when we created a lot of ledgers, but I think it would be OK for this 0-1 implementation.
@codelipenghui Could you please help confirm it?
...r/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
Show resolved
Hide resolved
...r/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
Outdated
Show resolved
Hide resolved
...r/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
Outdated
Show resolved
Hide resolved
} else { | ||
future.complete(entries); | ||
} | ||
closeLedger(handle); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We ignored the result or exception.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a resource release, I think it can not block the process
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still have concerns here, we call this method getLedgerEntryThenCloseLedger
, but we don't ensure the ledger is closed after this method is returned. (because of the close behaviour running in the async background )
...r/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
Show resolved
Hide resolved
...r/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
Show resolved
Hide resolved
...r/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
Show resolved
Hide resolved
} else { | ||
future.complete(handle); | ||
} | ||
}, null, null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add ledger metadata so that we can know which component is using this ledger through bookkeeper shell tools.
We have different components using ledgers, we need to add metadata on the ledger to know who is using it. You can check LedgerMetadataUtils
to learn about how to add it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I add metadata using bucketKey
1f4965b
to
068b37e
Compare
@poorbarcode @zymap @mattisonchao Please look at it again. |
068b37e
to
3c17124
Compare
...r/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
Outdated
Show resolved
Hide resolved
...r/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
Outdated
Show resolved
Hide resolved
...r/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
Outdated
Show resolved
Hide resolved
…ucket/BookkeeperBucketSnapshotStorage.java
…ucket/BookkeeperBucketSnapshotStorage.java
Codecov Report
@@ Coverage Diff @@
## master #17677 +/- ##
============================================
+ Coverage 47.23% 47.34% +0.10%
+ Complexity 10430 9417 -1013
============================================
Files 692 626 -66
Lines 67766 59218 -8548
Branches 7260 6152 -1108
============================================
- Hits 32009 28035 -3974
+ Misses 32192 28140 -4052
+ Partials 3565 3043 -522
Flags with carried forward coverage won't be shown. Click here to find out more.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me. Just left a minor comment.
List<SnapshotSegment> bucketSnapshotSegments, | ||
String bucketKey) { | ||
return createLedger(bucketKey) | ||
.thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the entry add failed, do we need to clean up the created Ledger?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I add clean up logical
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
...r/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
Outdated
Show resolved
Hide resolved
...r/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
Outdated
Show resolved
Hide resolved
List<SnapshotSegment> bucketSnapshotSegments) { | ||
List<CompletableFuture<Void>> addFutures = new ArrayList<>(); | ||
for (SnapshotSegment bucketSnapshotSegment : bucketSnapshotSegments) { | ||
addFutures.add(addEntry(ledgerHandle, bucketSnapshotSegment.toByteArray())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it matter if we add 1,2 successful but 3,4 failed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FutureUtil.waitForAll
will make sure they will not be partially successful?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but some entries have already been added to this ledger. So we will throw this book away, right?
@@ -61,7 +61,11 @@ public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMet | |||
.thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray()) | |||
.thenCompose(__ -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments)) | |||
.thenCompose(__ -> closeLedger(ledgerHandle)) | |||
.thenApply(__ -> ledgerHandle.getId())); | |||
.thenApply(__ -> ledgerHandle.getId()).whenComplete((ledgerId, ex) -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we add the exception handing to the end of the chain. Do we need to check the exception types? If it just failed to close the Ledger, we don't need to remove the ledger or just the Ledger creation failed.
Or we can just add exception handling for addEntry()
method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea.
8a5c875
to
64b7310
Compare
64b7310
to
2e6a8de
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM.
Master Issue: #16763
Motivation
#16763
Modifications
Implement BookkeeperBucketSnapshotStorage and add unit test.
Verifying this change
Documentation
doc-required
(Your PR needs to update docs and you will update later)
doc-not-needed
(Please explain why)
doc
(Your PR contains doc changes)
doc-complete
(Docs have been already added)
Matching PR in forked repository
PR in forked repository: coderzc#5