Skip to content

Commit

Permalink
Replaced Unix with UnixNano in Join function.
Browse files Browse the repository at this point in the history
Changed condition to remove old items from right stream buffer.
Simplified window check condition.
  • Loading branch information
v-zubko committed Feb 26, 2020
1 parent bf1c2d2 commit 4ff7ff1
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 14 deletions.
24 changes: 15 additions & 9 deletions observable_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1123,6 +1123,12 @@ func (op *ignoreElementsOperator) end(_ context.Context, _ chan<- Item) {
func (op *ignoreElementsOperator) gatherNext(_ context.Context, _ Item, _ chan<- Item, _ operatorOptions) {
}

// Returns absolute value for int64
func abs(n int64) int64 {
y := n >> 63
return (n ^ y) - y
}

// Join combines items emitted by two Observables whenever an item from one Observable is emitted during
// a time window defined according to an item emitted by the other Observable.
// The time is extracted using a timeExtractor function.
Expand All @@ -1134,6 +1140,7 @@ func (o *ObservableImpl) Join(joiner Func2, right Observable, timeExtractor func

lObserve := o.Observe()
rObserve := right.Observe()
lLoop:
for {
select {
case <-ctx.Done():
Expand All @@ -1149,11 +1156,11 @@ func (o *ObservableImpl) Join(joiner Func2, right Observable, timeExtractor func
}
continue
}
lTime := timeExtractor(lItem.V).Unix()
lTime := timeExtractor(lItem.V).UnixNano()
cutPoint := 0
for i, rItem := range rBuf {
rTime := timeExtractor(rItem.V).Unix()
if (lTime <= rTime && rTime <= lTime+windowDuration) || (rTime <= lTime && lTime <= rTime+windowDuration) {
rTime := timeExtractor(rItem.V).UnixNano()
if abs(lTime-rTime) <= windowDuration {
i, err := joiner(ctx, lItem.V, rItem.V)
if err != nil {
Error(err).SendContext(ctx, next)
Expand All @@ -1164,21 +1171,20 @@ func (o *ObservableImpl) Join(joiner Func2, right Observable, timeExtractor func
}
Of(i).SendContext(ctx, next)
}
if lTime >= rTime {
if lTime > rTime+windowDuration {
cutPoint = i + 1
}
}

rBuf = rBuf[cutPoint:]

rLoop:
for {
select {
case <-ctx.Done():
return
case rItem, ok := <-rObserve:
if rItem.V == nil && !ok {
break rLoop
continue lLoop
}
if rItem.Error() {
rItem.SendContext(ctx, next)
Expand All @@ -1189,8 +1195,8 @@ func (o *ObservableImpl) Join(joiner Func2, right Observable, timeExtractor func
}

rBuf = append(rBuf, rItem)
rTime := timeExtractor(rItem.V).Unix()
if (lTime <= rTime && rTime <= lTime+windowDuration) || (rTime <= lTime && lTime <= rTime+windowDuration) {
rTime := timeExtractor(rItem.V).UnixNano()
if abs(lTime-rTime) <= windowDuration {
i, err := joiner(ctx, lItem.V, rItem.V)
if err != nil {
Error(err).SendContext(ctx, next)
Expand All @@ -1203,7 +1209,7 @@ func (o *ObservableImpl) Join(joiner Func2, right Observable, timeExtractor func

continue
}
break rLoop
continue lLoop
}
}
}
Expand Down
35 changes: 30 additions & 5 deletions observable_operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ func joinTest(t *testing.T, left, right []interface{}, window Duration, expected
},
rightObs,
func(i interface{}) time.Time {
return time.Unix(i.(map[string]int64)["tt"], 0)
return time.Unix(0, i.(map[string]int64)["tt"]*1000000)
},
window,
)
Expand All @@ -683,7 +683,7 @@ func Test_Observable_Join1(t *testing.T) {
map[string]int64{"tt": 3, "V": 6},
map[string]int64{"tt": 5, "V": 7},
}
window := WithDuration(2)
window := WithDuration(2 * time.Millisecond)
expected := []int64{
1, 5,
1, 6,
Expand All @@ -708,7 +708,7 @@ func Test_Observable_Join2(t *testing.T) {
map[string]int64{"tt": 7, "V": 2},
map[string]int64{"tt": 10, "V": 3},
}
window := WithDuration(3)
window := WithDuration(2 * time.Millisecond)
expected := []int64{
1, 1,
2, 1,
Expand All @@ -720,6 +720,31 @@ func Test_Observable_Join2(t *testing.T) {
joinTest(t, left, right, window, expected)
}

func Test_Observable_Join3(t *testing.T) {
left := []interface{}{
map[string]int64{"tt": 1, "V": 1},
map[string]int64{"tt": 2, "V": 2},
map[string]int64{"tt": 3, "V": 3},
map[string]int64{"tt": 4, "V": 4},
}
right := []interface{}{
map[string]int64{"tt": 5, "V": 1},
map[string]int64{"tt": 6, "V": 2},
map[string]int64{"tt": 7, "V": 3},
}
window := WithDuration(3 * time.Millisecond)
expected := []int64{
2, 1,
3, 1,
3, 2,
4, 1,
4, 2,
4, 3,
}

joinTest(t, left, right, window, expected)
}

func Test_Observable_Join_Error_OnLeft(t *testing.T) {
left := []interface{}{
map[string]int64{"tt": 1, "V": 1},
Expand All @@ -732,7 +757,7 @@ func Test_Observable_Join_Error_OnLeft(t *testing.T) {
map[string]int64{"tt": 7, "V": 2},
map[string]int64{"tt": 10, "V": 3},
}
window := WithDuration(3)
window := WithDuration(3 * time.Millisecond)
expected := []int64{
1, 1,
2, 1,
Expand All @@ -753,7 +778,7 @@ func Test_Observable_Join_Error_OnRight(t *testing.T) {
errFoo,
map[string]int64{"tt": 10, "V": 3},
}
window := WithDuration(3)
window := WithDuration(3 * time.Millisecond)
expected := []int64{
1, 1,
}
Expand Down

0 comments on commit 4ff7ff1

Please sign in to comment.