Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][pip] PIP-386: Add resetIncludeHead in CommandSubscribe for startMessageIdInclusive implementation #23427

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
[improve][pip] PIP-386: Add resetIncludeHead in CommandSubscribe for …
…startMessageIdInclusive implementation
  • Loading branch information
summeriiii committed Oct 17, 2024
commit 7fa9324865224e6c0efdf99655cca75dd6d401e4
118 changes: 118 additions & 0 deletions pip/pip-386.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# PIP-386: Add resetIncludeHead in CommandSubscribe for startMessageIdInclusive implementation

# Motivation

This pip is intended to fix issue https://github.com/apache/pulsar/issues/23239.

In the previous implementation of the method startMessageIdInclusive (https://github.com/apache/pulsar/pull/4331),
we added startMessageIdInclusive() to support include current position of reset on ReaderBuilder.

However, the condition `if (((BatchMessageIdImpl) msgId).getBatchIndex() >= 0)` in PersistentTopic#getNonDurableSubscription was directly removed.
When we use the NonDurableSubscription, this caused the entryId to decrease by 1 for non-batch messages,
resulting in wrong msgBackLog after topic unload for non-durable subscription.

# Goals

Add resetIncludeHead in CommandSubscribe to implement startMessageIdInclusive, and fix the NonDurable Subscription msgBackLog incorrect after topic unload

# High Level Design

# Detailed Design

## Design & Implementation Details

- CommandSubscribe add the field **resetIncludeHead**, when use the ConsumerBuilder#startMessageIdInclusive or ReaderBuilder#startMessageIdInclusive this param is true, otherwise it is false.
- PersistTopic#getNonDurableSubscription add the judge condition `(msgId.getBatchIndex() >= 0 || resetIncludeHead)`, entryId - 1 will execute **when msg is batch or the resetIncludeHead is true.**


```java
if (ledgerId >= 0 && entryId >= 0
&& msgId instanceof BatchMessageIdImpl
&& (msgId.getBatchIndex() >= 0 || resetIncludeHead)) {
// When the start message is relative to a batch, we need to take one step back on the previous
// message,
// because the "batch" might not have been consumed in its entirety.
// The client will then be able to discard the first messages if needed.
entryId = msgId.getEntryId() - 1;
}
```


### Binary protocol

Add `reset_include_head` field to the `CommandSubscribe`.

```protobuf
PulsarApi.proto

message CommandSubscribe {
enum SubType {
Exclusive = 0;
Shared = 1;
Failover = 2;
Key_Shared = 3;
}
required string topic = 1;
required string subscription = 2;
required SubType subType = 3;

required uint64 consumer_id = 4;
required uint64 request_id = 5;
optional string consumer_name = 6;
optional int32 priority_level = 7;

// Signal wether the subscription should be backed by a
// durable cursor or not
optional bool durable = 8 [default = true];

// If specified, the subscription will position the cursor
// markd-delete position on the particular message id and
// will send messages from that point
optional MessageIdData start_message_id = 9;

/// Add optional metadata key=value to this consumer
repeated KeyValue metadata = 10;

optional bool read_compacted = 11;

optional Schema schema = 12;
enum InitialPosition {
Latest = 0;
Earliest = 1;
}
// Signal whether the subscription will initialize on latest
// or not -- earliest
optional InitialPosition initialPosition = 13 [default = Latest];

// Mark the subscription as "replicated". Pulsar will make sure
// to periodically sync the state of replicated subscriptions
// across different clusters (when using geo-replication).
optional bool replicate_subscription_state = 14;

// If true, the subscribe operation will cause a topic to be
// created if it does not exist already (and if topic auto-creation
// is allowed by broker.
// If false, the subscribe operation will fail if the topic
// does not exist.
optional bool force_topic_creation = 15 [default = true];

// If specified, the subscription will reset cursor's position back
// to specified seconds and will send messages from that point
optional uint64 start_message_rollback_duration_sec = 16 [default = 0];

optional KeySharedMeta keySharedMeta = 17;

repeated KeyValue subscription_properties = 18;

// The consumer epoch, when exclusive and failover consumer redeliver unack message will increase the epoch
optional uint64 consumer_epoch = 19;

optional bool reset_include_head = 20 [default = false];
}
```


# Links

* Mailing List discussion thread:
* Mailing List voting thread: