Skip to content

Commit

Permalink
add session window tests
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn committed Nov 25, 2021
1 parent e0580d8 commit f0570bc
Showing 1 changed file with 43 additions and 5 deletions.
48 changes: 43 additions & 5 deletions flow/flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ func ingest(source []string, in chan interface{}) {
}
}

func ingestDeferred(item string, in chan interface{}, wait time.Duration) {
time.Sleep(wait)
in <- item
}

func deferClose(in chan interface{}, d time.Duration) {
time.Sleep(d)
close(in)
Expand All @@ -56,16 +61,16 @@ func TestFlow(t *testing.T) {
flow1 := flow.NewMap(toUpper, 1)
flow2 := flow.NewFlatMap(appendAsterix, 1)
flow3 := flow.NewFlatMap(flatten, 1)
throttler := flow.NewThrottler(10, time.Second*1, 50, flow.Backpressure)
slidingWindow := flow.NewSlidingWindow(time.Second*2, time.Second*2)
tumblingWindow := flow.NewTumblingWindow(time.Second * 1)
throttler := flow.NewThrottler(10, time.Second, 50, flow.Backpressure)
slidingWindow := flow.NewSlidingWindow(2*time.Second, 2*time.Second)
tumblingWindow := flow.NewTumblingWindow(time.Second)
sink := ext.NewChanSink(out)

var _input = []string{"a", "b", "c"}
var _expectedOutput = []string{"A*", "B*", "C*"}

go ingest(_input, in)
go deferClose(in, time.Second*3)
go deferClose(in, 3*time.Second)
go func() {
source.Via(flow1).
Via(tumblingWindow).
Expand All @@ -75,10 +80,12 @@ func TestFlow(t *testing.T) {
Via(throttler).
To(sink)
}()

var _output []string
for e := range sink.Out {
_output = append(_output, e.(string))
}

assertEqual(t, _expectedOutput, _output)
}

Expand All @@ -95,19 +102,49 @@ func TestFlowUtil(t *testing.T) {
var _expectedOutput = []string{"B", "B", "C", "C"}

go ingest(_input, in)
go deferClose(in, time.Second*1)
go deferClose(in, time.Second)
go func() {
fanOut := flow.FanOut(source.Via(filter).Via(flow1), 2)
flow.Merge(fanOut...).To(sink)
}()

var _output []string
for e := range sink.Out {
_output = append(_output, e.(string))
}
sort.Strings(_output)

assertEqual(t, _expectedOutput, _output)
}

func TestSessionWindow(t *testing.T) {
in := make(chan interface{})
out := make(chan interface{})

source := ext.NewChanSource(in)
sessionWindow := flow.NewSessionWindow(200 * time.Millisecond)
sink := ext.NewChanSink(out)

var _input = []string{"a", "b", "c"}
go ingest(_input, in)
go ingestDeferred("d", in, 300*time.Millisecond)
go ingestDeferred("e", in, 700*time.Millisecond)
go deferClose(in, time.Second)
go func() {
source.Via(sessionWindow).To(sink)
}()

var _output [][]interface{}
for e := range sink.Out {
_output = append(_output, e.([]interface{}))
}

assertEqual(t, len(_output), 3)
assertEqual(t, len(_output[0]), 3)
assertEqual(t, len(_output[1]), 1)
assertEqual(t, len(_output[2]), 1)
}

func TestQueue(t *testing.T) {
queue := &flow.PriorityQueue{}
heap.Push(queue, flow.NewItem(1, util.NowNano(), 0))
Expand All @@ -117,6 +154,7 @@ func TestQueue(t *testing.T) {
head := queue.Head()
queue.Update(head, util.NowNano())
first := heap.Pop(queue).(*flow.Item)

assertEqual(t, first.Msg.(int), 2)
}

Expand Down

0 comments on commit f0570bc

Please sign in to comment.