
Feature request: support customizable queue size for esutil.BulkIndexer #1027

@Stephanie0829

Description


Hi team, I would like to propose a configurable worker queue size (or queue multiplier) for esutil.BulkIndexer. This would mean adding a new field to the indexer configuration and passing it through to the place where the queue is created.

I am happy to help with the implementation if this aligns with the project's goals.

Proposal

Add a new configuration field, either a queue size or a queue multiplier, to BulkIndexerConfig. The relevant part of the current struct:

```go
type BulkIndexerConfig struct {
	NumWorkers    int           // The number of workers. Defaults to runtime.NumCPU().
	FlushBytes    int           // The flush threshold in bytes. Defaults to 5MB.
	FlushInterval time.Duration // The flush threshold as duration. Defaults to 30sec.
	Client        esapi.Transport         // The Elasticsearch client.
	Decoder       BulkResponseJSONDecoder // A custom JSON decoder.
	DebugLogger   BulkIndexerDebugLogger  // An optional logger for debugging.
	OnError       func(context.Context, error)          // Called for indexer errors.
	OnFlushStart  func(context.Context) context.Context // Called when the flush starts.
	OnFlushEnd    func(context.Context)                 // Called when the flush ends.

	// ... remaining fields omitted
}
```

It would be nice to decouple parallelism from buffer size so the two can be scaled independently, either through a fixed QueueSize:

```go
bi.queue = make(chan BulkIndexerItem, bi.config.QueueSize)
```

or by letting the buffer grow with the worker count by a configurable factor, through a QueueMultiplier:

```go
bi.queue = make(chan BulkIndexerItem, bi.config.NumWorkers*bi.config.QueueMultiplier)
```

For reference, this is where the queue is created today:

```go
func (bi *bulkIndexer) init() {
	bi.queue = make(chan BulkIndexerItem, bi.config.NumWorkers)
	// ... worker startup omitted
}
```
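A minimal sketch of how the two options could be resolved into a channel capacity. The type bulkIndexerConfig and the fields QueueSize and QueueMultiplier are hypothetical (they do not exist in esutil today), NumWorkers is assumed to have already been defaulted, and giving QueueSize precedence over QueueMultiplier is an assumed design choice. Leaving both fields unset keeps the current capacity of NumWorkers, so existing users would see no change.

```go
package main

import "fmt"

// bulkIndexerConfig is a hypothetical, trimmed stand-in for
// esutil.BulkIndexerConfig with the two proposed fields added.
type bulkIndexerConfig struct {
	NumWorkers      int // Assumed already defaulted (e.g. to runtime.NumCPU()).
	QueueSize       int // Hypothetical: fixed queue capacity; 0 means "not set".
	QueueMultiplier int // Hypothetical: capacity = NumWorkers * QueueMultiplier; 0 means "not set".
}

// queueCapacity picks the channel capacity. An explicit QueueSize wins,
// then QueueMultiplier; with neither set it falls back to NumWorkers,
// which matches the current hardcoded behavior.
func queueCapacity(cfg bulkIndexerConfig) int {
	switch {
	case cfg.QueueSize > 0:
		return cfg.QueueSize
	case cfg.QueueMultiplier > 0:
		return cfg.NumWorkers * cfg.QueueMultiplier
	default:
		return cfg.NumWorkers
	}
}

func main() {
	for _, cfg := range []bulkIndexerConfig{
		{NumWorkers: 8},
		{NumWorkers: 8, QueueSize: 1000},
		{NumWorkers: 8, QueueMultiplier: 4},
	} {
		// In the indexer this would be: make(chan BulkIndexerItem, ...).
		queue := make(chan struct{}, queueCapacity(cfg))
		fmt.Printf("%+v -> cap %d\n", cfg, cap(queue))
	}
}
```

Treating zero as "not set" mirrors how the existing fields such as NumWorkers and FlushBytes fall back to defaults when left at their zero value.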

Current Limitation

Currently, the bulk indexer hardcodes the capacity of the input queue shared by the workers to the number of workers. With 8 workers, for example, the queue holds at most 8 items, and once it is full, further calls to Add block until a worker takes an item off the queue. This is limiting for bursty traffic that needs a larger buffer: such services have to keep scaling the worker count just to grow the queue implicitly.
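To illustrate the limitation, here is a standalone program that uses a plain buffered channel rather than the esutil package, with a hypothetical item type in place of BulkIndexerItem. With no workers draining the queue, it counts how many items of a 100-item burst can be enqueued without blocking: 8 when the capacity is tied to 8 workers, versus 80 with a hypothetical 10x buffer.

```go
package main

import "fmt"

// item is a hypothetical stand-in for esutil.BulkIndexerItem.
type item struct{ id int }

// fill tries to enqueue n items without blocking and returns how many fit.
func fill(queue chan item, n int) int {
	accepted := 0
	for i := 0; i < n; i++ {
		select {
		case queue <- item{id: i}:
			accepted++
		default:
			// Queue is full: a blocking send (as in the indexer's Add)
			// would wait here until a worker receives an item.
			return accepted
		}
	}
	return accepted
}

func main() {
	const numWorkers = 8
	const burst = 100

	current := make(chan item, numWorkers)      // capacity tied to the worker count
	decoupled := make(chan item, 10*numWorkers) // hypothetical larger, decoupled buffer

	fmt.Println("current:", fill(current, burst))
	fmt.Println("decoupled:", fill(decoupled, burst))
}
```

In a running indexer the workers drain the queue concurrently, so the real headroom depends on flush latency, but the buffer a burst can land in before producers block is bounded by the channel capacity.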
