Skip to content

Commit

Permalink
feat: Merge optimized for N = 0 and N = 1
Browse files Browse the repository at this point in the history
  • Loading branch information
marksalpeter committed May 20, 2021
1 parent 9519bb9 commit fd3384c
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
10 changes: 9 additions & 1 deletion merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ import "sync"

// Merge fans multiple channels in to a single channel
func Merge(ins ...<-chan interface{}) <-chan interface{} {
// Don't merge anything if we don't have to
if l := len(ins); l == 0 {
out := make(chan interface{})
close(out)
return out
} else if l == 1 {
return ins[0]
}
out := make(chan interface{})
// Create a WaitGroup that waits for all of the ins to close
var wg sync.WaitGroup
Expand All @@ -22,7 +30,7 @@ func Merge(ins ...<-chan interface{}) <-chan interface{} {
out <- i
}
}
// Tell the WaitGroup that one of the errChans is closed
// Tell the WaitGroup that one of the channels is closed
wg.Done()
}(ins[i])
}
Expand Down
17 changes: 17 additions & 0 deletions merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,23 @@ func TestMerge(t *testing.T) {
waitFor: 500 * time.Millisecond,
errorCount: 1,
}},
}, {
description: "Single channel passes through",
expectedErrors: []error{
errors.New("[task a] error 0"),
errors.New("[task a] error 1"),
errors.New("[task a] error 2"),
},
tasks: []task{{
id: "a",
waitFor: 0,
// We shoud expect to 'never' receive this error, because it will emit after the maxTestDuration
errorCount: 3,
}},
}, {
description: "Closed channel returned",
expectedErrors: []error{},
tasks: []task{},
}} {
t.Run(test.description, func(t *testing.T) {
// Start doing all of the tasks
Expand Down

0 comments on commit fd3384c

Please sign in to comment.