Skip to content

Commit

Permalink
fix: improved shutdown logic
Browse files Browse the repository at this point in the history
  • Loading branch information
marksalpeter committed Apr 7, 2021
1 parent 2a8d049 commit e04afef
Showing 1 changed file with 13 additions and 6 deletions.
19 changes: 13 additions & 6 deletions process.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ type Processor interface {
// Process processes an input and reurns an output
Process(ctx context.Context, i interface{}) (interface{}, error)

// Cancel is called if process returns an error
// Cancel is called if process returns an error or if the context is canceled
Cancel(i interface{}, err error)
}

Expand All @@ -18,12 +18,19 @@ func Process(ctx context.Context, p Processor, in <-chan interface{}) <-chan int
defer close(out)
// Start processing inputs until in closes
for i := range in {
result, err := p.Process(ctx, i)
if err != nil {
p.Cancel(i, err)
continue
select {
// When the context is canceled, Cancel all inputs
case <-ctx.Done():
p.Cancel(i, ctx.Err())
// Otherwise, Process all inputs
default:
result, err := p.Process(ctx, i)
if err != nil {
p.Cancel(i, err)
continue
}
out <- result
}
out <- result
}
}()
return out
Expand Down

0 comments on commit e04afef

Please sign in to comment.