Description
Search before asking
- I searched in the issues and found nothing similar.
Version
The latest master branch code
Minimal reproduce step
This is a question I had after reading the Pulsar source code. I’m not sure whether it is a real problem.
In org.apache.pulsar.broker.service.ServerCnx#handleSend (line 1838), there is logic that verifies the sequenceId before persisting the message. The code is as follows:
// Persist the message
if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) {
    producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(),
            headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker(), position);
} else {
    producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload,
            send.getNumMessages(), send.isIsChunk(), send.isMarker(), position);
}
However, it does not reject the following case, for which an error should be returned. Such a request instead falls through to the else branch and is published without its highestSequenceId:
send.hasHighestSequenceId() && send.getSequenceId() > send.getHighestSequenceId()
What did you expect to see?
I expected the following case to be checked, with an error returned when it occurs:
send.hasHighestSequenceId() && send.getSequenceId() > send.getHighestSequenceId()
What did you see instead?
I found that org.apache.pulsar.broker.service.Producer#publishMessage already checks for lowestSequenceId > highestSequenceId:
public void publishMessage(long producerId, long lowestSequenceId, long highestSequenceId,
        ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker, Position position) {
    if (lowestSequenceId > highestSequenceId) {
So org.apache.pulsar.broker.service.ServerCnx#handleSend could be changed to route every request that has a highestSequenceId to that overload, which then performs the check:
// Persist the message
if (send.hasHighestSequenceId()) {
    producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(),
            headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker(), position);
} else {
    producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload,
            send.getNumMessages(), send.isIsChunk(), send.isMarker(), position);
}
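To make the difference between the two conditions concrete, here is a minimal, self-contained sketch. It is not Pulsar code: the class, enums, and method names are hypothetical stand-ins. It also assumes that the range overload rejects the request when lowestSequenceId > highestSequenceId; the snippet above shows only that the check exists, not what happens inside it.

```java
// Hypothetical model of the dispatch in handleSend; none of these names exist in Pulsar.
final class SequenceIdDispatchSketch {

    // Which publishMessage overload handleSend would call.
    enum Route { RANGE_OVERLOAD, SINGLE_ID_OVERLOAD }

    // What happens to the message in this simplified model.
    enum Outcome { PUBLISHED_AS_RANGE, PUBLISHED_AS_SINGLE, REJECTED }

    // Mirrors the current condition quoted from handleSend.
    static Route currentRoute(boolean hasHighest, long sequenceId, long highestSequenceId) {
        return (hasHighest && sequenceId <= highestSequenceId)
                ? Route.RANGE_OVERLOAD
                : Route.SINGLE_ID_OVERLOAD;
    }

    // Mirrors the proposed condition: only the presence of highestSequenceId matters.
    static Route proposedRoute(boolean hasHighest) {
        return hasHighest ? Route.RANGE_OVERLOAD : Route.SINGLE_ID_OVERLOAD;
    }

    // Models the two overloads. ASSUMPTION: the range overload rejects
    // lowest > highest; the single-id overload never sees highest at all.
    static Outcome publish(Route route, long lowestSequenceId, long highestSequenceId) {
        if (route == Route.SINGLE_ID_OVERLOAD) {
            return Outcome.PUBLISHED_AS_SINGLE;
        }
        return lowestSequenceId > highestSequenceId
                ? Outcome.REJECTED
                : Outcome.PUBLISHED_AS_RANGE;
    }

    public static void main(String[] args) {
        long sequenceId = 10L;
        long highestSequenceId = 5L; // invalid: lower than sequenceId

        Route current = currentRoute(true, sequenceId, highestSequenceId);
        Route proposed = proposedRoute(true);

        System.out.println("current:  " + publish(current, sequenceId, highestSequenceId));
        System.out.println("proposed: " + publish(proposed, sequenceId, highestSequenceId));
    }
}
```

In this model, a request with sequenceId 10 and highestSequenceId 5 is published through the single-id path under the current condition and is rejected under the proposed one. Valid requests (sequenceId <= highestSequenceId, or no highestSequenceId at all) take the same path under both conditions.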
Anything else?
No.
Are you willing to submit a PR?
- I'm willing to submit a PR!