[Bug] When sending a message, the broker doesn't verify the case recordSequenceId > highestSequenceId. #21886

Open
@luky116

Description

Search before asking

  • I searched in the issues and found nothing similar.

Version

The latest master branch code

Minimal reproduce step

This is a question I had after reading the Pulsar source code. I'm not sure if it's a real problem.

In org.apache.pulsar.broker.service.ServerCnx#handleSend (line 1838), there is logic that checks the sequence ID. The code is as follows:

// Persist the message
if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) {
    producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(),
            headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker(), position);
} else {
    producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload,
            send.getNumMessages(), send.isIsChunk(), send.isMarker(), position);
}

However, it does not handle the following case (if it occurs, an exception should be returned):

send.hasHighestSequenceId() && send.getSequenceId() > send.getHighestSequenceId()
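To make the gap concrete, here is a minimal standalone sketch of the branch selection quoted above. The class `SendDispatchSketch`, the enum `Overload`, and the method `chooseOverload` are hypothetical stand-ins introduced for illustration. They are not Pulsar code and only mirror the condition in handleSend.

```java
// Hypothetical stand-in for the branch in ServerCnx#handleSend; not Pulsar code.
final class SendDispatchSketch {

    // Which publishMessage overload the branch would end up calling.
    enum Overload { WITH_HIGHEST_SEQUENCE_ID, SINGLE_SEQUENCE_ID }

    // Mirrors the current condition: the overload that takes highestSequenceId
    // is chosen only when the field is present AND sequenceId <= highestSequenceId.
    static Overload chooseOverload(boolean hasHighestSequenceId, long sequenceId,
                                   long highestSequenceId) {
        if (hasHighestSequenceId && sequenceId <= highestSequenceId) {
            return Overload.WITH_HIGHEST_SEQUENCE_ID;
        }
        return Overload.SINGLE_SEQUENCE_ID;
    }

    public static void main(String[] args) {
        // Valid range: routed to the overload that takes highestSequenceId.
        System.out.println(chooseOverload(true, 3L, 5L));
        // Inverted range: falls into the else branch, so highestSequenceId
        // is ignored instead of being reported as invalid.
        System.out.println(chooseOverload(true, 7L, 5L));
    }
}
```

In this sketch an inverted range is indistinguishable from a send that carries no highestSequenceId at all, which is the behavior this report questions.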

What did you expect to see?

I expect the broker to check for the following case (if it occurs, an exception should be returned):

send.hasHighestSequenceId() && send.getSequenceId() > send.getHighestSequenceId()

What did you see instead?

I found that org.apache.pulsar.broker.service.Producer#publishMessage already checks for lowestSequenceId > highestSequenceId:

public void publishMessage(long producerId, long lowestSequenceId, long highestSequenceId,
        ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker, Position position) {
    if (lowestSequenceId > highestSequenceId) {

So org.apache.pulsar.broker.service.ServerCnx#handleSend can be changed to:

// Persist the message
if (send.hasHighestSequenceId()) {
    producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(),
            headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker(), position);
} else {
    producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload,
            send.getNumMessages(), send.isIsChunk(), send.isMarker(), position);
}
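A rough sketch of how the proposed change would behave, under stated assumptions: `ProposedDispatchSketch`, `Outcome`, `publishRange`, and `handleSend` here are hypothetical stand-ins, not Pulsar code. Only the `lowestSequenceId > highestSequenceId` comparison comes from the Producer#publishMessage excerpt above. Treating that check as a rejection is an assumption, since the excerpt does not show the body of the `if`.

```java
// Hypothetical model of the proposed dispatch; not Pulsar code.
final class ProposedDispatchSketch {

    enum Outcome { PUBLISHED_RANGE, PUBLISHED_SINGLE, REJECTED }

    // Stand-in for the publishMessage overload that takes highestSequenceId.
    // ASSUMPTION: an inverted range is rejected here; the excerpt only shows
    // the comparison, not what happens inside it.
    static Outcome publishRange(long lowestSequenceId, long highestSequenceId) {
        if (lowestSequenceId > highestSequenceId) {
            return Outcome.REJECTED;
        }
        return Outcome.PUBLISHED_RANGE;
    }

    // Proposed branch: route on the presence of highestSequenceId only,
    // leaving range validation to the overload above.
    static Outcome handleSend(boolean hasHighestSequenceId, long sequenceId,
                              long highestSequenceId) {
        if (hasHighestSequenceId) {
            return publishRange(sequenceId, highestSequenceId);
        }
        return Outcome.PUBLISHED_SINGLE;
    }

    public static void main(String[] args) {
        System.out.println(handleSend(true, 3L, 5L));   // valid range
        System.out.println(handleSend(true, 7L, 5L));   // inverted range reaches the check
        System.out.println(handleSend(false, 7L, 0L));  // no highestSequenceId sent
    }
}
```

With this routing, the inverted range reaches the existing check instead of bypassing it through the else branch.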

Anything else?

no

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Labels

type/bug