Skip to content

Commit

Permalink
Merge pull request #232 from v-zubko/join-update
Browse files Browse the repository at this point in the history
Join function update
  • Loading branch information
teivah authored Feb 26, 2020
2 parents bf1c2d2 + 4ff7ff1 commit 42278d5
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 14 deletions.
24 changes: 15 additions & 9 deletions observable_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1123,6 +1123,12 @@ func (op *ignoreElementsOperator) end(_ context.Context, _ chan<- Item) {
func (op *ignoreElementsOperator) gatherNext(_ context.Context, _ Item, _ chan<- Item, _ operatorOptions) {
}

// Returns absolute value for int64
func abs(n int64) int64 {
y := n >> 63
return (n ^ y) - y
}

// Join combines items emitted by two Observables whenever an item from one Observable is emitted during
// a time window defined according to an item emitted by the other Observable.
// The time is extracted using a timeExtractor function.
Expand All @@ -1134,6 +1140,7 @@ func (o *ObservableImpl) Join(joiner Func2, right Observable, timeExtractor func

lObserve := o.Observe()
rObserve := right.Observe()
lLoop:
for {
select {
case <-ctx.Done():
Expand All @@ -1149,11 +1156,11 @@ func (o *ObservableImpl) Join(joiner Func2, right Observable, timeExtractor func
}
continue
}
lTime := timeExtractor(lItem.V).Unix()
lTime := timeExtractor(lItem.V).UnixNano()
cutPoint := 0
for i, rItem := range rBuf {
rTime := timeExtractor(rItem.V).Unix()
if (lTime <= rTime && rTime <= lTime+windowDuration) || (rTime <= lTime && lTime <= rTime+windowDuration) {
rTime := timeExtractor(rItem.V).UnixNano()
if abs(lTime-rTime) <= windowDuration {
i, err := joiner(ctx, lItem.V, rItem.V)
if err != nil {
Error(err).SendContext(ctx, next)
Expand All @@ -1164,21 +1171,20 @@ func (o *ObservableImpl) Join(joiner Func2, right Observable, timeExtractor func
}
Of(i).SendContext(ctx, next)
}
if lTime >= rTime {
if lTime > rTime+windowDuration {
cutPoint = i + 1
}
}

rBuf = rBuf[cutPoint:]

rLoop:
for {
select {
case <-ctx.Done():
return
case rItem, ok := <-rObserve:
if rItem.V == nil && !ok {
break rLoop
continue lLoop
}
if rItem.Error() {
rItem.SendContext(ctx, next)
Expand All @@ -1189,8 +1195,8 @@ func (o *ObservableImpl) Join(joiner Func2, right Observable, timeExtractor func
}

rBuf = append(rBuf, rItem)
rTime := timeExtractor(rItem.V).Unix()
if (lTime <= rTime && rTime <= lTime+windowDuration) || (rTime <= lTime && lTime <= rTime+windowDuration) {
rTime := timeExtractor(rItem.V).UnixNano()
if abs(lTime-rTime) <= windowDuration {
i, err := joiner(ctx, lItem.V, rItem.V)
if err != nil {
Error(err).SendContext(ctx, next)
Expand All @@ -1203,7 +1209,7 @@ func (o *ObservableImpl) Join(joiner Func2, right Observable, timeExtractor func

continue
}
break rLoop
continue lLoop
}
}
}
Expand Down
35 changes: 30 additions & 5 deletions observable_operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ func joinTest(t *testing.T, left, right []interface{}, window Duration, expected
},
rightObs,
func(i interface{}) time.Time {
return time.Unix(i.(map[string]int64)["tt"], 0)
return time.Unix(0, i.(map[string]int64)["tt"]*1000000)
},
window,
)
Expand All @@ -683,7 +683,7 @@ func Test_Observable_Join1(t *testing.T) {
map[string]int64{"tt": 3, "V": 6},
map[string]int64{"tt": 5, "V": 7},
}
window := WithDuration(2)
window := WithDuration(2 * time.Millisecond)
expected := []int64{
1, 5,
1, 6,
Expand All @@ -708,7 +708,7 @@ func Test_Observable_Join2(t *testing.T) {
map[string]int64{"tt": 7, "V": 2},
map[string]int64{"tt": 10, "V": 3},
}
window := WithDuration(3)
window := WithDuration(2 * time.Millisecond)
expected := []int64{
1, 1,
2, 1,
Expand All @@ -720,6 +720,31 @@ func Test_Observable_Join2(t *testing.T) {
joinTest(t, left, right, window, expected)
}

func Test_Observable_Join3(t *testing.T) {
left := []interface{}{
map[string]int64{"tt": 1, "V": 1},
map[string]int64{"tt": 2, "V": 2},
map[string]int64{"tt": 3, "V": 3},
map[string]int64{"tt": 4, "V": 4},
}
right := []interface{}{
map[string]int64{"tt": 5, "V": 1},
map[string]int64{"tt": 6, "V": 2},
map[string]int64{"tt": 7, "V": 3},
}
window := WithDuration(3 * time.Millisecond)
expected := []int64{
2, 1,
3, 1,
3, 2,
4, 1,
4, 2,
4, 3,
}

joinTest(t, left, right, window, expected)
}

func Test_Observable_Join_Error_OnLeft(t *testing.T) {
left := []interface{}{
map[string]int64{"tt": 1, "V": 1},
Expand All @@ -732,7 +757,7 @@ func Test_Observable_Join_Error_OnLeft(t *testing.T) {
map[string]int64{"tt": 7, "V": 2},
map[string]int64{"tt": 10, "V": 3},
}
window := WithDuration(3)
window := WithDuration(3 * time.Millisecond)
expected := []int64{
1, 1,
2, 1,
Expand All @@ -753,7 +778,7 @@ func Test_Observable_Join_Error_OnRight(t *testing.T) {
errFoo,
map[string]int64{"tt": 10, "V": 3},
}
window := WithDuration(3)
window := WithDuration(3 * time.Millisecond)
expected := []int64{
1, 1,
}
Expand Down

0 comments on commit 42278d5

Please sign in to comment.