Skip to content

Commit

Permalink
Merge pull request #19 from azihsoyn/feature/stream-refactoring
Browse files Browse the repository at this point in the history
refactoring stream
  • Loading branch information
azihsoyn authored Nov 16, 2016
2 parents 1100b7c + 3fe98cd commit f69e3f7
Show file tree
Hide file tree
Showing 11 changed files with 262 additions and 247 deletions.
73 changes: 25 additions & 48 deletions distinct.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package gollection

import (
"fmt"
"reflect"
)
import "reflect"

func (g *gollection) Distinct() *gollection {
if g.err != nil {
Expand All @@ -18,7 +15,7 @@ func (g *gollection) Distinct() *gollection {

}

func (g *gollection) DistinctBy(f interface{}) *gollection {
func (g *gollection) DistinctBy(f /*func(v <T1>) <T2>*/ interface{}) *gollection {
if g.err != nil {
return &gollection{err: g.err}
}
Expand All @@ -31,23 +28,18 @@ func (g *gollection) DistinctBy(f interface{}) *gollection {
}

func (g *gollection) distinct() *gollection {
sv := reflect.ValueOf(g.slice)
if sv.Kind() != reflect.Slice {
return &gollection{
slice: nil,
err: fmt.Errorf("gollection.Distinct called with non-slice value of type %T", g.slice),
}
sv, err := g.validateSlice("Distinct")
if err != nil {
return &gollection{err: err}
}

ret := reflect.MakeSlice(sv.Type(), 0, sv.Len())
m := make(map[interface{}]bool)

for i := 0; i < sv.Len(); i++ {
v := sv.Index(i)
id := v.Interface()
if _, ok := m[id]; !ok {
if processDistinct(v.Interface(), m) {
ret = reflect.Append(ret, v)
m[id] = true
}
}

Expand All @@ -61,9 +53,9 @@ func (g *gollection) distinctStream() *gollection {
ch: make(chan interface{}),
}

var initialized bool
m := make(map[interface{}]bool)
go func() {
var initialized bool
m := make(map[interface{}]bool)
for {
select {
case v, ok := <-g.ch:
Expand All @@ -75,9 +67,8 @@ func (g *gollection) distinctStream() *gollection {
continue
}

if _, ok := m[v]; !ok {
if processDistinct(v, m) {
next.ch <- v
m[v] = true
}
} else {
close(next.ch)
Expand All @@ -92,21 +83,14 @@ func (g *gollection) distinctStream() *gollection {
}

func (g *gollection) distinctBy(f interface{}) *gollection {
sv := reflect.ValueOf(g.slice)
if sv.Kind() != reflect.Slice {
return &gollection{
slice: nil,
err: fmt.Errorf("gollection.DistinctBy called with non-slice value of type %T", g.slice),
}
sv, err := g.validateSlice("DistinctBy")
if err != nil {
return &gollection{err: err}
}

funcValue := reflect.ValueOf(f)
funcType := funcValue.Type()
if funcType.Kind() != reflect.Func || funcType.NumIn() != 1 || funcType.NumOut() != 1 {
return &gollection{
slice: nil,
err: fmt.Errorf("gollection.DistinctBy called with invalid func. required func(in <T>) out <T> but supplied %v", g.slice),
}
funcValue, funcType, err := g.validateDistinctByFunc(f)
if err != nil {
return &gollection{err: err}
}

resultSliceType := reflect.SliceOf(funcType.In(0))
Expand All @@ -115,10 +99,8 @@ func (g *gollection) distinctBy(f interface{}) *gollection {

for i := 0; i < sv.Len(); i++ {
v := sv.Index(i)
id := funcValue.Call([]reflect.Value{v})[0].Interface()
if _, ok := m[id]; !ok {
if processDistinctBy(funcValue, v, m) {
ret = reflect.Append(ret, v)
m[id] = true
}
}

Expand All @@ -132,18 +114,14 @@ func (g *gollection) distinctByStream(f interface{}) *gollection {
ch: make(chan interface{}),
}

funcValue := reflect.ValueOf(f)
funcType := funcValue.Type()
if funcType.Kind() != reflect.Func || funcType.NumIn() != 1 || funcType.NumOut() != 1 {
return &gollection{
slice: nil,
err: fmt.Errorf("gollection.DistinctBy called with invalid func. required func(in <T>) out <T> but supplied %v", g.slice),
}
fv, _, err := g.validateDistinctByFunc(f)
if err != nil {
return &gollection{err: err}
}

var initialized bool
m := make(map[interface{}]bool)
go func() {
go func(fv *reflect.Value) {
var initialized bool
m := make(map[interface{}]bool)
for {
select {
case v, ok := <-g.ch:
Expand All @@ -155,10 +133,8 @@ func (g *gollection) distinctByStream(f interface{}) *gollection {
continue
}

id := funcValue.Call([]reflect.Value{reflect.ValueOf(v)})[0].Interface()
if _, ok := m[id]; !ok {
if processDistinctBy(*fv, reflect.ValueOf(v), m) {
next.ch <- v
m[id] = true
}
} else {
close(next.ch)
Expand All @@ -168,6 +144,7 @@ func (g *gollection) distinctByStream(f interface{}) *gollection {
continue
}
}
}()
}(&fv)

return next
}
37 changes: 13 additions & 24 deletions filter.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
package gollection

import (
"fmt"
"reflect"
)
import "reflect"

func (g *gollection) Filter(f interface{}) *gollection {
func (g *gollection) Filter(f /* func(v <T>) bool */ interface{}) *gollection {
if g.err != nil {
return &gollection{err: g.err}
}
Expand All @@ -17,29 +14,22 @@ func (g *gollection) Filter(f interface{}) *gollection {
}

func (g *gollection) filter(f interface{}) *gollection {
sv := reflect.ValueOf(g.slice)
if sv.Kind() != reflect.Slice {
return &gollection{
slice: nil,
err: fmt.Errorf("gollection.Filter called with non-slice value of type %T", g.slice),
}
sv, err := g.validateSlice("Filter")
if err != nil {
return &gollection{err: err}
}

funcValue := reflect.ValueOf(f)
funcType := funcValue.Type()
if funcType.Kind() != reflect.Func || funcType.NumIn() != 1 || funcType.NumOut() != 1 || funcType.Out(0).Kind() != reflect.Bool {
return &gollection{
slice: nil,
err: fmt.Errorf("gollection.Filter called with invalid func. required func(in <T>) bool but supplied %v", g.slice),
}
funcValue, funcType, err := g.validateFilterFunc(f)
if err != nil {
return &gollection{err: err}
}

resultSliceType := reflect.SliceOf(funcType.In(0))
ret := reflect.MakeSlice(resultSliceType, 0, sv.Len())

for i := 0; i < sv.Len(); i++ {
v := sv.Index(i)
if funcValue.Call([]reflect.Value{v})[0].Interface().(bool) {
if processFilter(funcValue, v) {
ret = reflect.Append(ret, v)
}
}
Expand All @@ -54,11 +44,10 @@ func (g *gollection) filterStream(f interface{}) *gollection {
ch: make(chan interface{}),
}

funcValue := reflect.ValueOf(f)
funcType := funcValue.Type()
if funcType.Kind() != reflect.Func || funcType.NumIn() != 1 || funcType.NumOut() != 1 || funcType.Out(0).Kind() != reflect.Bool {
funcValue, funcType, err := g.validateFilterFunc(f)
if err != nil {
return &gollection{
err: fmt.Errorf("gollection.Filter called with invalid func. required func(in <T>) bool but supplied %v", f),
err: err,
}
}

Expand All @@ -75,7 +64,7 @@ func (g *gollection) filterStream(f interface{}) *gollection {
continue
}

if funcValue.Call([]reflect.Value{reflect.ValueOf(v)})[0].Interface().(bool) {
if processFilter(funcValue, reflect.ValueOf(v)) {
next.ch <- v
}
} else {
Expand Down
43 changes: 14 additions & 29 deletions flat_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"reflect"
)

func (g *gollection) FlatMap(f interface{}) *gollection {
func (g *gollection) FlatMap(f /*func(v <T1>) <T2> */ interface{}) *gollection {
if g.err != nil {
return &gollection{err: g.err}
}
Expand All @@ -18,29 +18,18 @@ func (g *gollection) FlatMap(f interface{}) *gollection {
}

func (g *gollection) flatMap(f interface{}) *gollection {
sv := reflect.ValueOf(g.slice)
if sv.Kind() != reflect.Slice {
return &gollection{
slice: nil,
err: fmt.Errorf("gollection.FlatMap called with non-slice value of type %T", g.slice),
}
sv, err := g.validateSlice("FlatMap")
if err != nil {
return &gollection{err: err}
}

currentType := reflect.TypeOf(g.slice).Elem()
if currentType.Kind() != reflect.Slice {
return &gollection{
slice: nil,
err: fmt.Errorf("gollection.FlatMap called with non-slice-of-slice value of type %T", g.slice),
}
if _, err := g.validateSliceOfSlice("FlatMap"); err != nil {
return &gollection{err: err}
}

funcValue := reflect.ValueOf(f)
funcType := funcValue.Type()
if funcType.Kind() != reflect.Func || funcType.NumIn() != 1 || funcType.NumOut() != 1 {
return &gollection{
slice: nil,
err: fmt.Errorf("gollection.FlatMap called with invalid func. required func(in <T>) out <T> but supplied %v", g.slice),
}
funcValue, funcType, err := g.validateFlatMapFunc(f)
if err != nil {
return &gollection{err: err}
}

resultSliceType := reflect.SliceOf(funcType.Out(0))
Expand All @@ -58,7 +47,7 @@ func (g *gollection) flatMap(f interface{}) *gollection {
v := sv.Index(i).Interface()
svv := reflect.ValueOf(v)
for j := 0; j < svv.Len(); j++ {
v := funcValue.Call([]reflect.Value{svv.Index(j)})[0]
v := processMapFunc(funcValue, svv.Index(j))
ret = reflect.Append(ret, v)
}
}
Expand All @@ -74,13 +63,9 @@ func (g *gollection) flatMapStream(f interface{}) *gollection {
ch: make(chan interface{}),
}

funcValue := reflect.ValueOf(f)
funcType := funcValue.Type()
if funcType.Kind() != reflect.Func || funcType.NumIn() != 1 || funcType.NumOut() != 1 {
return &gollection{
slice: nil,
err: fmt.Errorf("gollection.FlatMap called with invalid func. required func(in <T>) out <T> but supplied %v", g.slice),
}
funcValue, funcType, err := g.validateFlatMapFunc(f)
if err != nil {
return &gollection{err: err}
}

var initialized bool
Expand All @@ -102,7 +87,7 @@ func (g *gollection) flatMapStream(f interface{}) *gollection {

svv := reflect.ValueOf(v)
for j := 0; j < svv.Len(); j++ {
v := funcValue.Call([]reflect.Value{svv.Index(j)})[0]
v := processMapFunc(funcValue, svv.Index(j))
next.ch <- v.Interface()
}
} else {
Expand Down
18 changes: 6 additions & 12 deletions flatten.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,14 @@ func (g *gollection) Flatten() *gollection {
}

func (g *gollection) flatten() *gollection {
sv := reflect.ValueOf(g.slice)
if sv.Kind() != reflect.Slice {
return &gollection{
slice: nil,
err: fmt.Errorf("gollection.Flatten called with non-slice value of type %T", g.slice),
}
sv, err := g.validateSlice("Flatten")
if err != nil {
return &gollection{err: err}
}

currentType := reflect.TypeOf(g.slice).Elem()
if currentType.Kind() != reflect.Slice {
return &gollection{
slice: nil,
err: fmt.Errorf("gollection.Flatten called with non-slice-of-slice value of type %T", g.slice),
}
currentType, err := g.validateSliceOfSlice("Flatten")
if err != nil {
return &gollection{err: err}
}

// init
Expand Down
Loading

0 comments on commit f69e3f7

Please sign in to comment.