Skip to content

Commit 22bcdb1

Browse files
committed
Port the rest of rxflow to TypeScript.
1 parent cd871ab commit 22bcdb1

File tree

11 files changed

+388
-140
lines changed

11 files changed

+388
-140
lines changed

rxflow/src/Buffer.js

Lines changed: 37 additions & 24 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rxflow/src/Buffer.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import DataflowElement = require('./DataflowElement');
2+
import InputPort = require('./InputPort');
3+
import OutputPort = require('./OutputPort');
4+
5+
class Buffer<T> extends DataflowElement {
6+
7+
private buffer: Array<T> = [];
8+
9+
input = new InputPort<T>(x => this.buffer.push(x));
10+
output = new OutputPort<T>();
11+
12+
13+
invalidate() {
14+
this.buffer = [];
15+
}
16+
17+
isEmpty(): boolean {
18+
return this.buffer.length === 0;
19+
}
20+
21+
flush() {
22+
var oldBuffer = this.buffer;
23+
this.buffer = [];
24+
oldBuffer.forEach(x => this.output.onNext(x));
25+
return oldBuffer.length;
26+
}
27+
}
28+
29+
export = Buffer;

rxflow/src/HashJoin.js

Lines changed: 66 additions & 46 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rxflow/src/HashJoin.ts

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import DataflowElement = require('./DataflowElement');
2+
import InputPort = require('./InputPort');
3+
import OutputPort = require('./OutputPort');
4+
5+
6+
class HashJoin extends DataflowElement {
7+
8+
leftInput: InputPort<any>;
9+
rightInput: InputPort<any>;
10+
output = new OutputPort();
11+
12+
private buildInput: InputPort<any>;
13+
private probeInput: InputPort<any>;
14+
15+
private hashTable = {};
16+
17+
private buildKeyFunc;
18+
private probeKeyFunc;
19+
private resultOrderingFunction;
20+
21+
constructor(leftKeyFunc, rightKeyFunc, buildInput) {
22+
super();
23+
if (buildInput === 'left') {
24+
this.leftInput = new InputPort(x => this.handleBuildInput(x));
25+
this.rightInput = new InputPort(x => this.handleProbeInput(x));
26+
this.buildInput = this.leftInput;
27+
this.probeInput = this.rightInput;
28+
this.buildKeyFunc = leftKeyFunc;
29+
this.probeKeyFunc = rightKeyFunc;
30+
this.resultOrderingFunction = function (b, p) { return [b, p]; };
31+
} else if (buildInput === 'right') {
32+
this.rightInput = new InputPort(x => this.handleBuildInput(x));
33+
this.leftInput = new InputPort(x => this.handleProbeInput(x));
34+
this.buildInput = this.rightInput;
35+
this.probeInput = this.leftInput;
36+
this.buildKeyFunc = rightKeyFunc;
37+
this.probeKeyFunc = leftKeyFunc;
38+
this.resultOrderingFunction = function (b, p) { return [p, b]; };
39+
} else {
40+
throw new Error('buildInput should be \'left\' or \'right\', not \'' + buildInput + '\'');
41+
}
42+
}
43+
44+
45+
private handleProbeInput(p) {
46+
var key = this.probeKeyFunc(p);
47+
if (key in this.hashTable) {
48+
var matches = this.hashTable[key];
49+
matches.forEach(b => this.output.onNext(this.resultOrderingFunction(b, p)));
50+
}
51+
}
52+
53+
private handleBuildInput(b) {
54+
var key = this.buildKeyFunc(b);
55+
if (!(key in this.hashTable)) {
56+
this.hashTable[key] = [b];
57+
} else {
58+
this.hashTable[key].push(b);
59+
}
60+
}
61+
62+
}
63+
64+
export = HashJoin;

rxflow/src/ObservableScanner.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
var Buffer = require('./buffer');
1+
var Buffer = require('./Buffer');
22

33

44
function ObservableScanner(observable) {

rxflow/src/Table.js

Lines changed: 42 additions & 41 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)