Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-379: Enable the use of the classic implementation of Key_Shared / Shared with feature flag #23424

Merged
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
8f6e42d
Extract AbstractPersistentDispatcherMultipleConsumers class
lhotari Oct 8, 2024
af78832
Rename unused and wrong logger
lhotari Oct 8, 2024
8ed62e0
Add StickyKeyDispatcher interface
lhotari Oct 8, 2024
c30697f
Add implementations for Shared and Key_Shared from branch-3.3, ported…
lhotari Oct 9, 2024
a7aea94
Restore readPositionWhenJoining
lhotari Oct 9, 2024
36f89b2
Restore consumersAfterMarkDeletePosition
lhotari Oct 9, 2024
c5cefcd
Add method "isClassic()" that can be used to detect the implementatio…
lhotari Oct 9, 2024
ef1bae5
Add feature toggle
lhotari Oct 9, 2024
71add25
Rename duplicate KeySharedSubscriptioTest to KeySharedSubscriptionMax…
lhotari Oct 9, 2024
34c0277
Test both PIP-379 and classic in KeySharedSubscriptionMaxUnackedMessa…
lhotari Oct 9, 2024
09ab041
Test both PIP-379 and Classic in KeySharedSubscriptionTest
lhotari Oct 9, 2024
2d2524c
Fix bug where hash wasn't added to pending acks in classic implementa…
lhotari Oct 9, 2024
ff297ac
Revert "Fix bug where hash wasn't added to pending acks in classic im…
lhotari Oct 9, 2024
e3718a0
Don't require hash in replay with classic implementations
lhotari Oct 9, 2024
a6d7f8f
Use Integer.MAX_VALUE-1 as the range end in classic and consistent ha…
lhotari Oct 9, 2024
773917d
Make KeySharedSubscriptionTest run for both implementation types
lhotari Oct 9, 2024
e467c53
Refactor test parameterization solution
lhotari Oct 9, 2024
816db77
Revisit receiveAndCheckDistribution to use multiple threads for recei…
lhotari Oct 9, 2024
59448a6
Copy the unit test classes for the classic dispatchers
lhotari Oct 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Revert "Fix bug where hash wasn't added to pending acks in classic im…
…plementation"

This reverts commit 2d2524c.
  • Loading branch information
lhotari committed Oct 9, 2024
commit ff297ac49ed6632734d9ce300a5326c98cb859f5
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -246,18 +245,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
groupedEntries.clear();
final Map<Consumer, Set<Integer>> consumerStickyKeyHashesMap = new HashMap<>();

for (int i = 0; i < entriesCount; i++) {
Entry inputEntry = entries.get(i);
EntryAndMetadata entry;
if (inputEntry instanceof EntryAndMetadata entryAndMetadataInstance) {
entry = entryAndMetadataInstance;
} else {
// replace the input entry with EntryAndMetadata instance. In addition to the entry and metadata,
// it will also carry the pre-calculated sticky key hash
entry = EntryAndMetadata.create(inputEntry,
Commands.peekAndCopyMessageMetadata(inputEntry.getDataBuffer(), getSubscriptionName(), -1));
entries.set(i, entry);
}
for (Entry entry : entries) {
int stickyKeyHash = getStickyKeyHash(entry);
Consumer c = selector.select(stickyKeyHash);
if (c != null) {
Expand Down