Skip to content

Commit

Permalink
fix: more efficient batch processing
Browse files Browse the repository at this point in the history
Improved memory usage for both batch processing and concurrent batch processing.
Added an example for concurrent batch processing.
  • Loading branch information
marksalpeter committed May 2, 2021
1 parent 1dcf5b3 commit 7a69d0d
Show file tree
Hide file tree
Showing 4 changed files with 430 additions and 38 deletions.
87 changes: 72 additions & 15 deletions process_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pipeline

import (
"context"
"sync"
"time"
)

Expand All @@ -16,19 +17,75 @@ func ProcessBatch(
processor Processor,
in <-chan interface{},
) <-chan interface{} {
// 3. Split the processed slices of inputs back out into individial outputs
return Split(
// 2. Process the slices of inputs
Process(
ctx,
processor,
// 1. Collect slices of inputs from in
Collect(
ctx,
maxSize,
maxDuration,
in,
),
),
)
out := make(chan interface{})
go func() {
processBatch(ctx, maxSize, maxDuration, processor, in, out)
close(out)
}()
return out
}

// ProcessBatchConcurrently fans the in channel out to multiple batch Processors running concurrently,
// then it fans the out channels of the batch Processors back into a single out chan
func ProcessBatchConcurrently(
ctx context.Context,
concurrently,
maxSize int,
maxDuration time.Duration,
processor Processor,
in <-chan interface{},
) <-chan interface{} {
// Create the out chan
out := make(chan interface{})
// Close the out chan after all of the Processors finish executing
var wg sync.WaitGroup
wg.Add(concurrently)
go func() {
wg.Wait()
close(out)
}()
// Perform Process concurrently times
for i := 0; i < concurrently; i++ {
go func() {
processBatch(ctx, maxSize, maxDuration, processor, in, out)
wg.Done()
}()
}
return out
}

func processBatch(
ctx context.Context,
maxSize int,
maxDuration time.Duration,
processor Processor,
in <-chan interface{},
out chan<- interface{},
) {
for {
// Collect interfaces for batch processing
is, open := collect(ctx, maxSize, maxDuration, in)
if is != nil {
select {
// Cancel all inputs during shutdown
case <-ctx.Done():
processor.Cancel(is, ctx.Err())
// Otherwise Process the inputs
default:
results, err := processor.Process(ctx, is)
if err != nil {
processor.Cancel(is, err)
continue
}
// Split the results back into interfaces
for _, result := range results.([]interface{}) {
out <- result
}
}
}
// In is closed
if !open {
return
}
}
}
29 changes: 29 additions & 0 deletions process_batch_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,32 @@ func ExampleProcessBatch() {
// error: could not multiply [5], context deadline exceeded
// error: could not multiply [6], context deadline exceeded
}

func ExampleProcessBatchConcurrently() {
// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Create a pipeline that emits 1-9
p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7, 8, 9)

// Wait 4 seconds to pass 2 numbers through the pipe
// * 2 concurrent Processors
p = pipeline.ProcessBatchConcurrently(ctx, 2, 2, time.Minute, &processors.Waiter{
Duration: 4 * time.Second,
}, p)

// Finally, lets print the results and see what happened
for result := range p {
log.Printf("result: %d\n", result)
}

// Output
// result: 3
// result: 4
// result: 1
// result: 2
// error: could not process [5 6], process was canceled
// error: could not process [7 8], process was canceled
// error: could not process [9], context deadline exceeded
}
Loading

0 comments on commit 7a69d0d

Please sign in to comment.