improve: use chan *message instead of chan []*message as queueCh #1283

Merged: 2 commits, Oct 8, 2024
Changes from 1 commit
improve: use chan *message instead of chan []*message as queueCh
nodece committed Sep 12, 2024
commit 75ceb3bbfad0e492daba69db4a7818e6885a6869
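In outline, the commit changes the element type of the consumer's internal queue channel: `queueCh` goes from `chan []*message` to `chan *message`, so the channel capacity (`receiverQueueSize`) now bounds individual messages instead of batch slices. A toy, standalone sketch of that difference follows; the `message` struct below is a stand-in for illustration only, not the client's real type.

```go
package main

import "fmt"

// Toy stand-in for the client's internal message type; the real *message and
// queueCh live in pulsar/consumer_partition.go.
type message struct{ id int }

func main() {
    // Old shape: the capacity counts batch slices, so a cap-2 channel may
    // buffer far more than 2 messages when each slice carries a full batch.
    batchCh := make(chan []*message, 2)
    batchCh <- []*message{{id: 1}, {id: 2}, {id: 3}}
    batchCh <- []*message{{id: 4}, {id: 5}}

    // New shape: the capacity counts individual messages, so it lines up
    // directly with the configured receiverQueueSize.
    msgCh := make(chan *message, 2)
    msgCh <- &message{id: 1}
    msgCh <- &message{id: 2}

    fmt.Println(len(batchCh), "slices buffered,", len(msgCh), "messages buffered")
}
```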
88 changes: 39 additions & 49 deletions pulsar/consumer_partition.go
@@ -152,7 +152,7 @@ type partitionConsumer struct {

// the size of the queue channel for buffering messages
maxQueueSize int32
queueCh chan []*message
queueCh chan *message
startMessageID atomicMessageID
lastDequeuedMsg *trackingMessageID

@@ -328,7 +328,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
partitionIdx: int32(options.partitionIdx),
eventsCh: make(chan interface{}, 10),
maxQueueSize: int32(options.receiverQueueSize),
queueCh: make(chan []*message, options.receiverQueueSize),
queueCh: make(chan *message, options.receiverQueueSize),
startMessageID: atomicMessageID{msgID: options.startMessageID},
connectedCh: make(chan struct{}),
messageCh: messageCh,
@@ -1051,37 +1051,33 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
return fmt.Errorf("discarding message on decryption error :%v", err)
case crypto.ConsumerCryptoFailureActionConsume:
pc.log.Warnf("consuming encrypted message due to error in decryption :%v", err)
messages := []*message{
{
publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
eventTime: timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
key: msgMeta.GetPartitionKey(),
producerName: msgMeta.GetProducerName(),
properties: internal.ConvertToStringMap(msgMeta.GetProperties()),
topic: pc.topic,
msgID: newMessageID(
int64(pbMsgID.GetLedgerId()),
int64(pbMsgID.GetEntryId()),
pbMsgID.GetBatchIndex(),
pc.partitionIdx,
pbMsgID.GetBatchSize(),
),
payLoad: headersAndPayload.ReadableSlice(),
schema: pc.options.schema,
replicationClusters: msgMeta.GetReplicateTo(),
replicatedFrom: msgMeta.GetReplicatedFrom(),
redeliveryCount: response.GetRedeliveryCount(),
encryptionContext: createEncryptionContext(msgMeta),
orderingKey: string(msgMeta.OrderingKey),
},
}

if pc.options.autoReceiverQueueSize {
pc.incomingMessages.Inc()
pc.markScaleIfNeed()
}

pc.queueCh <- messages
pc.queueCh <- &message{
publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
eventTime: timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
key: msgMeta.GetPartitionKey(),
producerName: msgMeta.GetProducerName(),
properties: internal.ConvertToStringMap(msgMeta.GetProperties()),
topic: pc.topic,
msgID: newMessageID(
int64(pbMsgID.GetLedgerId()),
int64(pbMsgID.GetEntryId()),
pbMsgID.GetBatchIndex(),
pc.partitionIdx,
pbMsgID.GetBatchSize(),
),
payLoad: headersAndPayload.ReadableSlice(),
schema: pc.options.schema,
replicationClusters: msgMeta.GetReplicateTo(),
replicatedFrom: msgMeta.GetReplicatedFrom(),
redeliveryCount: response.GetRedeliveryCount(),
encryptionContext: createEncryptionContext(msgMeta),
orderingKey: string(msgMeta.OrderingKey),
}
return nil
}
}
@@ -1255,7 +1251,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
Message: msg,
})

messages = append(messages, msg)
pc.queueCh <- msg
bytesReceived += msg.size()
}

@@ -1269,8 +1265,6 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
pc.availablePermits.add(skippedMessages)
}

// send messages to the dispatcher
pc.queueCh <- messages
return nil
}

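One apparent consequence of the MessageReceived hunks above, where each decoded message is now sent with `pc.queueCh <- msg` inside the loop instead of being appended to a slice and sent once at the end, is that backpressure from a full queue applies per message rather than per batch. A small standalone sketch of that behaviour, with invented names and a toy `message` type:

```go
package main

import (
    "fmt"
    "time"
)

type message struct{ id int }

func main() {
    // Capacity 2 stands in for receiverQueueSize.
    queueCh := make(chan *message, 2)

    // A deliberately slow consumer playing the dispatcher's role.
    go func() {
        for m := range queueCh {
            time.Sleep(50 * time.Millisecond)
            fmt.Println("dispatched", m.id)
        }
    }()

    // With per-message sends, the producer blocks as soon as the buffer is
    // full, instead of only after a whole decoded batch has been assembled.
    for i := 1; i <= 5; i++ {
        queueCh <- &message{id: i}
        fmt.Println("queued", i)
    }
    close(queueCh)
    time.Sleep(400 * time.Millisecond) // let the consumer drain before exit
}
```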
@@ -1426,20 +1420,19 @@ func (pc *partitionConsumer) dispatcher() {
defer func() {
pc.log.Debug("exiting dispatch loop")
}()
var messages []*message
var queueMsg *message
for {
var queueCh chan []*message
var queueCh chan *message
var messageCh chan ConsumerMessage
var nextMessage ConsumerMessage
var nextMessageSize int

// are there more messages to send?
if len(messages) > 0 {
if queueMsg != nil {
nextMessage = ConsumerMessage{
Consumer: pc.parentConsumer,
Message: messages[0],
Message: queueMsg,
}
nextMessageSize = messages[0].size()
nextMessageSize = queueMsg.size()

if pc.dlq.shouldSendToDlq(&nextMessage) {
// pass the message to the DLQ router
@@ -1451,7 +1444,7 @@ func (pc *partitionConsumer) dispatcher() {
}

pc.metrics.PrefetchedMessages.Dec()
pc.metrics.PrefetchedBytes.Sub(float64(len(messages[0].payLoad)))
pc.metrics.PrefetchedBytes.Sub(float64(len(queueMsg.payLoad)))
} else {
queueCh = pc.queueCh
}
@@ -1466,7 +1459,7 @@ func (pc *partitionConsumer) dispatcher() {
}
pc.log.Debug("dispatcher received connection event")

messages = nil
queueMsg = nil

// reset available permits
pc.availablePermits.reset()
@@ -1484,19 +1477,16 @@ func (pc *partitionConsumer) dispatcher() {
pc.log.WithError(err).Error("unable to send initial permits to broker")
}

case msgs, ok := <-queueCh:
case msg, ok := <-queueCh:
if !ok {
return
}
// we only read messages here after the consumer has processed all messages
// in the previous batch
messages = msgs

queueMsg = msg

// if the messageCh is nil or the messageCh is full this will not be selected
case messageCh <- nextMessage:
// allow this message to be garbage collected
messages[0] = nil
messages = messages[1:]
queueMsg = nil

pc.availablePermits.inc()

@@ -1519,14 +1509,14 @@ func (pc *partitionConsumer) dispatcher() {
if m == nil {
break
} else if nextMessageInQueue == nil {
nextMessageInQueue = toTrackingMessageID(m[0].msgID)
nextMessageInQueue = toTrackingMessageID(m.msgID)
}
if pc.options.autoReceiverQueueSize {
pc.incomingMessages.Sub(int32(len(m)))
pc.incomingMessages.Sub(int32(1))
}
}

messages = nil
queueMsg = nil

clearQueueCb(nextMessageInQueue)
}
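The dispatcher hunks above replace the `messages []*message` slice and its index bookkeeping with a single pending `queueMsg *message`. Below is a simplified, self-contained sketch of that buffering pattern, assuming toy types rather than the client's own; permits, metrics, DLQ routing, and connection events are all omitted. The nil-channel select trick itself predates this PR; only the element type and the local buffer change.

```go
package main

import "fmt"

type message struct{ id int }

// dispatch holds at most one pending *message locally while waiting for the
// consumer channel to accept it, mirroring the post-change dispatcher loop.
func dispatch(queueCh <-chan *message, messageCh chan<- *message) {
    var queueMsg *message
    for {
        var in <-chan *message
        var out chan<- *message

        if queueMsg != nil {
            out = messageCh // a message is pending: only the send case can fire
        } else {
            in = queueCh // nothing pending: only the receive case can fire
        }

        select {
        case m, ok := <-in:
            if !ok {
                return
            }
            queueMsg = m
        case out <- queueMsg:
            queueMsg = nil
        }
    }
}

func main() {
    queueCh := make(chan *message, 3)
    messageCh := make(chan *message)
    go dispatch(queueCh, messageCh)

    for i := 1; i <= 3; i++ {
        queueCh <- &message{id: i}
    }
    close(queueCh)

    for i := 0; i < 3; i++ {
        fmt.Println("received", (<-messageCh).id)
    }
}
```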
41 changes: 23 additions & 18 deletions pulsar/consumer_partition_test.go
@@ -30,7 +30,7 @@ import (
func TestSingleMessageIDNoAckTracker(t *testing.T) {
eventsCh := make(chan interface{}, 1)
pc := partitionConsumer{
queueCh: make(chan []*message, 1),
queueCh: make(chan *message, 1),
eventsCh: eventsCh,
compressionProviders: sync.Map{},
options: &partitionConsumerOpts{},
@@ -47,13 +47,12 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
}

// ensure the tracker was set on the message id
messages := <-pc.queueCh
for _, m := range messages {
assert.Nil(t, m.ID().(*trackingMessageID).tracker)
}
message := <-pc.queueCh
id := message.ID().(*trackingMessageID)
assert.Nil(t, id.tracker)

// ack the message id
pc.AckID(messages[0].msgID.(*trackingMessageID))
pc.AckID(id)

select {
case <-eventsCh:
Expand All @@ -69,7 +68,7 @@ func newTestMetrics() *internal.LeveledMetrics {
func TestBatchMessageIDNoAckTracker(t *testing.T) {
eventsCh := make(chan interface{}, 1)
pc := partitionConsumer{
queueCh: make(chan []*message, 1),
queueCh: make(chan *message, 1),
eventsCh: eventsCh,
compressionProviders: sync.Map{},
options: &partitionConsumerOpts{},
@@ -86,13 +85,12 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
}

// ensure the tracker was set on the message id
messages := <-pc.queueCh
for _, m := range messages {
assert.Nil(t, m.ID().(*trackingMessageID).tracker)
}
message := <-pc.queueCh
id := message.ID().(*trackingMessageID)
assert.Nil(t, id.tracker)

// ack the message id
err := pc.AckID(messages[0].msgID.(*trackingMessageID))
err := pc.AckID(id)
assert.Nil(t, err)

select {
Expand All @@ -105,7 +103,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
func TestBatchMessageIDWithAckTracker(t *testing.T) {
eventsCh := make(chan interface{}, 1)
pc := partitionConsumer{
queueCh: make(chan []*message, 1),
queueCh: make(chan *message, 10),
eventsCh: eventsCh,
compressionProviders: sync.Map{},
options: &partitionConsumerOpts{},
@@ -122,14 +120,21 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
}

// ensure the tracker was set on the message id
messages := <-pc.queueCh
for _, m := range messages {
assert.NotNil(t, m.ID().(*trackingMessageID).tracker)
var messageIDs []*trackingMessageID
for i := 0; i < 10; i++ {
select {
case m := <-pc.queueCh:
id := m.ID().(*trackingMessageID)
assert.NotNil(t, id.tracker)
messageIDs = append(messageIDs, id)
default:
break
}
}

// ack all message ids except the last one
for i := 0; i < 9; i++ {
err := pc.AckID(messages[i].msgID.(*trackingMessageID))
err := pc.AckID(messageIDs[i])
assert.Nil(t, err)
}

@@ -140,7 +145,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
}

// ack last message
err := pc.AckID(messages[9].msgID.(*trackingMessageID))
err := pc.AckID(messageIDs[9])
assert.Nil(t, err)

select {
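For completeness, a toy version of the test-side pattern after the change: each element read from `queueCh` is a single `*message` to assert on and ack, rather than a slice. The names and the stop condition below are illustrative only; the real test uses a select with a default case to drain the channel.

```go
package main

import "fmt"

type message struct{ id int }

func main() {
    // Mirrors the updated tests: the receive path fills a buffered channel of
    // single messages, and the test drains it one element at a time.
    queueCh := make(chan *message, 10)
    for i := 0; i < 10; i++ {
        queueCh <- &message{id: i}
    }

    var drained []*message
    for len(queueCh) > 0 {
        drained = append(drained, <-queueCh)
    }
    fmt.Println("drained", len(drained), "messages")
}
```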