Description
Expected behavior
Background/Setup:
I have a Reader set up to retrieve the last message from a topic. When a producer produces messages using SendAsync, all messages in a batch are reported with the same message ID (I don't know the details of how Pulsar keeps them unique under the hood). For example, the following messages are part of a larger batch and all have the same Pulsar message ID:
Received message Pulsar messageID: 4103:12132:0, msgId: {999533 true}, content: '{2023-08-04 12:49:33.961 +0000 UTC true}'
Received message Pulsar messageID: 4103:12132:0, msgId: {999534 true}, content: '{2023-08-04 13:52:10.133 +0000 UTC true}'
Received message Pulsar messageID: 4103:12132:0, msgId: {999535 true}, content: '{2023-08-04 14:55:43.391 +0000 UTC true}'
Received message Pulsar messageID: 4103:12132:0, msgId: {999536 true}, content: '{2023-08-03 22:36:42.701 +0000 UTC true}'
(There could be many more, but for this example assume these four messages make up the whole batch for 4103:12132:0.)
Note: the msgId field comes from our data and shows that the messages are unique and sequential in the actual data.
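A possible explanation for the identical IDs, sketched under assumptions: a Pulsar message ID also carries a batch index, and the three-part form printed above (ledger:entry:partition) leaves it out. Recent versions of the Go client expose this index through `BatchIdx()` on `MessageID`. The `batchedID` type below is a hypothetical stand-in written for illustration, not the client's real type, and the batch indexes 0 to 3 are assumed values.

```go
package main

import "fmt"

// batchedID is a simplified, hypothetical model of a Pulsar message ID.
// It is NOT the client's real type; field names are chosen for illustration.
type batchedID struct {
	ledger    int64
	entry     int64
	partition int32
	batchIdx  int32 // position of the message inside its batch entry
}

// String mimics the three-part form seen in the log lines above,
// which omits the batch index.
func (id batchedID) String() string {
	return fmt.Sprintf("%d:%d:%d", id.ledger, id.entry, id.partition)
}

// Full appends the batch index, so messages from one batch can be told apart.
func (id batchedID) Full() string {
	return fmt.Sprintf("%d:%d:%d:%d", id.ledger, id.entry, id.partition, id.batchIdx)
}

func main() {
	// Four messages in the same batch entry, with assumed batch indexes 0..3.
	for i := int32(0); i < 4; i++ {
		id := batchedID{ledger: 4103, entry: 12132, partition: 0, batchIdx: i}
		fmt.Printf("printed: %v  full: %s\n", id, id.Full())
	}
}
```

If this assumption holds, printing `msg.ID().BatchIdx()` next to `msg.ID()` in the subscriber would show which position in the batch each message occupies.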
I'm using a Reader configured with the following options:
reader, err = client.CreateReader(pulsar.ReaderOptions{
	Topic:                   pulsarProducerTopicFullName,
	StartMessageID:          pulsar.LatestMessageID(),
	StartMessageIDInclusive: true,
})
What should happen:
I would expect the Reader to return the following message:
Received message Pulsar messageID: 4103:12132:0, msgId: {999536 true}, content: '{2023-08-03 22:36:42.701 +0000 UTC true}'
Actual behavior
What actually happens:
The Reader returns the first message in the last batch. For example, the Reader returns the following:
Received message Pulsar messageID: 4103:12132:0, msgId: {999533 true}, content: '{2023-08-04 12:49:33.961 +0000 UTC true}'
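The difference between the expected and the observed result can be stated as a choice of position within the final batch. The sketch below uses the four msgId values from the logs above; the `record` type, the helper names, and the batch indexes 0 to 3 are assumptions made for illustration and do not come from the client library.

```go
package main

import "fmt"

// record pairs the application-level msgId from the logs with an
// ASSUMED batch index inside entry 4103:12132.
type record struct {
	msgID    int
	batchIdx int
}

// firstInBatch returns the record with the lowest batch index,
// which matches what the Reader was observed to return.
func firstInBatch(batch []record) (record, bool) {
	if len(batch) == 0 {
		return record{}, false
	}
	best := batch[0]
	for _, r := range batch[1:] {
		if r.batchIdx < best.batchIdx {
			best = r
		}
	}
	return best, true
}

// lastInBatch returns the record with the highest batch index,
// which is what "latest message, inclusive" was expected to mean.
func lastInBatch(batch []record) (record, bool) {
	if len(batch) == 0 {
		return record{}, false
	}
	best := batch[0]
	for _, r := range batch[1:] {
		if r.batchIdx > best.batchIdx {
			best = r
		}
	}
	return best, true
}

func main() {
	batch := []record{{999533, 0}, {999534, 1}, {999535, 2}, {999536, 3}}
	observed, _ := firstInBatch(batch)
	expected, _ := lastInBatch(batch)
	fmt.Printf("observed msgId: %d, expected msgId: %d\n", observed.msgID, expected.msgID)
}
```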
Steps to reproduce
Configure a Reader with the above options.
Configure a Producer with Batching (probably not strictly necessary since it'll default to a batch of 1000):
producer, err = client.CreateProducer(pulsar.ProducerOptions{
	Topic:               pulsarProducerTopicFullName,
	BatchingMaxMessages: 10000,
})
Produce messages using SendAsync:
producer.SendAsync(ctx, &msg, msgCallback)
Set up a subscriber that prints the data and the Pulsar message ID for each received message. Something like this:
import (
	"database/sql"
	"encoding/json"
	"fmt"

	"github.com/apache/pulsar-client-go/pulsar"
)

// KeyMessageFields holds the fields of the payload that identify a message.
type KeyMessageFields struct {
	Id        sql.NullInt32  `json:"id"`
	FirstName sql.NullString `json:"firstname"`
	LastName  sql.NullString `json:"lastname"`
	EventTime sql.NullTime   `json:"event_time"`
}

func consumer() {
	// Code to set up the client goes here (omitted).

	// Set up the consumer with a buffered message channel.
	channel := make(chan pulsar.ConsumerMessage, 100)
	options := pulsar.ConsumerOptions{
		Topic:                       pulsarConsumerTopicFullName,
		SubscriptionName:            subscriptionName,
		Type:                        pulsar.Exclusive,
		SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
	}
	options.MessageChannel = channel
	consumer, err := client.Subscribe(options)
	if err != nil {
		ls.LogFatal("failure subscribing: ", logNameValue{"error": err.Error()})
	}
	defer consumer.Close()

	// Print the Pulsar message ID next to the application-level id.
	var keyMessageFields KeyMessageFields
	for cm := range channel {
		msg := cm.Message
		if err = json.Unmarshal(msg.Payload(), &keyMessageFields); err != nil {
			// Report undecodable payloads instead of printing stale fields.
			fmt.Printf("failed to decode message %v: %v\n", msg.ID(), err)
			consumer.Ack(msg)
			continue
		}
		fmt.Printf("Received message Pulsar messageID: %v, msgId: %v, content: '%v'\n", msg.ID(), keyMessageFields.Id, keyMessageFields.EventTime)
		consumer.Ack(msg)
	}
}
System configuration
Pulsar version: 2.10.3.7 (hosted on StreamNative)