[Feature] Introduce continuous offset for pulsar #9039
/pulsarbot run-failure-checks |
/**
 * Find position by sequenceId.
 * */
CompletableFuture<PositionImpl> asyncFindPosition(com.google.common.base.Predicate<Entry> predicate);
- CompletableFuture<PositionImpl> asyncFindPosition(com.google.common.base.Predicate<Entry> predicate);
+ CompletableFuture<Position> asyncFindPosition(com.google.common.base.Predicate<Entry> predicate);
fixed
/**
 * Find position by sequenceId.
 * */
CompletableFuture<PositionImpl> asyncFindPosition(com.google.common.base.Predicate<Entry> predicate);
- CompletableFuture<PositionImpl> asyncFindPosition(com.google.common.base.Predicate<Entry> predicate);
+ CompletableFuture<PositionImpl> asyncFindPosition(com.google.common.base.Predicate<Entry> predicate);
Can we use `java.util.function.Predicate`?
The `condition` of `OpFindNewest` is of type `com.google.common.base.Predicate`, so the parameter type here should also be `com.google.common.base.Predicate`.
I'd recommend using `java.util.function.Predicate` in the interface. You can just write a wrapper to convert a `java.util.function.Predicate` to a Guava `Predicate`.
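The wrapper suggested above could look roughly like the following. This is only a sketch: `GuavaStylePredicate` is a hypothetical stand-in for `com.google.common.base.Predicate` (which exposes `apply(T)`) so that the example compiles without Guava, and `PredicateAdapters` / `toGuavaStyle` are illustrative names that do not appear in this pull request.

```java
import java.util.function.Predicate;

/** Hypothetical stand-in for com.google.common.base.Predicate, which declares apply(T). */
interface GuavaStylePredicate<T> {
    boolean apply(T input);
}

final class PredicateAdapters {
    private PredicateAdapters() {
    }

    /** Wraps a java.util.function.Predicate so it can be passed where the Guava-style type is expected. */
    static <T> GuavaStylePredicate<T> toGuavaStyle(Predicate<? super T> predicate) {
        // A method reference is enough: apply(input) delegates to predicate.test(input).
        return predicate::test;
    }
}

class PredicateAdapterDemo {
    public static void main(String[] args) {
        Predicate<Long> atLeastTen = id -> id >= 10;
        GuavaStylePredicate<Long> adapted = PredicateAdapters.toGuavaStyle(atLeastTen);
        System.out.println(adapted.apply(12L));
        System.out.println(adapted.apply(3L));
    }
}
```

With an adapter like this, the public interface can accept the JDK type while the conversion to the Guava type stays inside the implementation that feeds `OpFindNewest`.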
@@ -550,6 +566,11 @@ public Position addEntry(byte[] data) throws InterruptedException, ManagedLedger
        return addEntry(data, 0, data.length);
    }

    @Override
    public Position addEntry(byte[] data, int batchSize) throws InterruptedException, ManagedLedgerException {
        return addEntry(data, batchSize,0, data.length);
- return addEntry(data, batchSize,0, data.length);
+ return addEntry(data, batchSize, 0, data.length);
fixed
addOperation.failed(
        new ManagedLedgerInterceptException("Interceptor managed ledger before add to bookie failed."));
ReferenceCountUtil.release(addOperation.data);
log.error("[{}] Failed to interceptor entry before add to bookie.", name, e);
- log.error("[{}] Failed to interceptor entry before add to bookie.", name, e);
+ log.error("[{}] Failed to intercept adding an entry to bookie.", name, e);
fixed
/**
 * Interceptor for ManagedLedger.
 * */
public interface ManagedLedgerInterceptor {
Can you add the interface annotations to the new interface? See https://github.com/apache/pulsar/wiki/PIP-72%3A-Introduce-Pulsar-Interface-Taxonomy%3A-Audience-and-Stability-Classification
fixed
 * Interceptor for ManagedLedger.
 * */
public interface ManagedLedgerInterceptor {
    OpAddEntry beforeAddEntry(OpAddEntry op, int batchSize);
Can you add javadoc to the methods?
fixed
public class ManagedLedgerInterceptorImpl implements ManagedLedgerInterceptor {
    private static final Logger log = LoggerFactory.getLogger(ManagedLedgerInterceptorImpl.class);
    private static final String OFFSET = "offset";
Kafka `offset` is not a very good term to be used for describing the index of a message within a log stream. Instead, I would suggest calling it `index` or `logIndex`, similar to the term used in the Raft algorithm (https://cs.stackexchange.com/questions/97542/raft-algorithm-whats-the-meaning-of-concept-index).
Changed `offset` to `index`.
if (appendBrokerEntryMetadata(headersAndPayload, publishContext)) {
    ledger.asyncAddEntry(headersAndPayload, this, publishContext);
}
ledger.asyncAddEntry(headersAndPayload, this, publishContext);
How is the `batchSize` passed to `asyncAddEntry`? I failed to see how you did that in this pull request.
At the same time, I think `batchSize` is not a good term. If I understand this correctly, it should be `numberOfMessages`, correct?
fixed
@aloyszhang thanks for your great work. Should the changes be documented in the Pulsar docs? If so, could you please add the docs accordingly? Then you can ping me to review, thanks.
### Motivation

When broker entry metadata is enabled on a Pulsar cluster, some entries are occasionally corrupted. See streamnative/kop#442 for an example. The cause is that the broker entry metadata has been added twice. This bug might have been introduced by #9039:

https://github.com/apache/pulsar/blob/9b7c3275c904ac1e6a8ef67487a10a0506bb2c58/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L1516-L1518

It happens during a managed ledger's rollover while there are pending `OpAddEntry`s. In `updateLedgersIdsComplete`, only the ledger id should be updated; the data of `OpAddEntry` should not be modified.

### Modifications

Call `beforeAddEntry` only once, at the beginning of `internalAsyncAddEntry`.
Fixes #9038
Motivation
As described in PIP-70.
One use case for broker entry metadata is providing a continuous message sequence ID for the messages in one topic partition, which is useful for protocol handlers such as KOP.
This PR enables Pulsar to support a continuous offset for messages, based on broker entry metadata.
Modifications
Introduce a new field for broker entry metadata named `offset`;
Introduce a new interceptor type, `ManagedLedgerInterceptor`, which intercepts entries in `ManagedLedger`;
Each partition is assigned a `ManagedLedgerInterceptor` when its `ManagedLedger` is created;
Each entry is intercepted to add a monotonically increasing offset to its broker entry metadata, and the offset advances by the batchSize of the entry;
Support finding a position by a given offset.
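
The last two points can be illustrated with a small, self-contained sketch. It is not the code from this PR: `IndexedLog`, `append`, and `findEntry` are hypothetical names, and stamping each entry with the offset of its last message (starting from -1) is an assumption made for the example. The review discussion later renames `offset` to `index` and `batchSize` to `numberOfMessages`, and the sketch uses those names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: stamps each entry with a continuous index and finds an entry by index. */
final class IndexedLog {
    // Index of the last message written so far; -1 means nothing was written (assumption).
    private long lastIndex = -1;
    // Element i holds the index stamped on entry i; values are strictly increasing.
    private final List<Long> stampedIndexes = new ArrayList<>();

    /** Appends an entry carrying numberOfMessages messages and returns the index stamped on it. */
    synchronized long append(int numberOfMessages) {
        if (numberOfMessages <= 0) {
            throw new IllegalArgumentException("numberOfMessages must be positive");
        }
        lastIndex += numberOfMessages;
        stampedIndexes.add(lastIndex);
        return lastIndex;
    }

    /** Binary search for the first entry whose stamped index is >= targetIndex; -1 if none. */
    synchronized int findEntry(long targetIndex) {
        int lo = 0;
        int hi = stampedIndexes.size();
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (stampedIndexes.get(mid) >= targetIndex) {
                hi = mid;
            } else {
                lo = mid + 1;
            }
        }
        return lo < stampedIndexes.size() ? lo : -1;
    }
}

class IndexedLogDemo {
    public static void main(String[] args) {
        IndexedLog log = new IndexedLog();
        System.out.println(log.append(3)); // messages 0..2
        System.out.println(log.append(1)); // message 3
        System.out.println(log.append(5)); // messages 4..8
        System.out.println(log.findEntry(4)); // position of the entry holding message 4
    }
}
```

Because the stamped values only ever increase, a lookup by index can use a predicate of the form "stamped index >= target" and search for the first entry that satisfies it, which is the shape of predicate that `asyncFindPosition` accepts.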