Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3 #17677

Merged
merged 9 commits into from
Dec 20, 2022

Conversation

coderzc
Copy link
Member

@coderzc coderzc commented Sep 15, 2022

Master Issue: #16763

Motivation

#16763

Modifications

Implement BookkeeperBucketSnapshotStorage and add unit test.

Verifying this change

  • Make sure that the change passes the CI checks.

Documentation

  • doc-required
    (Your PR needs to update docs and you will update later)

  • doc-not-needed
    (Please explain why)

  • doc
    (Your PR contains doc changes)

  • doc-complete
    (Docs have been already added)

Matching PR in forked repository
PR in forked repository: coderzc#5

@github-actions
Copy link

The pr had no activity for 30 days, mark with Stale label.

@github-actions github-actions bot added the Stale label Oct 24, 2022
@coderzc coderzc force-pushed the bucket_delayed_store branch from 9a23f4f to 4330cae Compare November 7, 2022 03:53
@coderzc coderzc force-pushed the bucket_delayed_store branch from 4330cae to 1c5e771 Compare November 7, 2022 03:56
Copy link
Contributor

@poorbarcode poorbarcode left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haven't finished the review yet, left some comments

Copy link
Contributor

@poorbarcode poorbarcode left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, I noticed that the method below is only used by BucketDelayedDeliveryTracker. Should it be deleted from the interface DelayedDeliveryTracker

boolean containsMessage(long ledgerId, long entryId);

@Override
public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
List<SnapshotSegment> bucketSnapshotSegments) {
return createLedger()
Copy link
Member

@mattisonchao mattisonchao Nov 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if it would cause some problems when we created a lot of ledgers, but I think it would be OK for this 0-1 implementation.

@codelipenghui Could you please help confirm it?

} else {
future.complete(entries);
}
closeLedger(handle);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We ignored the result or exception.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a resource release, I think it can not block the process

Copy link
Member

@mattisonchao mattisonchao Dec 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still have concerns here, we call this method getLedgerEntryThenCloseLedger, but we don't ensure the ledger is closed after this method is returned. (because of the close behaviour running in the async background )

} else {
future.complete(handle);
}
}, null, null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add ledger metadata so that we can know which component is using this ledger through bookkeeper shell tools.
We have different components using ledgers, we need to add metadata on the ledger to know who is using it. You can check LedgerMetadataUtils to learn about how to add it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I add metadata using bucketKey

@coderzc coderzc force-pushed the bucket_delayed_store branch from 1f4965b to 068b37e Compare November 18, 2022 10:42
@coderzc
Copy link
Member Author

coderzc commented Nov 18, 2022

@poorbarcode @zymap @mattisonchao Please look at it again.

@coderzc coderzc force-pushed the bucket_delayed_store branch from 068b37e to 3c17124 Compare November 18, 2022 10:50
@github-actions github-actions bot removed the Stale label Nov 19, 2022
@coderzc coderzc self-assigned this Nov 20, 2022
@coderzc coderzc added this to the 2.12.0 milestone Nov 20, 2022
@coderzc coderzc added type/feature The PR added a new feature or issue requested a new feature area/broker ready-to-test labels Nov 21, 2022
@coderzc coderzc requested a review from poorbarcode November 21, 2022 09:50
@codecov-commenter
Copy link

codecov-commenter commented Nov 29, 2022

Codecov Report

Merging #17677 (8a5c875) into master (a2c1534) will increase coverage by 0.10%.
The diff coverage is 6.40%.

❗ Current head 8a5c875 differs from pull request most recent head 64b7310. Consider uploading reports for the commit 64b7310 to get more accurate results

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #17677      +/-   ##
============================================
+ Coverage     47.23%   47.34%   +0.10%     
+ Complexity    10430     9417    -1013     
============================================
  Files           692      626      -66     
  Lines         67766    59218    -8548     
  Branches       7260     6152    -1108     
============================================
- Hits          32009    28035    -3974     
+ Misses        32192    28140    -4052     
+ Partials       3565     3043     -522     
Flag Coverage Δ
unittests 47.34% <6.40%> (+0.10%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...elayed/bucket/BookkeeperBucketSnapshotStorage.java 0.00% <0.00%> (ø)
...rg/apache/pulsar/broker/delayed/bucket/Bucket.java 0.00% <0.00%> (ø)
...yed/bucket/BucketSnapshotPersistenceException.java 0.00% <0.00%> (ø)
...d/bucket/BucketSnapshotSerializationException.java 0.00% <0.00%> (ø)
.../pulsar/broker/service/BrokerServiceException.java 47.27% <0.00%> (+4.68%) ⬆️
...ava/org/apache/pulsar/broker/service/Consumer.java 69.75% <100.00%> (-2.42%) ⬇️
.../transaction/buffer/metadata/AbortTxnMetadata.java 28.57% <0.00%> (-57.15%) ⬇️
...java/org/apache/pulsar/proxy/stats/TopicStats.java 58.82% <0.00%> (-41.18%) ⬇️
...ar/broker/loadbalance/impl/BundleSplitterTask.java 60.00% <0.00%> (-20.00%) ⬇️
...oker/service/nonpersistent/NonPersistentTopic.java 42.02% <0.00%> (-15.61%) ⬇️
... and 196 more

Copy link
Contributor

@codelipenghui codelipenghui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me. Just left a minor comment.

List<SnapshotSegment> bucketSnapshotSegments,
String bucketKey) {
return createLedger(bucketKey)
.thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the entry add failed, do we need to clean up the created Ledger?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I add clean up logical

Copy link
Member

@zymap zymap left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

List<SnapshotSegment> bucketSnapshotSegments) {
List<CompletableFuture<Void>> addFutures = new ArrayList<>();
for (SnapshotSegment bucketSnapshotSegment : bucketSnapshotSegments) {
addFutures.add(addEntry(ledgerHandle, bucketSnapshotSegment.toByteArray()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it matter if we add 1,2 successful but 3,4 failed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FutureUtil.waitForAll will make sure they will not be partially successful?

Copy link
Member

@mattisonchao mattisonchao Dec 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but some entries have already been added to this ledger. So we will throw this book away, right?

@@ -61,7 +61,11 @@ public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMet
.thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
.thenCompose(__ -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments))
.thenCompose(__ -> closeLedger(ledgerHandle))
.thenApply(__ -> ledgerHandle.getId()));
.thenApply(__ -> ledgerHandle.getId()).whenComplete((ledgerId, ex) -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we add the exception handing to the end of the chain. Do we need to check the exception types? If it just failed to close the Ledger, we don't need to remove the ledger or just the Ledger creation failed.

Or we can just add exception handling for addEntry() method?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea.

@coderzc coderzc force-pushed the bucket_delayed_store branch from 8a5c875 to 64b7310 Compare December 19, 2022 09:15
@coderzc coderzc force-pushed the bucket_delayed_store branch from 64b7310 to 2e6a8de Compare December 19, 2022 09:17
Copy link
Member

@mattisonchao mattisonchao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/broker doc-not-needed Your PR changes do not impact docs ready-to-test type/feature The PR added a new feature or issue requested a new feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants