Skip to content

Commit 1432e59

Browse files
justinwoobenlesh
authored andcommitted
feat(Observable): add pairwise operator
bring in pairwise operator from RxJS4
1 parent 69ebd9a commit 1432e59

File tree

6 files changed

+155
-1
lines changed

6 files changed

+155
-1
lines changed

doc/index.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
- [mergeAll](function/index.html#static-function-mergeAll)
5858
- [multicast](function/index.html#static-function-multicast)
5959
- [observeOn](function/index.html#static-function-observeOn)
60+
- [pairwise](function/index.html#static-function-pairwise)
6061
- [partition](function/index.html#static-function-partition)
6162
- [publish](function/index.html#static-function-publish)
6263
- [publishBehavior](function/index.html#static-function-publishBehavior)
@@ -89,4 +90,4 @@
8990
- [windowWhen](function/index.html#static-function-windowWhen)
9091
- [withLatestFrom](function/index.html#static-function-withLatestFrom)
9192
- [zip](function/index.html#static-function-zip)
92-
- [zipAll](function/index.html#static-function-zipAll)
93+
- [zipAll](function/index.html#static-function-zipAll)
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
var RxOld = require('rx');
2+
var RxNew = require('../../../../index');
3+
4+
module.exports = function (suite) {
5+
var oldPairwiseWithImmediateScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate).pairwise();
6+
var newPairwiseWithImmediateScheduler = RxNew.Observable.range(0, 25).pairwise();
7+
8+
function _next(x) { }
9+
function _error(e) { }
10+
function _complete() { }
11+
return suite
12+
.add('old pairwise with immediate scheduler', function () {
13+
oldPairwiseWithImmediateScheduler.subscribe(_next, _error, _complete);
14+
})
15+
.add('new pairwise with immediate scheduler', function () {
16+
newPairwiseWithImmediateScheduler.subscribe(_next, _error, _complete);
17+
});
18+
};

spec/operators/pairwise-spec.js

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/* globals describe, it, expect, expectObservable, expectSubscriptions, cold, hot */
2+
var Rx = require('../../dist/cjs/Rx');
3+
var Observable = Rx.Observable;
4+
5+
describe('Observable.prototype.pairwise()', function () {
6+
it('should pairwise things', function () {
7+
var e1 = hot('--a--^--b--c--d--e--f--g--|');
8+
var e1subs = '^ !';
9+
var expected = '------v--w--x--y--z--|';
10+
11+
var values = {
12+
v: ['b', 'c'],
13+
w: ['c', 'd'],
14+
x: ['d', 'e'],
15+
y: ['e', 'f'],
16+
z: ['f', 'g']
17+
};
18+
19+
var source = e1.pairwise();
20+
21+
expectObservable(source).toBe(expected, values);
22+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
23+
});
24+
25+
it('should not emit on single-element streams', function () {
26+
var e1 = hot('-----^--b----|');
27+
var e1subs = '^ !';
28+
var expected = '--------|';
29+
30+
var values = {
31+
};
32+
33+
var source = e1.pairwise();
34+
35+
expectObservable(source).toBe(expected, values);
36+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
37+
});
38+
39+
it('should handle mid-stream throw', function () {
40+
var e1 = hot('--a--^--b--c--d--e--#');
41+
var e1subs = '^ !';
42+
var expected = '------v--w--x--#';
43+
44+
var values = {
45+
v: ['b', 'c'],
46+
w: ['c', 'd'],
47+
x: ['d', 'e']
48+
};
49+
50+
var source = e1.pairwise();
51+
52+
expectObservable(source).toBe(expected, values);
53+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
54+
});
55+
56+
it('should handle empty', function () {
57+
var e1 = cold('|');
58+
var e1subs = '(^!)';
59+
var expected = '|';
60+
61+
var source = e1.pairwise();
62+
63+
expectObservable(source).toBe(expected);
64+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
65+
});
66+
67+
it('should handle never', function () {
68+
var e1 = cold('-');
69+
var e1subs = '^';
70+
var expected = '-';
71+
72+
var source = e1.pairwise();
73+
74+
expectObservable(source).toBe(expected);
75+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
76+
});
77+
78+
it('should handle throw', function () {
79+
var e1 = cold('#');
80+
var e1subs = '(^!)';
81+
var expected = '#';
82+
83+
var source = e1.pairwise();
84+
85+
expectObservable(source).toBe(expected);
86+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
87+
});
88+
});

src/Rx.KitchenSink.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ export interface KitchenSinkOperators<T> extends CoreOperators<T> {
1313
findIndex?: (predicate: (value: T, index: number, source: Observable<T>) => boolean, thisArg?: any) => Observable<number>;
1414
max?: <T, R>(comparer?: (x: R, y: T) => R) => Observable<R>;
1515
min?: <T, R>(comparer?: (x: R, y: T) => R) => Observable<R>;
16+
pairwise?: <R>() => Observable<R>;
1617
timeInterval?: <T>(scheduler?: IScheduler) => Observable<T>;
1718
mergeScan?: <T, R>(project: (acc: R, x: T) => Observable<R>, seed: R, concurrent?: number) => Observable<R>;
1819
exhaust?: () => Observable<T>;
@@ -89,6 +90,7 @@ import './add/operator/mergeScan';
8990
import './add/operator/min';
9091
import './add/operator/multicast';
9192
import './add/operator/observeOn';
93+
import './add/operator/pairwise';
9294
import './add/operator/partition';
9395
import './add/operator/publish';
9496
import './add/operator/publishBehavior';

src/add/operator/pairwise.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import {Observable} from '../../Observable';
2+
import {pairwise} from '../../operator/pairwise';
3+
import {KitchenSinkOperators} from '../../Rx.KitchenSink';
4+
const observableProto = (<KitchenSinkOperators<any>>Observable.prototype);
5+
observableProto.pairwise = pairwise;
6+
7+
export var _void: void;

src/operator/pairwise.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import {Operator} from '../Operator';
2+
import {Observable} from '../Observable';
3+
import {Subscriber} from '../Subscriber';
4+
5+
/**
6+
* Returns a new observable that triggers on the second and following inputs.
7+
* An input that triggers an event will return an pair of [(N - 1)th, Nth].
8+
* The (N-1)th is stored in the internal state until Nth input occurs.
9+
* @returns {Observable<R>} an observable of pairs of values.
10+
*/
11+
export function pairwise<T>(): Observable<T> {
12+
return this.lift(new PairwiseOperator());
13+
}
14+
15+
class PairwiseOperator<T, R> implements Operator<T, R> {
16+
call(subscriber: Subscriber<T>): Subscriber<T> {
17+
return new PairwiseSubscriber(subscriber);
18+
}
19+
}
20+
21+
class PairwiseSubscriber<T> extends Subscriber<T> {
22+
private prev: T;
23+
private hasPrev: boolean = false;
24+
25+
constructor(destination: Subscriber<T>) {
26+
super(destination);
27+
}
28+
29+
_next(value: T): void {
30+
if (this.hasPrev) {
31+
this.destination.next([this.prev, value]);
32+
} else {
33+
this.hasPrev = true;
34+
}
35+
36+
this.prev = value;
37+
}
38+
}

0 commit comments

Comments
 (0)