
Feat: Concurrent batch processing #795

Open
ukpratik wants to merge 5 commits into samber:master from ukpratik:concurrentBatchProcessing


ukpratik commented Feb 8, 2026

This PR introduces a new function, BatchConcurrentProcess, which:

  • Runs a processor function concurrently, capping concurrency at maxConcurrency.
  • Spawns a goroutine for each item, using a semaphore to limit the number of active workers.
  • Stops early if ctx is cancelled.
  • Returns a channel (done) that is closed when all processing finishes.

This can be used for controlled fan-out processing, storing each response at the same index as its input item.
This protects system resources and improves throughput by keeping concurrency bounded.

d-enk (Contributor) commented Feb 9, 2026

  • This is not Batch processing
  • done chan is redundant - can simply wrap it with the Async0
  • processor should be func(ctx context.Context, index int, item T)
  • go func() for each element is overkill

That is, something like

func Process[T any](ctx context.Context, maxConcurrency int, collections []T, processor func(ctx context.Context, index int, item T)) {
	numInput := len(collections)

	if maxConcurrency <= 0 || maxConcurrency > numInput {
		maxConcurrency = numInput
	}

	var wg sync.WaitGroup
	var idx atomic.Int64

	wg.Add(maxConcurrency)

	for i := 0; i < maxConcurrency; i++ {
		go func() {
			defer wg.Done()
			i := int(idx.Add(1) - 1)

			for ; i < numInput; i = int(idx.Add(1) - 1) {
				select {
				case <-ctx.Done():
					return

				default:
					processor(ctx, i, collections[i])
				}
			}
		}()
	}

	wg.Wait()
}

But I probably wouldn't add it

ukpratik (Author) commented

  • This is not Batch processing
    Understood, yes, we should not call it Batch.
  • done chan is redundant - can simply wrap it with the Async0
  • processor should be func(ctx context.Context, index int, item T)
  • go func() for each element is overkill

That is, something like

@d-enk thanks for the feedback.
By the way, this won't be fully async, which was my initial motivation, although we can modify it further.
I liked how you looped over i < maxConcurrency and claimed each index with i := int(idx.Add(1) - 1).

