Skip to content

Commit

Permalink
Refine runPar execution
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed Feb 24, 2020
1 parent 2897d22 commit dd61310
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 40 deletions.
4 changes: 2 additions & 2 deletions assert.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ loop:
}
if checkHasItem, value := ass.itemToBeChecked(); checkHasItem {
length := len(got)
if length > 1 {
assert.Fail(t, "wrong number of items", "expected 1, got %d", length)
if length != 1 {
assert.FailNow(t, "wrong number of items", "expected 1, got %d", length)
}
assert.Equal(t, value, got[0])
}
Expand Down
4 changes: 2 additions & 2 deletions item.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ func (i Item) SendBlocking(ch chan<- Item) {
ch <- i
}

// SendWithContext sends an item and blocks until it is sent or a context canceled.
// SendCtx sends an item and blocks until it is sent or a context canceled.
// It returns a boolean to indicate whether the item was sent.
func (i Item) SendWithContext(ctx context.Context, ch chan<- Item) bool {
func (i Item) SendCtx(ctx context.Context, ch chan<- Item) bool {
select {
case <-ctx.Done():
return false
Expand Down
2 changes: 1 addition & 1 deletion item_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func Test_Item_SendContext_True(t *testing.T) {
defer close(ch)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
assert.True(t, Of(5).SendWithContext(ctx, ch))
assert.True(t, Of(5).SendCtx(ctx, ch))
}

func Test_Item_SendNonBlocking(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,14 +238,14 @@ func runPar(ctx context.Context, next chan Item, iterable Iterable, operatorFact
wg := sync.WaitGroup{}
_, pool := option.getPool()
wg.Add(pool)
ctx, cancel := context.WithCancel(ctx)

var gather chan Item
if bypassGather {
gather = next
} else {
gather = make(chan Item, 1)

// Gather
go func() {
op := operatorFactory()
stopped := false
Expand Down Expand Up @@ -274,6 +274,7 @@ func runPar(ctx context.Context, next chan Item, iterable Iterable, operatorFact
}()
}

// Scatter
for i := 0; i < pool; i++ {
go func() {
op := operatorFactory()
Expand Down Expand Up @@ -312,7 +313,6 @@ func runPar(ctx context.Context, next chan Item, iterable Iterable, operatorFact

go func() {
wg.Wait()
cancel()
close(gather)
}()
}
Expand Down
66 changes: 33 additions & 33 deletions observable_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ type allOperator struct {
all bool
}

func (op *allOperator) next(_ context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) {
func (op *allOperator) next(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) {
if !op.predicate(item.V) {
dst <- Of(false)
Of(false).SendCtx(ctx, dst)
op.all = false
operatorOptions.stop()
}
Expand All @@ -40,15 +40,15 @@ func (op *allOperator) err(ctx context.Context, item Item, dst chan<- Item, oper
defaultErrorFuncOperator(ctx, item, dst, operatorOptions)
}

func (op *allOperator) end(_ context.Context, dst chan<- Item) {
func (op *allOperator) end(ctx context.Context, dst chan<- Item) {
if op.all {
dst <- Of(true)
Of(true).SendCtx(ctx, dst)
}
}

func (op *allOperator) gatherNext(_ context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) {
func (op *allOperator) gatherNext(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) {
if item.V == false {
dst <- Of(false)
Of(false).SendCtx(ctx, dst)
op.all = false
operatorOptions.stop()
}
Expand All @@ -66,10 +66,10 @@ type averageFloat32Operator struct {
count float32
}

func (op *averageFloat32Operator) next(_ context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) {
func (op *averageFloat32Operator) next(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) {
switch v := item.V.(type) {
default:
dst <- Error(IllegalInputError{error: fmt.Sprintf("expected type: float or int, got: %t", item)})
Error(IllegalInputError{error: fmt.Sprintf("expected type: float or int, got: %t", item)}).SendCtx(ctx, dst)
operatorOptions.stop()
case int:
op.sum += float32(v)
Expand All @@ -87,11 +87,11 @@ func (op *averageFloat32Operator) err(ctx context.Context, item Item, dst chan<-
defaultErrorFuncOperator(ctx, item, dst, operatorOptions)
}

func (op *averageFloat32Operator) end(_ context.Context, dst chan<- Item) {
func (op *averageFloat32Operator) end(ctx context.Context, dst chan<- Item) {
if op.count == 0 {
dst <- Of(0)
Of(0).SendCtx(ctx, dst)
} else {
dst <- Of(op.sum / op.count)
Of(op.sum/op.count).SendCtx(ctx, dst)
}
}

Expand Down Expand Up @@ -462,12 +462,12 @@ func (o *ObservableImpl) BufferWithTime(timespan Duration, opts ...Option) Obser
case item, ok := <-observe:
if !ok {
if len(buffer) != 0 {
Of(buffer).SendWithContext(ctx, next)
Of(buffer).SendCtx(ctx, next)
}
return
}
if item.Error() {
item.SendWithContext(ctx, next)
item.SendCtx(ctx, next)
if option.getErrorStrategy() == Stop {
return
}
Expand All @@ -476,7 +476,7 @@ func (o *ObservableImpl) BufferWithTime(timespan Duration, opts ...Option) Obser
}
case <-time.After(timespan.duration()):
if len(buffer) != 0 {
if !Of(buffer).SendWithContext(ctx, next) {
if !Of(buffer).SendCtx(ctx, next) {
return
}
buffer = make([]interface{}, 0)
Expand Down Expand Up @@ -509,27 +509,27 @@ func (o *ObservableImpl) BufferWithTimeOrCount(timespan Duration, count int, opt
case item, ok := <-observe:
if !ok {
if len(buffer) != 0 {
Of(buffer).SendWithContext(ctx, next)
Of(buffer).SendCtx(ctx, next)
}
return
}
if item.Error() {
item.SendWithContext(ctx, next)
item.SendCtx(ctx, next)
if option.getErrorStrategy() == Stop {
return
}
} else {
buffer = append(buffer, item.V)
if len(buffer) == count {
if !Of(buffer).SendWithContext(ctx, next) {
if !Of(buffer).SendCtx(ctx, next) {
return
}
buffer = make([]interface{}, 0)
}
}
case <-time.After(timespan.duration()):
if len(buffer) != 0 {
if !Of(buffer).SendWithContext(ctx, next) {
if !Of(buffer).SendCtx(ctx, next) {
return
}
buffer = make([]interface{}, 0)
Expand Down Expand Up @@ -625,7 +625,7 @@ func (o *ObservableImpl) Debounce(timespan Duration, opts ...Option) Observable
return
}
if item.Error() {
if !item.SendWithContext(ctx, next) {
if !item.SendCtx(ctx, next) {
return
}
if option.getErrorStrategy() == Stop {
Expand All @@ -636,7 +636,7 @@ func (o *ObservableImpl) Debounce(timespan Duration, opts ...Option) Observable
}
case <-time.After(timespan.duration()):
if latest != nil {
if !Of(latest).SendWithContext(ctx, next) {
if !Of(latest).SendCtx(ctx, next) {
return
}
latest = nil
Expand Down Expand Up @@ -1045,12 +1045,12 @@ func (o *ObservableImpl) FlatMap(apply ItemToObservable, opts ...Option) Observa
break loop2
}
if item.Error() {
item.SendWithContext(ctx, next)
item.SendCtx(ctx, next)
if option.getErrorStrategy() == Stop {
return
}
} else {
if !item.SendWithContext(ctx, next) {
if !item.SendCtx(ctx, next) {
return
}
}
Expand Down Expand Up @@ -2319,15 +2319,15 @@ func (o *ObservableImpl) TimeInterval(opts ...Option) Observable {
return
}
if item.Error() {
if !item.SendWithContext(ctx, next) {
if !item.SendCtx(ctx, next) {
return
}
if option.getErrorStrategy() == Stop {
return
}
} else {
now := time.Now().UTC()
if !Of(now.Sub(latest)).SendWithContext(ctx, next) {
if !Of(now.Sub(latest)).SendCtx(ctx, next) {
return
}
latest = now
Expand Down Expand Up @@ -2582,7 +2582,7 @@ func (o *ObservableImpl) WindowWithTime(timespan Duration, opts ...Option) Obser
observe := o.Observe(opts...)
ch := option.buildChannel()
empty := true
if !Of(FromChannel(ch)).SendWithContext(ctx, next) {
if !Of(FromChannel(ch)).SendCtx(ctx, next) {
return
}

Expand All @@ -2597,15 +2597,15 @@ func (o *ObservableImpl) WindowWithTime(timespan Duration, opts ...Option) Obser
return
}
if item.Error() {
if !item.SendWithContext(ctx, ch) {
if !item.SendCtx(ctx, ch) {
return
}
if option.getErrorStrategy() == Stop {
close(ch)
return
}
}
if !item.SendWithContext(ctx, ch) {
if !item.SendCtx(ctx, ch) {
return
}
empty = false
Expand All @@ -2616,7 +2616,7 @@ func (o *ObservableImpl) WindowWithTime(timespan Duration, opts ...Option) Obser
close(ch)
ch = option.buildChannel()
empty = true
if !Of(FromChannel(ch)).SendWithContext(ctx, next) {
if !Of(FromChannel(ch)).SendCtx(ctx, next) {
return
}
}
Expand All @@ -2641,7 +2641,7 @@ func (o *ObservableImpl) WindowWithTimeOrCount(timespan Duration, count int, opt
observe := o.Observe(opts...)
ch := option.buildChannel()
iCount := 0
if !Of(FromChannel(ch)).SendWithContext(ctx, next) {
if !Of(FromChannel(ch)).SendCtx(ctx, next) {
return
}

Expand All @@ -2656,23 +2656,23 @@ func (o *ObservableImpl) WindowWithTimeOrCount(timespan Duration, count int, opt
return
}
if item.Error() {
if !item.SendWithContext(ctx, ch) {
if !item.SendCtx(ctx, ch) {
return
}
if option.getErrorStrategy() == Stop {
close(ch)
return
}
}
if !item.SendWithContext(ctx, ch) {
if !item.SendCtx(ctx, ch) {
return
}
iCount++
if iCount == count {
close(ch)
ch = option.buildChannel()
iCount = 0
if !Of(FromChannel(ch)).SendWithContext(ctx, next) {
if !Of(FromChannel(ch)).SendCtx(ctx, next) {
return
}
}
Expand All @@ -2683,7 +2683,7 @@ func (o *ObservableImpl) WindowWithTimeOrCount(timespan Duration, count int, opt
close(ch)
ch = option.buildChannel()
iCount = 0
if !Of(FromChannel(ch)).SendWithContext(ctx, next) {
if !Of(FromChannel(ch)).SendCtx(ctx, next) {
return
}
}
Expand Down

0 comments on commit dd61310

Please sign in to comment.