Skip to content

Commit

Permalink
Add some stream features (zeromicro#712)
Browse files Browse the repository at this point in the history
* Add some stream features

* Update empty

* Fix initialization loop

* Delete ForeachOrdered && Fix FindFirst

* Add test case && Delete redundant code

* Update test case

* Delete SplitSteam

* Delete redundant code
  • Loading branch information
chenquan authored Jun 10, 2021
1 parent 5d86cc2 commit 14141fe
Show file tree
Hide file tree
Showing 4 changed files with 245 additions and 41 deletions.
107 changes: 107 additions & 0 deletions core/fx/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@ type (
}
)

// empty a empty Stream.
var empty Stream

func init() {
// initial empty Stream.
source := make(chan interface{})
close(source)
empty.source = source
}

// Empty Returns a empty stream.
func Empty() Stream {
return empty
}

// From constructs a Stream from the given GenerateFunc.
func From(generate GenerateFunc) Stream {
source := make(chan interface{})
Expand Down Expand Up @@ -79,6 +94,11 @@ func Range(source <-chan interface{}) Stream {
}
}

// Concat Returns a concat Stream.
func Concat(a Stream, others ...Stream) Stream {
return a.Concat(others...)
}

// Buffer buffers the items into a queue with size n.
// It can balance the producer and the consumer if their processing throughput don't match.
func (p Stream) Buffer(n int) Stream {
Expand Down Expand Up @@ -380,6 +400,93 @@ func (p Stream) walkUnlimited(fn WalkFunc, option *rxOptions) Stream {
return Range(pipe)
}

// AnyMach Returns whether any elements of this stream match the provided predicate.
// May not evaluate the predicate on all elements if not necessary for determining the result.
// If the stream is empty then false is returned and the predicate is not evaluated.
func (p Stream) AnyMach(f func(item interface{}) bool) (isFind bool) {
for item := range p.source {
if f(item) {
isFind = true
return
}
}
return
}

// AllMach Returns whether all elements of this stream match the provided predicate.
// May not evaluate the predicate on all elements if not necessary for determining the result.
// If the stream is empty then true is returned and the predicate is not evaluated.
func (p Stream) AllMach(f func(item interface{}) bool) (isFind bool) {
isFind = true
for item := range p.source {
if !f(item) {
isFind = false
return
}
}
return
}

// Concat Returns a Stream that concat others streams
func (p Stream) Concat(others ...Stream) Stream {
source := make(chan interface{})
wg := sync.WaitGroup{}
for _, other := range others {
if p == other {
continue
}
wg.Add(1)
go func(iother Stream) {
for item := range iother.source {
source <- item
}
wg.Done()
}(other)

}

wg.Add(1)
go func() {
for item := range p.source {
source <- item
}
wg.Done()
}()
go func() {
wg.Wait()
close(source)
}()
return Range(source)
}

// Skip Returns a Stream that skips size elements.
func (p Stream) Skip(size int64) Stream {
if size == 0 {
return p
}
if size < 0 {
panic("size must be greater than -1")
}
source := make(chan interface{})

go func() {
i := 0
for item := range p.source {
if i >= int(size) {
source <- item
}
i++
}
close(source)
}()
return Range(source)
}

// Chan Returns a channel of Stream.
func (p Stream) Chan() <-chan interface{} {
return p.source
}

// UnlimitedWorkers lets the caller to use as many workers as the tasks.
func UnlimitedWorkers() Option {
return func(opts *rxOptions) {
Expand Down
143 changes: 137 additions & 6 deletions core/fx/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package fx
import (
"io/ioutil"
"log"
"math/rand"
"reflect"
"runtime"
"sort"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -330,6 +333,31 @@ func TestWalk(t *testing.T) {
assert.Equal(t, 9, result)
}

func BenchmarkParallelMapReduce(b *testing.B) {
b.ReportAllocs()

mapper := func(v interface{}) interface{} {
return v.(int64) * v.(int64)
}
reducer := func(input <-chan interface{}) (interface{}, error) {
var result int64
for v := range input {
result += v.(int64)
}
return result, nil
}
b.ResetTimer()
From(func(input chan<- interface{}) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
input <- int64(rand.Int())
}
})

}).Map(mapper).Reduce(reducer)

}

