
Reader doesn't return truly last message on LatestMessageID() when consuming messages sent with SendAsync #1082

@ksargent1216

Description


Expected behavior

Background/Setup:

I have a Reader set up to retrieve the last message from a topic. When a producer produces messages using SendAsync, every message in a batch is reported with the same Pulsar message ID (I don't know the details of how Pulsar keeps them unique under the hood). For example, the following messages are part of a larger batch whose members all have the same Pulsar message ID:

Received message Pulsar messageID: 4103:12132:0, msgId: {999533 true}, content: '{2023-08-04 12:49:33.961 +0000 UTC true}'
Received message Pulsar messageID: 4103:12132:0, msgId: {999534 true}, content: '{2023-08-04 13:52:10.133 +0000 UTC true}'
Received message Pulsar messageID: 4103:12132:0, msgId: {999535 true}, content: '{2023-08-04 14:55:43.391 +0000 UTC true}'
Received message Pulsar messageID: 4103:12132:0, msgId: {999536 true}, content: '{2023-08-03 22:36:42.701 +0000 UTC true}'
(There could be many more, but for this example we'll pretend those four messages are the batch of IDs related to 4103:12132:0)

Note: the msgId field comes from our data and shows that the messages are unique and sequential in the actual data.
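To illustrate how messages in one batch can print the same ID yet remain distinct, here is a minimal sketch. It assumes the three printed fields are ledger ID, entry ID and partition index, and that a separate batch index (not shown in the log lines) tells batch members apart; the report does not confirm either point. The `batchedID` type and its `Short`/`Full` methods are hypothetical stand-ins, not the client library's types.

```go
package main

import "fmt"

// batchedID is a hypothetical stand-in for a Pulsar message ID.
// It is NOT the client library's type; field meanings are assumptions.
type batchedID struct {
	LedgerID     int64
	EntryID      int64
	PartitionIdx int32
	BatchIdx     int32 // position of the message inside its batch
}

// Short renders only three fields, mimicking the IDs printed in the log
// lines above (assumed to omit the batch index).
func (id batchedID) Short() string {
	return fmt.Sprintf("%d:%d:%d", id.LedgerID, id.EntryID, id.PartitionIdx)
}

// Full appends the batch index so members of one batch can be told apart.
func (id batchedID) Full() string {
	return fmt.Sprintf("%s:%d", id.Short(), id.BatchIdx)
}

func main() {
	// Four messages of one batch, as in the example above.
	for i := int32(0); i < 4; i++ {
		id := batchedID{LedgerID: 4103, EntryID: 12132, PartitionIdx: 0, BatchIdx: i}
		fmt.Printf("short=%s full=%s\n", id.Short(), id.Full())
	}
}
```

If the client version in use exposes the batch index on the message ID, printing it next to msg.ID() in the subscriber would show whether the Reader really lands on index 0 of the last batch.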

I'm using a Reader configured with the following options:

reader, err = client.CreateReader(pulsar.ReaderOptions{
	Topic:                   pulsarProducerTopicFullName,
	StartMessageID:          pulsar.LatestMessageID(),
	StartMessageIDInclusive: true,
})

What should happen:
I would expect the Reader to return the following message:
Received message Pulsar messageID: 4103:12132:0, msgId: {999536 true}, content: '{2023-08-03 22:36:42.701 +0000 UTC true}'

Actual behavior

What actually happens:
The Reader returns the first message in the last batch. For example, the Reader returns the following:
Received message Pulsar messageID: 4103:12132:0, msgId: {999533 true}, content: '{2023-08-04 12:49:33.961 +0000 UTC true}'
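The difference between the expected and the observed result can be sketched with a small in-memory model. This is only an illustration of the two selection rules, not the client's or broker's implementation; the `batch` type, both helper functions, and the earlier batch `4103:12131:0` are hypothetical.

```go
package main

import "fmt"

// batch is a hypothetical in-memory model of one stored batch: every message
// in it is printed with the same Pulsar message ID.
type batch struct {
	ID     string
	MsgIDs []int // application-level msgId values, in publish order
}

// firstOfLastBatch models the behavior observed in this report:
// the first message of the most recent batch is returned.
func firstOfLastBatch(topic []batch) (int, bool) {
	if len(topic) == 0 || len(topic[len(topic)-1].MsgIDs) == 0 {
		return 0, false
	}
	return topic[len(topic)-1].MsgIDs[0], true
}

// lastOfLastBatch models the behavior the report expects:
// the final message of the most recent batch is returned.
func lastOfLastBatch(topic []batch) (int, bool) {
	if len(topic) == 0 || len(topic[len(topic)-1].MsgIDs) == 0 {
		return 0, false
	}
	last := topic[len(topic)-1].MsgIDs
	return last[len(last)-1], true
}

func main() {
	topic := []batch{
		{ID: "4103:12131:0", MsgIDs: []int{999531, 999532}}, // hypothetical earlier batch
		{ID: "4103:12132:0", MsgIDs: []int{999533, 999534, 999535, 999536}},
	}
	observed, _ := firstOfLastBatch(topic)
	expected, _ := lastOfLastBatch(topic)
	fmt.Printf("observed msgId=%d, expected msgId=%d\n", observed, expected)
}
```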

Steps to reproduce

Configure a Reader with the above options.
Configure a Producer with batching (setting it explicitly is probably not strictly necessary, since it'll default to a batch of 1000):
producer, err = client.CreateProducer(pulsar.ProducerOptions{
	Topic:               pulsarProducerTopicFullName,
	BatchingMaxMessages: 10000,
})
Produce using SendAsync:
producer.SendAsync(ctx, &msg, msgCallback)
Set up a subscriber that prints the data and the Pulsar message ID for the received messages. Something like this:

import (
	"database/sql"
	"encoding/json"
	"fmt"

	"github.com/apache/pulsar-client-go/pulsar"
)

// KeyMessageFields mirrors the JSON payload of each message.
type KeyMessageFields struct {
	Id        sql.NullInt32  `json:"id"`
	FirstName sql.NullString `json:"firstname"`
	LastName  sql.NullString `json:"lastname"`
	EventTime sql.NullTime   `json:"event_time"`
}

func consumer() {

	// Code to set up client goes here (omitted from this report).

	// Set up Consumer
	channel := make(chan pulsar.ConsumerMessage, 100)

	options := pulsar.ConsumerOptions{
		Topic:                       pulsarConsumerTopicFullName,
		SubscriptionName:            subscriptionName,
		Type:                        pulsar.Exclusive,
		SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
	}

	options.MessageChannel = channel

	consumer, err := client.Subscribe(options)
	if err != nil {
		// ls.LogFatal and logNameValue are our own logging helpers.
		ls.LogFatal("failure subscribing: ", logNameValue{"error": err.Error()})
	}

	defer consumer.Close()
	var keyMessageFields KeyMessageFields

	for cm := range channel {

		msg := cm.Message

		// Decode the payload; report a decode failure instead of ignoring it.
		err = json.Unmarshal(msg.Payload(), &keyMessageFields)
		if err != nil {
			fmt.Printf("failed to decode message %v: %v\n", msg.ID(), err)
		}

		fmt.Printf("Received message Pulsar messageID: %v, msgId: %v, content: '%v'\n", msg.ID(), keyMessageFields.Id, keyMessageFields.EventTime)

		consumer.Ack(msg)
	}
}

System configuration

Pulsar version: 2.10.3.7 (hosted on StreamNative)
