[fix][broker]Fix chunked messages will be filtered by duplicating#20948
Merged
liangyepianzhou merged 25 commits intoapache:masterfrom Aug 31, 2023
liangyepianzhou:deplicate_chunk
Merged
[fix][broker]Fix chunked messages will be filtered by duplicating#20948liangyepianzhou merged 25 commits intoapache:masterfrom liangyepianzhou:deplicate_chunk
liangyepianzhou merged 25 commits intoapache:masterfrom
liangyepianzhou:deplicate_chunk
Conversation
### Motivation
Chunked messages use the same metadata, so all the chunked messages in a single message use the same sequence Id. And it will be recorded as duplicated messages.
```
private long updateMessageMetadataSequenceId(final MessageMetadata msgMetadata) {
final long sequenceId;
if (!msgMetadata.hasSequenceId()) {
sequenceId = msgIdGenerator++;
msgMetadata.setSequenceId(sequenceId);
} else {
sequenceId = msgMetadata.getSequenceId();
}
return sequenceId;
}
```
### Modification
Use different sequence id for chunk message.
heesung-sohn
reviewed
Aug 7, 2023
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
Outdated
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
Show resolved
Hide resolved
heesung-sohn
reviewed
Aug 7, 2023
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
Outdated
Show resolved
Hide resolved
Contributor
|
@rdhabalia Please review this change. I think we have missed discussing the deduplication feature compatibility when introducing the chunking feature in the PIP(https://github.com/apache/pulsar/wiki/PIP-37:-Large-message-size-handling-in-Pulsar). |
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
Outdated
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
Outdated
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
Outdated
Show resolved
Hide resolved
...r-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
Outdated
Show resolved
Hide resolved
...r-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
Outdated
Show resolved
Hide resolved
...r-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
Outdated
Show resolved
Hide resolved
...r-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
Outdated
Show resolved
Hide resolved
...r-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
Outdated
Show resolved
Hide resolved
...r-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
Outdated
Show resolved
Hide resolved
...r-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
Outdated
Show resolved
Hide resolved
...r-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
Outdated
Show resolved
Hide resolved
Contributor
|
Does this pr require review? Have you raised a PIP for another solution? |
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
Outdated
Show resolved
Hide resolved
BewareMyPower
requested changes
Aug 30, 2023
Contributor
BewareMyPower
left a comment
There was a problem hiding this comment.
Please create another test class like MessageChunkingDeduplicationTest for it for the following reasons:
MessageChunkingSharedTestwas added to test chunking with Shared subscriptions. It's bad to add unrelated tests into this class.- You won't need to set
conf.setBrokerDeduplicationEnabled(true)and restart the broker for each test method.
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
Outdated
Show resolved
Hide resolved
BewareMyPower
approved these changes
Aug 30, 2023
poorbarcode
approved these changes
Aug 30, 2023
codelipenghui
requested changes
Aug 30, 2023
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingDeduplicationTest.java
Outdated
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
Outdated
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
Show resolved
Hide resolved
codelipenghui
approved these changes
Aug 30, 2023
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
Show resolved
Hide resolved
...r-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
Show resolved
Hide resolved
...r-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
Show resolved
Hide resolved
...r-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
Show resolved
Hide resolved
RobertIndie
approved these changes
Aug 31, 2023
heesung-sohn
approved these changes
Aug 31, 2023
liangyepianzhou
added a commit
that referenced
this pull request
Sep 4, 2023
…0948) Make the chunk message function work properly when deduplication is enabled. For example: ```markdown Chunk-1 sequence ID: 0, chunk ID: 0, total chunk: 2 Chunk-2 sequence ID: 0, chunk ID: 1 Chunk-3 sequence ID: 1, chunk ID: 0 total chunk: 3 Chunk-4 sequence ID: 1, chunk ID: 1 Chunk-5 sequence ID: 1, chunk ID: 1 Chunk-6 sequence ID: 1, chunk ID: 2 ``` Only store check and store the sequence ID of Chunk-2 and Chunk-6. **Add a property in the publishContext to determine whether this chunk is the last chunk when persistent completely.** ```java publishContext.setProperty(IS_LAST_CHUNK, Boolean.FALSE); ``` For example: ```markdown Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1 Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2 Chunk-3 sequence ID: 0, chunk ID: 2, msgID: 1:3 Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4 Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5 Chunk-6 sequence ID: 0, chunk ID: 3, msgID: 1:6 ``` We should filter and ack chunk-4 and chunk-5. (cherry picked from commit b0b13bc)
liangyepianzhou
added a commit
that referenced
this pull request
Sep 4, 2023
…0948) Make the chunk message function work properly when deduplication is enabled. For example: ```markdown Chunk-1 sequence ID: 0, chunk ID: 0, total chunk: 2 Chunk-2 sequence ID: 0, chunk ID: 1 Chunk-3 sequence ID: 1, chunk ID: 0 total chunk: 3 Chunk-4 sequence ID: 1, chunk ID: 1 Chunk-5 sequence ID: 1, chunk ID: 1 Chunk-6 sequence ID: 1, chunk ID: 2 ``` Only store check and store the sequence ID of Chunk-2 and Chunk-6. **Add a property in the publishContext to determine whether this chunk is the last chunk when persistent completely.** ```java publishContext.setProperty(IS_LAST_CHUNK, Boolean.FALSE); ``` For example: ```markdown Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1 Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2 Chunk-3 sequence ID: 0, chunk ID: 2, msgID: 1:3 Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4 Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5 Chunk-6 sequence ID: 0, chunk ID: 3, msgID: 1:6 ``` We should filter and ack chunk-4 and chunk-5. (cherry picked from commit b0b13bc)
Technoboy-
pushed a commit
that referenced
this pull request
Sep 5, 2023
…0948) ## Motivation Make the chunk message function work properly when deduplication is enabled. ## Modification ### Only check and store the sequence ID of the last chunk in a chunk message. For example: ```markdown Chunk-1 sequence ID: 0, chunk ID: 0, total chunk: 2 Chunk-2 sequence ID: 0, chunk ID: 1 Chunk-3 sequence ID: 1, chunk ID: 0 total chunk: 3 Chunk-4 sequence ID: 1, chunk ID: 1 Chunk-5 sequence ID: 1, chunk ID: 1 Chunk-6 sequence ID: 1, chunk ID: 2 ``` Only store check and store the sequence ID of Chunk-2 and Chunk-6. **Add a property in the publishContext to determine whether this chunk is the last chunk when persistent completely.** ```java publishContext.setProperty(IS_LAST_CHUNK, Boolean.FALSE); ``` ### Filter and ack duplicated chunks in a chunk message instead of discarding ctx. For example: ```markdown Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1 Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2 Chunk-3 sequence ID: 0, chunk ID: 2, msgID: 1:3 Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4 Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5 Chunk-6 sequence ID: 0, chunk ID: 3, msgID: 1:6 ``` We should filter and ack chunk-4 and chunk-5.
Technoboy-
pushed a commit
that referenced
this pull request
Sep 5, 2023
…0948) ## Motivation Make the chunk message function work properly when deduplication is enabled. ## Modification ### Only check and store the sequence ID of the last chunk in a chunk message. For example: ```markdown Chunk-1 sequence ID: 0, chunk ID: 0, total chunk: 2 Chunk-2 sequence ID: 0, chunk ID: 1 Chunk-3 sequence ID: 1, chunk ID: 0 total chunk: 3 Chunk-4 sequence ID: 1, chunk ID: 1 Chunk-5 sequence ID: 1, chunk ID: 1 Chunk-6 sequence ID: 1, chunk ID: 2 ``` Only store check and store the sequence ID of Chunk-2 and Chunk-6. **Add a property in the publishContext to determine whether this chunk is the last chunk when persistent completely.** ```java publishContext.setProperty(IS_LAST_CHUNK, Boolean.FALSE); ``` ### Filter and ack duplicated chunks in a chunk message instead of discarding ctx. For example: ```markdown Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1 Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2 Chunk-3 sequence ID: 0, chunk ID: 2, msgID: 1:3 Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4 Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5 Chunk-6 sequence ID: 0, chunk ID: 3, msgID: 1:6 ``` We should filter and ack chunk-4 and chunk-5.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
Make the chunk message function work properly when deduplication is enabled.
Modification
Only check and store the sequence ID of the last chunk in a chunk message.
For example:
Only store check and store the sequence ID of Chunk-2 and Chunk-6.
Add a property in the publishContext to determine whether this chunk is the last chunk when persistent completely.
Filter and ack duplicated chunks in a chunk message instead of discarding ctx.
For example:
We should filter and ack chunk-4 and chunk-5.
Documentation
docdoc-requireddoc-not-neededdoc-complete