Add method to fetch messages in batch #1390
Conversation
```go
		i++
	}
	return msgBatch, nil
}
```
Duplication of the code can be avoided by calling this method in FetchMessage. I'll refactor it once the approach gets reviewed.
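For reference, a minimal sketch of that shape, assuming the batched method simply loops over the existing FetchMessage (the method name and exact signature here are illustrative, not the PR's actual diff):

```go
// Illustrative sketch only: build a batch by repeatedly calling the
// existing FetchMessage so the fetch logic lives in one place.
func (r *Reader) FetchMessageBatch(ctx context.Context, batchSize int) ([]Message, error) {
	msgBatch := make([]Message, 0, batchSize)
	for i := 0; i < batchSize; i++ {
		msg, err := r.FetchMessage(ctx)
		if err != nil {
			// Return whatever was collected so far along with the error.
			return msgBatch, err
		}
		msgBatch = append(msgBatch, msg)
	}
	return msgBatch, nil
}
```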
What happens if the number of messages doesn't reach the desired batchSize?
You changed the offset when the batch is processed; what happens if one of the messages in the batch fails? Is there any mechanism in place to handle that? Do you have any ideas for a fallback strategy?
@ghaninia for batch processing, maybe we could use manual acks? If one of the messages fails, we can ack all the messages before the failed one, skipping the message with the problem.
> what happens if one of the messages in the batch fails

We should let the consumer decide how they want to handle it. They can commit, or read again from the last commit. Let me know if there is a better approach to handle this.

> If one of the messages fails, we can ack all the messages before the failed one.

This sounds good, we can do this. My only concern: if the consumer is processing batch by batch, it might be confusing behavior that part of the batch is committed; it's neither aborted nor fully committed.

> What happens if the number of messages doesn't reach the desired batchSize?

I see. If the current code changes look okay, I can add a ticker with a timeout for a maximum wait time, so the method would return when some messages are available but not the full batch.
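As a sketch of the "consumer decides" idea, assuming the batched fetch method proposed in this PR and a hypothetical user-supplied process function, a caller could commit only the successfully processed prefix of the batch:

```go
// Hypothetical caller-side handling; FetchMessageBatch is the proposed
// API from this PR, not yet part of kafka-go.
func handleBatch(ctx context.Context, reader *kafka.Reader) error {
	msgs, err := reader.FetchMessageBatch(ctx, 10)
	if err != nil && len(msgs) == 0 {
		return err
	}
	for i, m := range msgs {
		if procErr := process(m); procErr != nil {
			// Commit only the messages handled before the failure; after a
			// restart or rebalance, consumption resumes at the failed offset.
			if i > 0 {
				if commitErr := reader.CommitMessages(ctx, msgs[:i]...); commitErr != nil {
					return commitErr
				}
			}
			return procErr
		}
	}
	// The whole batch succeeded: commit everything.
	return reader.CommitMessages(ctx, msgs...)
}
```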
@krsoninikhil in my opinion:

> This sounds good, we can do this. My only concern: if the consumer is processing batch by batch, it might be confusing behavior that part of the batch is committed; it's neither aborted nor fully committed.

That's perfectly normal behavior. If a process can't handle a message correctly, it should panic / return an error / stop consuming messages and ack the last successfully processed message. What happens next, restarting the consumer or raising a panic, is the developer's decision.

So imagine the following situation: we receive 3 of 10 (batchSize) messages and haven't exceeded the deadline. We successfully handle the first 2 messages but fail on the 3rd. We know we can't handle message 3, so we ack messages 1 and 2. On the next attempt we receive messages 3, 4, 5, etc. and can try again.

> I see. If the current code changes look okay, I can add a ticker with a timeout for a maximum wait time, so the method would return when some messages are available but not the full batch.

That also sounds very good, because all batch processing is a compromise between a timeout and a batchSize.

Sorry, English is not my first language.
So, my simple batch consumer looks like this:

```go
package kafkamux

// Note: in the original project this used small helper packages (werr for
// error wrapping, handler for the callback type); they are inlined here as
// fmt.Errorf wrapping and a local type so the snippet is self-contained.

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/rs/zerolog/log"
	"github.com/segmentio/kafka-go"
)

var ErrSmallQueueCapacity = errors.New("batch lower than queue capacity")

// Reader is the subset of *kafka.Reader that BatchConsumer needs.
type Reader interface {
	FetchMessage(ctx context.Context) (kafka.Message, error)
	CommitMessages(ctx context.Context, msgs ...kafka.Message) error
}

// BatchCallback handles a full batch of fetched messages.
type BatchCallback func(ctx context.Context, msgs []kafka.Message) error

func NewBatchConsumer(
	reader Reader,
	batchSize int,
	duration time.Duration,
) (*BatchConsumer, error) {
	// The reader's internal queue must be able to hold a full batch.
	if r, ok := reader.(*kafka.Reader); ok && r.Config().QueueCapacity < batchSize {
		return nil, ErrSmallQueueCapacity
	}
	return &BatchConsumer{
		reader:    reader,
		batchSize: batchSize,
		messages:  make([]kafka.Message, 0, batchSize),
		duration:  duration,
	}, nil
}

type BatchConsumer struct {
	reader    Reader
	batchSize int
	duration  time.Duration

	l        sync.Mutex
	messages []kafka.Message
}

// flush runs the callback on the buffered messages and commits them all,
// or commits nothing if the callback returns an error.
func (b *BatchConsumer) flush(ctx context.Context, fn BatchCallback) error {
	l := log.Ctx(ctx)
	b.l.Lock()
	defer b.l.Unlock()
	if len(b.messages) == 0 {
		return nil
	}
	if err := fn(ctx, b.messages); err != nil {
		l.Err(err).Msg("error in callback")
		return fmt.Errorf("callback: %w", err)
	}
	if err := b.reader.CommitMessages(ctx, b.messages...); err != nil {
		l.Err(err).Msg("error in commit messages")
		return fmt.Errorf("commit messages: %w", err)
	}
	b.messages = make([]kafka.Message, 0, b.batchSize)
	return nil
}

// Listen fetches messages and flushes them either when a full batch has
// been collected or when the ticker fires, whichever comes first.
func (b *BatchConsumer) Listen(ctx context.Context, fn BatchCallback) error {
	l := log.Ctx(ctx)
	errCh := make(chan error, 1)
	msgCh := make(chan kafka.Message, b.batchSize)
	ticker := time.NewTicker(b.duration)
	defer ticker.Stop()
	go func() {
		defer close(msgCh)
		for {
			msg, err := b.reader.FetchMessage(ctx)
			if err != nil {
				errCh <- err
				return
			}
			msgCh <- msg
		}
	}()
	for {
		select {
		case readErr := <-errCh:
			l.Err(readErr).Msg("read message error, stop main loop")
			return nil
		case <-ctx.Done():
			l.Debug().Msg("context done, stop main loop")
			return nil
		case <-ticker.C:
			l.Debug().Int("messages_count", len(b.messages)).Msg("ticker flush")
			if err := b.flush(ctx, fn); err != nil {
				l.Err(err).Msg("error flushing messages")
				return err
			}
		case msg, ok := <-msgCh:
			if !ok {
				continue
			}
			b.messages = append(b.messages, msg)
			if len(b.messages) < b.batchSize {
				l.Debug().Int("messages_count", len(b.messages)).Msg("not enough messages, wait")
				continue
			}
			l.Info().Int("messages_count", len(b.messages)).Msg("main loop flush")
			if err := b.flush(ctx, fn); err != nil {
				l.Err(err).Msg("error flushing messages")
				return err
			}
			ticker.Reset(b.duration)
		}
	}
}
```
This consumer either acks all messages or does nothing when an error happens. I'm not sure my solution is correct in general, but for my project, where processing is idempotent, it's okay.
So if we have the ability to fetch messages with a batchSize, it can help in many situations.
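For illustration, here is a hypothetical way the sketch above could be wired up (broker address, group, and topic names are placeholders):

```go
// Hypothetical usage of the BatchConsumer sketch above.
func run(ctx context.Context) error {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:       []string{"localhost:9092"},
		GroupID:       "example-group",
		Topic:         "example-topic",
		QueueCapacity: 100, // must be >= batchSize (see NewBatchConsumer)
	})
	defer r.Close()

	bc, err := NewBatchConsumer(r, 10, 5*time.Second)
	if err != nil {
		return err
	}
	// Flush every 10 messages, or every 5 seconds if fewer have arrived.
	return bc.Listen(ctx, func(ctx context.Context, msgs []kafka.Message) error {
		for _, m := range msgs {
			_ = m // process each message; returning an error skips the commit
		}
		return nil
	})
}
```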
@krsoninikhil please check PR #1395, which adds deadline timeout support.
Since FetchMessage is already reading messages from a fetched batch, this new method just holds the messages until the batchSize number of messages are read. Fixes segmentio#123