Skip to content

Commit 1f96c87

Browse files
committed
implememented Distinct
1 parent 35109fe commit 1f96c87

File tree

4 files changed

+62
-26
lines changed

4 files changed

+62
-26
lines changed

streams/global.go

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -112,26 +112,6 @@ func NewCollectionFromMap(m interface{}) (ICollection, error) {
112112
}).init(), nil
113113
}
114114

115-
//func NewCollectionFromMap(m interface{}) (ICollection, error) {
116-
// // Ideally, a collection implementation from a Map would have been defined that knows how to iterate over a K,V set to avoid iterating over the full map initially generate
117-
// // an array of `*KeyValuePair`. However, there is not way to iterate over a K,V set of a map through reflection, instead the only thing available is the function `MapKeys`
118-
// // which
119-
// val := reflect.ValueOf(m)
120-
//
121-
// if val.Kind() != reflect.Map {
122-
// return nil, fmt.Errorf("unable to create a key value set collection, the input value must be a map, %s", val.Kind().String())
123-
// }
124-
//
125-
// var array []*KeyValuePair
126-
// for _, key := range val.MapKeys() {
127-
// array = append(array, &KeyValuePair{
128-
// Key: key.Interface(),
129-
// Value: val.MapIndex(key),
130-
// })
131-
// }
132-
//
133-
// return NewCollectionFromArray(array)
134-
//}
135115
// NewArrayCollection Creates a new empty array collection of the given type
136116
//
137117
// - elementType: The element type for the items in the collection to be created.

streams/stream.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ type IStream interface {
3535
// best combine it with a `SortBy`. Only needs to be provided once per stream.
3636
Map(f ConvertFunc, threads ...int) IStream
3737

38+
// Distinct Returns a stream consisting of the distinct elements
39+
Distinct() IStream
40+
3841
// First Returns the first element of the resulting stream.
3942
// Returns nil (or default value if provided) if the resulting stream is empty.
4043
First(defaultValue ...interface{}) interface{}

streams/stream_impl.go

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ type Stream struct {
1313
iterable IIterable
1414
filters []ConditionalFunc
1515
sorts []sortFunc
16+
distinct bool
1617
threads int
1718
}
1819

@@ -26,6 +27,19 @@ type sorter struct {
2627
sorts []sortFunc
2728
}
2829

30+
type iAdd interface {
31+
Add(item interface{}) error
32+
}
33+
34+
type mapAdd struct {
35+
m reflect.Value
36+
}
37+
38+
func (m *mapAdd) Add(item interface{}) error {
39+
m.m.SetMapIndex(reflect.ValueOf(item), reflect.ValueOf(true))
40+
return nil
41+
}
42+
2943
// SetThreads Sets the amount of go channels to be used for parallel filtering to a maximum of the available CPUs in the
3044
// host machine. Providing a value <= 0, indicates the maximum amount of available CPUs will be the number that determines
3145
// the amount of go channels to be used. If order matters, best combine it with a `SortBy`. Only needs to be provided once
@@ -91,6 +105,12 @@ func (s *Stream) Map(f ConvertFunc, threads ...int) IStream {
91105
return FromIterable(col)
92106
}
93107

108+
// Distinct Returns a stream consisting of the distinct elements
109+
func (s *Stream) Distinct() IStream {
110+
s.distinct = true
111+
return s
112+
}
113+
94114
// First Returns the first element of the resulting stream.
95115
// Returns nil (or default value if provided) if the resulting stream is empty.
96116
//
@@ -347,17 +367,25 @@ func (s *Stream) parallelProcess(threads int) IIterable {
347367
}
348368

349369
func (s *Stream) filter(iterable IIterable) IIterable {
350-
return s.filterHandler(iterable, 0, iterable.Len())
370+
return s.iterHandler(iterable, 0, iterable.Len())
351371
}
352372

353-
func (s *Stream) filterHandler(iterable IIterable, start, end int) IIterable {
354-
if len(s.filters) == 0 {
373+
func (s *Stream) iterHandler(iterable IIterable, start, end int) IIterable {
374+
if len(s.filters) == 0 && !s.distinct {
355375
return iterable
356376
}
357377

378+
var adder iAdd
379+
358380
ret := NewArrayCollection(iterable.ElementType())
359381
iterator := iterable.Iterator().Skip(start)
360382
i := start
383+
adder = ret
384+
385+
if s.distinct {
386+
mType := reflect.MapOf(iterable.ElementType(), reflect.TypeOf(true))
387+
adder = &mapAdd{m: reflect.MakeMap(mType)}
388+
}
361389

362390
for x := iterator.Current(); iterator.HasNext() && i < end; x = iterator.Next() {
363391
i++
@@ -372,7 +400,14 @@ func (s *Stream) filterHandler(iterable IIterable, start, end int) IIterable {
372400
}
373401

374402
if match {
375-
ret.Add(x)
403+
_ = adder.Add(x)
404+
}
405+
}
406+
407+
if s.distinct {
408+
mAdder := adder.(*mapAdd)
409+
for _, v := range mAdder.m.MapKeys() {
410+
_ = ret.Add(v.Interface())
376411
}
377412
}
378413

@@ -381,7 +416,7 @@ func (s *Stream) filterHandler(iterable IIterable, start, end int) IIterable {
381416

382417
func (s *Stream) parallelProcessHandler(iterable IIterable, threads int) IIterable {
383418
worker := func(result chan IIterable, start, end int) {
384-
result <- s.filterHandler(iterable, start, end)
419+
result <- s.iterHandler(iterable, start, end)
385420
}
386421

387422
ret := NewArrayCollection(iterable.ElementType())
@@ -482,7 +517,6 @@ func getCores(threads ...int) int {
482517
// TODO:
483518
//
484519
// STREAM
485-
// Distinct
486520
// Reverse
487521

488522
// OPTIONAL ?? or element

streams/stream_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,3 +259,22 @@ func TestStream_MapCollection(t *testing.T) {
259259
jsonResult, _ := json.MarshalIndent(result, "", " ")
260260
assert.JSONEq(t, `[{ "Key": "e", "Value": "111" }, { "Key": "a", "Value": "123" }, { "Key": "b", "Value": "456" }, { "Key": "c", "Value": "789" }]`, string(jsonResult))
261261
}
262+
263+
func TestStream_Distinct(t *testing.T) {
264+
arr := append(testArray, "apple", "banana", "kiwi", "apple", "banana", "apple")
265+
result := From(arr).Distinct().ToArray().([]string)
266+
267+
assert.Equal(t, len(result), len(testArray))
268+
}
269+
270+
func TestStream_DistinctWithSort(t *testing.T) {
271+
arr := append(testArray, "apple", "banana", "kiwi", "apple", "banana", "apple")
272+
sortFn := func(a interface{}, b interface{}) int {
273+
return strings.Compare(a.(string), b.(string))
274+
}
275+
276+
expected := []string{"apple", "banana", "kiwi", "orange", "peach", "pear", "pineapple", "plum"}
277+
sorted := From(arr).OrderBy(sortFn).Distinct().ToArray().([]string)
278+
279+
assert.Equal(t, expected, sorted)
280+
}

0 commit comments

Comments
 (0)