func BenchmarkMapReduce(b *testing.B) {
b.ReportAllocs()

Expand All @@ -343,12 +371,115 @@ func BenchmarkMapReduce(b *testing.B) {
}
return result, nil
}
b.ResetTimer()
From(func(input chan<- interface{}) {
for i := 0; i < b.N; i++ {
input <- int64(rand.Int())
}
}).Map(mapper).Reduce(reducer)

for i := 0; i < b.N; i++ {
From(func(input chan<- interface{}) {
for j := 0; j < 2; j++ {
input <- int64(j)
}
}).Map(mapper).Reduce(reducer)
}

func equal(t *testing.T, stream Stream, data []interface{}) {
items := make([]interface{}, 0)
for item := range stream.source {
items = append(items, item)
}
if !reflect.DeepEqual(items, data) {
t.Errorf(" %v, want %v", items, data)
}
}
func assetEqual(t *testing.T, except interface{}, data interface{}) {
if !reflect.DeepEqual(except, data) {
t.Errorf(" %v, want %v", data, except)
}

}
func TestStream_AnyMach(t *testing.T) {
assetEqual(t, false, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
return 4 == item.(int)
}))
assetEqual(t, false, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
return 0 == item.(int)
}))
assetEqual(t, true, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
return 2 == item.(int)
}))
assetEqual(t, true, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
return 2 == item.(int)
}))
}
func TestStream_AllMach(t *testing.T) {
assetEqual(
t, true, Just(1, 2, 3).AllMach(func(item interface{}) bool {
return true
}),
)
assetEqual(
t, false, Just(1, 2, 3).AllMach(func(item interface{}) bool {
return false
}),
)
assetEqual(
t, false, Just(1, 2, 3).AllMach(func(item interface{}) bool {
return item.(int) == 1
}),
)
}
func TestEmpty(t *testing.T) {
empty := Empty()
assetEqual(t, len(empty.source), 0)
assetEqual(t, cap(empty.source), 0)
}
func TestConcat(t *testing.T) {
a1 := []interface{}{1, 2, 3}
a2 := []interface{}{4, 5, 6}
s1 := Just(a1...)
s2 := Just(a2...)
stream := Concat(s1, s2)
var items []interface{}
for item := range stream.source {
items = append(items, item)
}
sort.Slice(items, func(i, j int) bool {
return items[i].(int) < items[j].(int)
})
ints := make([]interface{}, 0)
ints = append(ints, a1...)
ints = append(ints, a2...)
assetEqual(t, ints, items)

}
func TestStream_Chan(t *testing.T) {
var items []interface{}

for item := range Just(1, 2, 3).Chan() {
items = append(items, item)
}
assetEqual(t, items, []interface{}{1, 2, 3})
}

func TestStream_Skip(t *testing.T) {
assetEqual(t, 3, Just(1, 2, 3, 4).Skip(1).Count())
assetEqual(t, 1, Just(1, 2, 3, 4).Skip(3).Count())
assetEqual(t, 4, Just(1, 2, 3, 4).Skip(0).Count())
equal(t, Just(1, 2, 3, 4).Skip(3), []interface{}{4})
assert.Panics(t, func() {
Just(1, 2, 3, 4).Skip(-1)
})
}

func TestStream_Concat(t *testing.T) {
stream := Just(1).Concat(Just(2), Just(3))
var items []interface{}
for item := range stream.source {
items = append(items, item)
}
sort.Slice(items, func(i, j int) bool {
return items[i].(int) < items[j].(int)
})
assetEqual(t, []interface{}{1, 2, 3}, items)

just := Just(1)
equal(t, just.Concat(just), []interface{}{1})
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ require (
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 // indirect
github.com/urfave/cli v1.22.5
github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2
github.com/zeromicro/antlr v0.0.1 // indirect
github.com/zeromicro/antlr v0.0.1
go.etcd.io/etcd v0.0.0-20200402134248-51bdeb39e698
go.uber.org/automaxprocs v1.3.0
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
Expand Down
Loading

0 comments on commit 14141fe

Please sign in to comment.