-
Notifications
You must be signed in to change notification settings - Fork 336
/
flatmap_test.go
111 lines (90 loc) · 3.12 KB
/
flatmap_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package rxgo
import (
"testing"
"github.com/stretchr/testify/mock"
)
func TestFlatMapCompletesWhenSequenceIsEmpty(t *testing.T) {
// given
emissionObserver := NewObserverMock()
// and empty sequence
sequence := Empty()
// and flattens the sequence with identity
sequence = sequence.FlatMap(identity, 1)
// when subscribes to the sequence
sequence.Subscribe(emissionObserver.Capture()).Block()
// then completes without any emission
emissionObserver.AssertNotCalled(t, "OnNext", mock.Anything)
emissionObserver.AssertNotCalled(t, "OnError", mock.Anything)
emissionObserver.AssertCalled(t, "OnDone")
}
func TestFlatMapReturnsSameElementBecauseIdentifyApplied(t *testing.T) {
// given
emissionObserver := NewObserverMock()
// and sequence containing one element
element := 1
sequence := Just(element)
// and flattens the sequence with identity
sequence = sequence.FlatMap(identity, 1)
// when subscribes to the sequence
sequence.Subscribe(emissionObserver.Capture()).Block()
// then completes with emission of the same element
emissionObserver.AssertNotCalled(t, "OnError", mock.Anything)
emissionObserver.AssertCalled(t, "OnNext", element)
emissionObserver.AssertCalled(t, "OnDone")
}
func TestFlatMapReturnsSliceElements(t *testing.T) {
// given
emissionObserver := NewObserverMock()
// and sequence containing slice with few elements
element1 := "element1"
element2 := "element2"
element3 := "element3"
slice := &([]string{element1, element2, element3})
sequence := Just(slice)
// and flattens the sequence with identity
sequence = sequence.FlatMap(flattenThreeElementSlice, 1)
// when subscribes to the sequence
sequence.Subscribe(emissionObserver.Capture()).Block()
// then completes with emission of flatten elements
emissionObserver.AssertNotCalled(t, "OnError", mock.Anything)
emissionObserver.AssertNotCalled(t, "OnNext", slice)
emissionObserver.AssertCalled(t, "OnNext", element1)
emissionObserver.AssertCalled(t, "OnNext", element2)
emissionObserver.AssertCalled(t, "OnNext", element3)
emissionObserver.AssertCalled(t, "OnDone")
}
// TODO To be reimplemented
//func TestFlatMapUsesForParallelProcessingAtLeast1Process(t *testing.T) {
// // given
// emissionObserver := observer.NewObserverMock()
//
// // and
// var maxInParallel uint = 0
//
// // and
// var requestedMaxInParallel uint = 0
// flatteningFuncMock := func(out chan interface{}, o Observable, apply func(interface{}) Observable, maxInParallel uint) {
// requestedMaxInParallel = maxInParallel
// flatObservedSequence(out, o, apply, maxInParallel)
// }
//
// // and flattens the sequence with identity
// sequence := someSequence.FlatMap(identity, maxInParallel, flatteningFuncMock)
//
// // when subscribes to the sequence
// <-sequence.Subscribe(emissionObserver.Capture())
//
// // then completes with emission of the same element
// assert.Equal(t, uint(1), requestedMaxInParallel)
//}
var (
someElement = "some element"
someSequence = Just(someElement)
)
func identity(el interface{}) Observable {
return Just(el)
}
func flattenThreeElementSlice(el interface{}) Observable {
slice := *(el.(*[]string))
return Just(slice[0], slice[1], slice[2])
}