Description
Hi team, I would like to propose a customizable worker queue size/multiplier for `esutil.BulkIndexer`. This would require adding a new field to the indexer configuration and piping it through to the queue instantiation.
I am happy to help with the implementation if this aligns with the project's goals.
Proposal
Add a new configuration field for a queue size or queue multiplier to `BulkIndexerConfig`:
go-elasticsearch/esutil/bulk_indexer.go
Lines 56 to 67 in 6000e55
```go
type BulkIndexerConfig struct {
	NumWorkers    int           // The number of workers. Defaults to runtime.NumCPU().
	FlushBytes    int           // The flush threshold in bytes. Defaults to 5MB.
	FlushInterval time.Duration // The flush threshold as duration. Defaults to 30sec.

	Client  esapi.Transport         // The Elasticsearch client.
	Decoder BulkResponseJSONDecoder // A custom JSON decoder.

	DebugLogger BulkIndexerDebugLogger // An optional logger for debugging.

	OnError      func(context.Context, error)          // Called for indexer errors.
	OnFlushStart func(context.Context) context.Context // Called when the flush starts.
	OnFlushEnd   func(context.Context)                 // Called when the flush ends.
```
It would be nice to decouple parallelism from buffer size so they can be scaled independently, for example through a fixed `QueueSize`:

```go
bi.queue = make(chan BulkIndexerItem, bi.config.QueueSize)
```

Or allow the buffer to scale with the worker count at a configurable ratio, for example through a `QueueMultiplier`:

```go
bi.queue = make(chan BulkIndexerItem, bi.config.NumWorkers*bi.config.QueueMultiplier)
```
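A minimal sketch of how these options could surface in `BulkIndexerConfig` (the field names `QueueSize` and `QueueMultiplier` are placeholders, not a settled API):

```go
type BulkIndexerConfig struct {
	NumWorkers      int // The number of workers. Defaults to runtime.NumCPU().
	QueueSize       int // Proposed: fixed capacity of the shared item queue. Defaults to NumWorkers.
	QueueMultiplier int // Proposed: queue capacity as NumWorkers * QueueMultiplier. Defaults to 1.

	// ... existing fields unchanged ...
}
```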
go-elasticsearch/esutil/bulk_indexer.go
Lines 373 to 375 in 6000e55
```go
func (bi *bulkIndexer) init() {
	bi.queue = make(chan BulkIndexerItem, bi.config.NumWorkers)
```
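A rough sketch of how `init()` could take the new fields into account, assuming the hypothetical `QueueSize`/`QueueMultiplier` fields above; the defaults preserve today's behavior:

```go
// Hypothetical queue sizing in init(). QueueSize and QueueMultiplier
// are proposed fields, not part of the current esutil API.
func (bi *bulkIndexer) init() {
	capacity := bi.config.NumWorkers // current behavior: one slot per worker
	if bi.config.QueueMultiplier > 0 {
		capacity = bi.config.NumWorkers * bi.config.QueueMultiplier
	}
	if bi.config.QueueSize > 0 {
		// An explicit size takes precedence over the multiplier.
		capacity = bi.config.QueueSize
	}
	bi.queue = make(chan BulkIndexerItem, capacity)

	// ... start worker goroutines as before ...
}
```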
Current Limitation
Currently, the bulk indexer hardcodes the size of the workers' shared input queue to the number of workers: with 8 workers, the queue holds at most 8 unprocessed items. This is limiting for bursty traffic patterns that need a larger buffer, because a service has to keep scaling its worker count just to implicitly grow the queue.
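For illustration, a hypothetical configuration under the fixed-size variant (the `QueueSize` field does not exist today):

```go
// Hypothetical usage, assuming a QueueSize field is added to BulkIndexerConfig.
bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
	Client:     es, // an *elasticsearch.Client
	NumWorkers: 8,
	QueueSize:  1024, // absorb bursts without adding workers
})
if err != nil {
	log.Fatalf("Error creating the indexer: %s", err)
}
```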