
Add method to fetch messages in batch #1390


Open

krsoninikhil wants to merge 1 commit into main

Conversation

krsoninikhil

Since FetchMessage is already reading messages from a fetched batch, this new method just holds the messages until batchSize messages have been read.

Fixes #123
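
For context, a minimal sketch of what the proposed method might look like, written here as a standalone helper (the standalone form and all names are illustrative; only FetchMessage from the existing kafka-go API is assumed):

package example

import (
	"context"

	"github.com/segmentio/kafka-go"
)

// FetchMessageBatch blocks until batchSize messages have been read and
// returns them together. Since FetchMessage already reads from an
// internally fetched batch, this mostly just buffers the results.
func FetchMessageBatch(ctx context.Context, r *kafka.Reader, batchSize int) ([]kafka.Message, error) {
	msgBatch := make([]kafka.Message, 0, batchSize)
	i := 0
	for i < batchSize {
		msg, err := r.FetchMessage(ctx)
		if err != nil {
			return nil, err
		}
		msgBatch = append(msgBatch, msg)
		i++
	}
	return msgBatch, nil
}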

[diff excerpt under review, truncated]

		i++
	}
	return msgBatch, nil
}
@krsoninikhil (Author) Jun 8, 2025

Duplication of the code can be avoided by calling this method in FetchMessage. I'll refactor it once the approach gets reviewed.

@ghaninia Jun 8, 2025

What happens if the number of messages doesn't reach the desired batchSize?

You change the offset when the batch is processed; what happens if one of the messages in the batch fails? Is there any mechanism in place to handle that? Do you have any ideas for a fallback strategy?

@max107

@ghaninia for batch processing it's possible to use a manual ack, maybe?

If one of the messages fails, we can ack all the messages before the failed one, skipping the message with the problem.
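
A sketch of that partial-ack strategy, assuming kafka-go's CommitMessages and a hypothetical per-message process function (processBatch and process are made-up names):

package example

import (
	"context"

	"github.com/segmentio/kafka-go"
)

// processBatch applies process to each message in order and commits only
// the prefix that succeeded. The failed message is not committed, so the
// group resumes from it after a restart or rebalance.
func processBatch(ctx context.Context, r *kafka.Reader, batch []kafka.Message, process func(kafka.Message) error) error {
	for i, msg := range batch {
		if err := process(msg); err != nil {
			// Ack everything before the failed message, then surface the error.
			if i > 0 {
				if cerr := r.CommitMessages(ctx, batch[:i]...); cerr != nil {
					return cerr
				}
			}
			return err
		}
	}
	return r.CommitMessages(ctx, batch...)
}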

@krsoninikhil (Author)

> What happens if one of the messages in the batch fails?

We should let the consumer decide how they want to handle it. They can either commit or read again from the last commit. Let me know if there is a better approach to handle this.

> If one of the messages fails, we can ack all the messages before the failed one

This sounds good; we can do this. My only concern: if the consumer is processing batch by batch, it might be confusing behavior that part of the batch is committed, so the batch is neither aborted nor fully committed.

> What happens if the number of messages doesn't reach the desired batchSize?

I see. If the current code changes look okay, I can add a ticker with a timeout for a maximum wait time, so the method would return when some messages are available but not the full batch.
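
A sketch of that idea, using a context deadline as the maximum wait (the helper name and the partial-batch behavior are assumptions, not the final design):

package example

import (
	"context"
	"errors"
	"time"

	"github.com/segmentio/kafka-go"
)

// fetchBatchWithTimeout collects up to batchSize messages but stops
// waiting after maxWait, returning whatever has arrived by then.
func fetchBatchWithTimeout(ctx context.Context, r *kafka.Reader, batchSize int, maxWait time.Duration) ([]kafka.Message, error) {
	ctx, cancel := context.WithTimeout(ctx, maxWait)
	defer cancel()

	batch := make([]kafka.Message, 0, batchSize)
	for len(batch) < batchSize {
		msg, err := r.FetchMessage(ctx)
		if err != nil {
			// On deadline expiry, a partial batch is a success, not an error.
			if errors.Is(err, context.DeadlineExceeded) && len(batch) > 0 {
				return batch, nil
			}
			return batch, err
		}
		batch = append(batch, msg)
	}
	return batch, nil
}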

@krsoninikhil (Author)

@max107 @ghaninia let me know your thoughts.

@max107

@krsoninikhil in my opinion

> This sounds good; we can do this. My only concern: if the consumer is processing batch by batch, it might be confusing behavior that part of the batch is committed, so the batch is neither aborted nor fully committed.

It's absolutely normal behavior. If some process can't handle a message correctly, it should raise a panic / return an error / stop consuming messages and ack the last successfully processed message. What's next? Restart the consumer? Raise a panic? That's the developer's decision.

So we can imagine the following situation: we receive 3 of 10 (batchSize) messages, we haven't exceeded the deadline, and we successfully handle the first 2 messages but fail on the 3rd. We know we can't handle message 3, so we can ack 1 and 2. On the next attempt we receive messages 3, 4, 5, etc. and can try again.

> I see. If the current code changes look okay, I can add a ticker with a timeout for a maximum wait time, so the method would return when some messages are available but not the full batch.

That also sounds very good, because all "batch processing" is a compromise between a timeout and a batchSize.

Sorry, English is not my first language.

@max107 Jul 12, 2025

So, my simple batch consumer looks like:

package kafkamux

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/rs/zerolog/log"
	"github.com/segmentio/kafka-go"
)

var (
	// Returned when the reader's internal queue cannot buffer a full batch.
	ErrSmallQueueCapacity = errors.New("batch size exceeds reader queue capacity")
)

// Reader is the subset of the *kafka.Reader API used by BatchConsumer.
type Reader interface {
	FetchMessage(ctx context.Context) (kafka.Message, error)
	CommitMessages(ctx context.Context, msgs ...kafka.Message) error
}

// BatchCallback handles a complete batch; returning an error prevents
// the batch from being committed.
type BatchCallback func(ctx context.Context, msgs []kafka.Message) error

// NewBatchConsumer checks that the reader's queue can buffer a full
// batch, then returns a consumer that flushes on size or timeout.
func NewBatchConsumer(
	reader Reader,
	batchSize int,
	duration time.Duration,
) (*BatchConsumer, error) {
	if r, ok := reader.(*kafka.Reader); ok && r.Config().QueueCapacity < batchSize {
		return nil, ErrSmallQueueCapacity
	}

	return &BatchConsumer{
		reader:    reader,
		batchSize: batchSize,
		messages:  make([]kafka.Message, 0, batchSize),
		duration:  duration,
	}, nil
}

type BatchConsumer struct {
	reader    Reader
	batchSize int
	duration  time.Duration
	l         sync.Mutex
	messages  []kafka.Message
}

// flush delivers the buffered messages to fn and commits them on success.
func (b *BatchConsumer) flush(ctx context.Context, fn BatchCallback) error {
	l := log.Ctx(ctx)

	b.l.Lock()
	defer b.l.Unlock()

	if len(b.messages) == 0 {
		return nil
	}

	if err := fn(ctx, b.messages); err != nil {
		l.Err(err).Msg("error in callback")
		return fmt.Errorf("batch callback: %w", err)
	}

	if err := b.reader.CommitMessages(ctx, b.messages...); err != nil {
		l.Err(err).Msg("error in commit messages")
		return fmt.Errorf("commit messages: %w", err)
	}

	b.messages = make([]kafka.Message, 0, b.batchSize)

	return nil
}

// Listen reads messages until ctx is cancelled or the reader fails,
// flushing whenever batchSize messages accumulate or duration elapses.
func (b *BatchConsumer) Listen(ctx context.Context, fn BatchCallback) error {
	l := log.Ctx(ctx)

	errCh := make(chan error, 1)

	msgCh := make(chan kafka.Message, b.batchSize)

	ticker := time.NewTicker(b.duration)
	defer ticker.Stop()

	go func() {
		defer close(msgCh)

		for {
			msg, err := b.reader.FetchMessage(ctx)
			if err != nil {
				errCh <- err
				return
			}

			msgCh <- msg
		}
	}()

	for {
		select {
		case readErr := <-errCh:
			l.Err(readErr).Msg("read message error, stop main loop")
			return nil

		case <-ctx.Done():
			l.Debug().Msg("context done, stop main loop")
			return nil

		case <-ticker.C:
			l.Debug().Int("messages_count", len(b.messages)).Msg("ticker flush")
			if err := b.flush(ctx, fn); err != nil {
				l.Err(err).Msg("error flushing messages")
				return fmt.Errorf("ticker flush: %w", err)
			}

		case msg, ok := <-msgCh:
			if !ok {
				// Fetch goroutine closed the channel; disable this case
				// so the select stops spinning on the closed channel.
				msgCh = nil
				continue
			}

			b.messages = append(b.messages, msg)

			if len(b.messages) < b.batchSize {
				l.Debug().Int("messages_count", len(b.messages)).Msg("not enough messages, wait")
				continue
			}

			l.Info().Int("messages_count", len(b.messages)).Msg("main loop flush")
			if err := b.flush(ctx, fn); err != nil {
				l.Err(err).Msg("error flushing messages")
				return fmt.Errorf("batch flush: %w", err)
			}

			ticker.Reset(b.duration)
		}
	}
}

This ^ consumer either acks all the messages or does nothing because an error happened. I'm not sure my solution is correct in general, but for my project, where processing is idempotent, it's okay.

So if we had the ability to fetch messages with a batchSize, it would help in many situations.
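
For illustration, a hypothetical wiring of the consumer above (the module path, broker, topic, group, and batch parameters are all made up):

package main

import (
	"context"
	"time"

	"github.com/rs/zerolog/log"
	"github.com/segmentio/kafka-go"

	"example.com/kafkamux" // hypothetical module path for the code above
)

func main() {
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:       []string{"localhost:9092"},
		GroupID:       "example-group",
		Topic:         "example-topic",
		QueueCapacity: 100, // must be >= the batch size passed below
	})
	defer reader.Close()

	// Flush every 10 messages or every 5 seconds, whichever comes first.
	consumer, err := kafkamux.NewBatchConsumer(reader, 10, 5*time.Second)
	if err != nil {
		log.Fatal().Err(err).Msg("create batch consumer")
	}

	err = consumer.Listen(context.Background(), func(ctx context.Context, msgs []kafka.Message) error {
		for _, m := range msgs {
			log.Info().Str("key", string(m.Key)).Msg("handling message")
		}
		return nil // a nil return commits the whole batch
	})
	if err != nil {
		log.Fatal().Err(err).Msg("listen")
	}
}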

@max107

@krsoninikhil please check PR #1395 with deadline timeout support.

Successfully merging this pull request may close these issues.

Way to get batch messages and commit if the batch is successful