Skip to content

Commit

Permalink
WindowWithTimeOrCount operator
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed Feb 23, 2020
1 parent 630a692 commit 07714de
Show file tree
Hide file tree
Showing 6 changed files with 229 additions and 171 deletions.
50 changes: 24 additions & 26 deletions duration.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,23 @@ type duration struct {
d time.Duration
}

type testDuration struct {
fs []func()
func (d *duration) duration() time.Duration {
return d.d
}

// WithDuration is a duration option
func WithDuration(d time.Duration) Duration {
return &duration{
d: d,
}
}

var tick = struct{}{}

type causalityDuration struct {
fs []func()
}

func timeCausality(elems ...interface{}) (context.Context, Observable, Duration) {
ch := make(chan Item, 1)
fs := make([]func(), len(elems)+1)
Expand All @@ -35,27 +46,25 @@ func timeCausality(elems ...interface{}) (context.Context, Observable, Duration)
if elem == tick {
fs[i] = func() {}
} else {
fs[i] = func() {
ch <- Of(elem)
switch elem := elem.(type) {
default:
fs[i] = func() {
ch <- Of(elem)
}
case error:
fs[i] = func() {
ch <- Error(elem)
}
}
}
}
fs[len(elems)] = func() {
cancel()
}
return ctx, FromChannel(ch), &testDuration{fs: fs}
}

func (d *testDuration) append(fs ...func()) {
if d.fs == nil {
d.fs = make([]func(), 0)
}
for _, f := range fs {
d.fs = append(d.fs, f)
}
return ctx, FromChannel(ch), &causalityDuration{fs: fs}
}

func (d *testDuration) duration() time.Duration {
func (d *causalityDuration) duration() time.Duration {
d.fs[0]()
d.fs = d.fs[1:]
return 0
Expand All @@ -69,14 +78,3 @@ func (m *mockDuration) duration() time.Duration {
args := m.Called()
return args.Get(0).(time.Duration)
}

func (d *duration) duration() time.Duration {
return d.d
}

// WithDuration is a duration option
func WithDuration(d time.Duration) Duration {
return &duration{
d: d,
}
}
4 changes: 2 additions & 2 deletions item.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ func (i Item) SendBlocking(ch chan<- Item) {
ch <- i
}

// SendContext sends an item and blocks until it is sent or a context canceled.
// SendWithContext sends an item and blocks until it is sent or a context canceled.
// It returns a boolean to indicate whether the item was sent.
func (i Item) SendContext(ctx context.Context, ch chan<- Item) bool {
func (i Item) SendWithContext(ctx context.Context, ch chan<- Item) bool {
select {
case <-ctx.Done():
return false
Expand Down
2 changes: 1 addition & 1 deletion item_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func Test_Item_SendContext_True(t *testing.T) {
defer close(ch)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
assert.True(t, Of(5).SendContext(ctx, ch))
assert.True(t, Of(5).SendWithContext(ctx, ch))
}

func Test_Item_SendNonBlocking(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type Observable interface {
Unmarshal(unmarshaller Unmarshaller, factory func() interface{}, opts ...Option) Observable
WindowWithCount(count int, opts ...Option) Observable
WindowWithTime(timespan Duration, opts ...Option) Observable
WindowWithTimeOrCount(timespan Duration, count int, opts ...Option) Observable
ZipFromIterable(iterable Iterable, zipper Func2, opts ...Option) Observable
}

Expand Down
191 changes: 141 additions & 50 deletions observable_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,9 +481,8 @@ func (o *ObservableImpl) BufferWithTime(timespan Duration, opts ...Option) Obser
if item.Error() {
next <- item
return
} else {
buffer = append(buffer, item.V)
}
buffer = append(buffer, item.V)
case <-time.After(timespan.duration()):
if len(buffer) != 0 {
select {
Expand Down Expand Up @@ -542,16 +541,15 @@ func (o *ObservableImpl) BufferWithTimeOrCount(timespan Duration, count int, opt
if item.Error() {
next <- item
return
} else {
buffer = append(buffer, item.V)
if len(buffer) == count {
select {
case <-ctx.Done():
return
case next <- Of(buffer):
}
buffer = make([]interface{}, 0)
}
buffer = append(buffer, item.V)
if len(buffer) == count {
select {
case <-ctx.Done():
return
case next <- Of(buffer):
}
buffer = make([]interface{}, 0)
}
case <-time.After(timespan.duration()):
if len(buffer) != 0 {
Expand Down Expand Up @@ -2501,65 +2499,158 @@ func (op *windowWithCountOperator) end(_ context.Context, _ chan<- Item) {
func (op *windowWithCountOperator) gatherNext(_ context.Context, _ Item, _ chan<- Item, _ operatorOptions) {
}

// WindowWithTime periodically subdivides items from an Observable into Observables based on timed windows
// and emit them rather than emitting the items one at a time.
func (o *ObservableImpl) WindowWithTime(timespan Duration, opts ...Option) Observable {
if timespan == nil {
return Thrown(IllegalInputError{error: "timespan must no be nil"})
}

//option := parseOptions(opts...)
f := func(ctx context.Context, next chan Item, option Option, opts ...Option) {
defer close(next)
observe := o.Observe(opts...)
ch := option.buildChannel()
empty := true
if !Of(FromChannel(ch)).SendWithContext(ctx, next) {
return
}

for {
select {
case <-ctx.Done():
close(ch)
return
case item, ok := <-observe:
if !ok {
close(ch)
return
}
if item.Error() {
if !item.SendWithContext(ctx, ch) {
return
}
if option.getErrorStrategy() == Stop {
close(ch)
return
}
}
if !item.SendWithContext(ctx, ch) {
return
}
empty = false
case <-time.After(timespan.duration()):
if empty {
continue
}
close(ch)
ch = option.buildChannel()
empty = true
if !Of(FromChannel(ch)).SendWithContext(ctx, next) {
return
}
}
}
}

option := parseOptions(opts...)

if option.isEagerObservation() {
next := option.buildChannel()
ctx := option.buildContext()
go f(ctx, next, option, opts...)
return &ObservableImpl{iterable: newChannelIterable(next)}
}

// TODO Handle eager observation
return &ObservableImpl{
iterable: newFactoryIterable(func(propagatedOptions ...Option) <-chan Item {
mergedOptions := append(opts, propagatedOptions...)
option := parseOptions(mergedOptions...)
next := option.buildChannel()
ctx := option.buildContext()
go f(ctx, next, option, mergedOptions...)
return next
}),
}
}

go func() {
defer close(next)
observe := o.Observe(mergedOptions...)
ch := option.buildChannel()
empty := true
select {
case <-ctx.Done():
func (o *ObservableImpl) WindowWithTimeOrCount(timespan Duration, count int, opts ...Option) Observable {
if timespan == nil {
return Thrown(IllegalInputError{error: "timespan must no be nil"})
}
if count < 0 {
return Thrown(IllegalInputError{error: "count must be positive or nil"})
}

f := func(ctx context.Context, next chan Item, option Option, opts ...Option) {
defer close(next)
observe := o.Observe(opts...)
ch := option.buildChannel()
iCount := 0
if !Of(FromChannel(ch)).SendWithContext(ctx, next) {
return
}

for {
select {
case <-ctx.Done():
close(ch)
return
case item, ok := <-observe:
if !ok {
close(ch)
return
case next <- Of(FromChannel(ch)):
}

for {
select {
case <-ctx.Done():
if item.Error() {
if !item.SendWithContext(ctx, ch) {
return
case item, ok := <-observe:
if !ok {
close(ch)
return
}
if item.Error() {
ch <- item
close(ch)
return
} else {
ch <- item
empty = false
}
case <-time.After(timespan.duration()):
if empty {
continue
}
}
if option.getErrorStrategy() == Stop {
close(ch)
ch = option.buildChannel()
empty = true
select {
case <-ctx.Done():
return
case next <- Of(FromChannel(ch)):
}
return
}
}
}()
if !item.SendWithContext(ctx, ch) {
return
}
iCount++
if iCount == count {
close(ch)
ch = option.buildChannel()
iCount = 0
if !Of(FromChannel(ch)).SendWithContext(ctx, next) {
return
}
}
case <-time.After(timespan.duration()):
if iCount == 0 {
continue
}
close(ch)
ch = option.buildChannel()
iCount = 0
if !Of(FromChannel(ch)).SendWithContext(ctx, next) {
return
}
}
}
}

option := parseOptions(opts...)

if option.isEagerObservation() {
next := option.buildChannel()
ctx := option.buildContext()
go f(ctx, next, option, opts...)
return &ObservableImpl{iterable: newChannelIterable(next)}
}

return &ObservableImpl{
iterable: newFactoryIterable(func(propagatedOptions ...Option) <-chan Item {
mergedOptions := append(opts, propagatedOptions...)
option := parseOptions(mergedOptions...)
next := option.buildChannel()
ctx := option.buildContext()
go f(ctx, next, option, mergedOptions...)
return next
}),
}
Expand Down
Loading

0 comments on commit 07714de

Please sign in to comment.