Skip to content

Commit 1eede7c

Browse files
committed
Implement shortest-paths by hand in RxFlow.
1 parent d1bd675 commit 1eede7c

File tree

4 files changed

+142
-2
lines changed

4 files changed

+142
-2
lines changed
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
function Bloom () {
2+
var rx = require('rx');
3+
var rxflow = require('rxflow');
4+
5+
var tables = {
6+
"path": new rxflow.Table(3) /* table path, [from: string, to: string, nxt: string, cost: int] */,
7+
"shortest": new rxflow.Table(3) /* table path, [from: string, to: string, nxt: string, cost: int] */
8+
};
9+
10+
var inputs = {
11+
"link": new rx.Subject() /* input link, [from: string, to: string, cost: int] */
12+
};
13+
this.link = inputs["link"];
14+
15+
var outputs = {
16+
"pathOut": new rx.Subject() /* output pathOut, [from: string, to: string, nxt: string, cost: int] */
17+
};
18+
this.pathOut = outputs["pathOut"];
19+
20+
var elements = {
21+
6: new rxflow.HashJoin(
22+
function(x) { return x[1]; /* link.to */ },
23+
function(x) { return x[0]; /* path.from */ },
24+
"right"
25+
),
26+
5: new rxflow.HashJoin(
27+
function(x) { return x[1]; /* link.to */ },
28+
function(x) { return x[0]; /* path.from */ },
29+
"left"
30+
),
31+
0: new rxflow.Map(
32+
function(x) { return [x[0], x[1], x[1], x[2]]; /* [l.from, l.to, l.to, l.cost] */ }
33+
),
34+
2: new rxflow.ObservableScanner(inputs["link"]),
35+
7: new rxflow.Map(
36+
function(x) { return [x[0][0], x[1][1], x[0][1], x[0][2] + x[1][3]]; /* [l.from, p.to, l.to, l.cost + p.cost] */ }
37+
),
38+
4: new rxflow.TableScanner(tables["path"]),
39+
9: new rxflow.TableScanner(tables["shortest"]),
40+
8: new rxflow.ArgMin(
41+
function(x) { return [x[0], x[1]]; /* [path.from, path.to] */ },
42+
function(x) { return x[3]; /* path.cost */ },
43+
function(x, y) { return x <= y; }
44+
)
45+
};
46+
47+
var invalidationLookupTable = { "link": [2], "path": [7, 5, 0, 6] };
48+
49+
var rescanLookupTable = { "path": [2, 4] };
50+
51+
// Wiring, in a roughly topological order
52+
// Scanner outputs:
53+
elements[2].output.subscribe(elements[5].leftInput);
54+
elements[2].output.subscribe(elements[6].leftInput);
55+
elements[2].output.subscribe(elements[0].input);
56+
elements[4].output.subscribe(elements[5].rightInput);
57+
elements[4].output.subscribe(elements[6].rightInput);
58+
elements[4].output.subscribe(elements[8].input);
59+
elements[8].output.subscribe(outputs["pathOut"]);
60+
61+
// Join outputs:
62+
elements[5].output.subscribe(elements[7].input);
63+
elements[6].output.subscribe(elements[7].input);
64+
65+
// Table insertions:
66+
elements[0].output.subscribe(tables["path"].insert); // Link -> Path map
67+
elements[7].output.subscribe(tables["path"].insert); // Join projection
68+
69+
function tickStratum0() {
70+
var tuplesFlushed = 0;
71+
tuplesFlushed += elements[2].flush();
72+
tuplesFlushed += elements[4].flush();
73+
return tuplesFlushed;
74+
}
75+
76+
function tickStratum1() {
77+
var tuplesFlushed = 0;
78+
tuplesFlushed += elements[8].flush();
79+
return tuplesFlushed;
80+
}
81+
82+
this.tick = function() {
83+
var atFixpoint = false;
84+
while (!atFixpoint) {
85+
atFixpoint = tickStratum0() === 0;
86+
}
87+
tickStratum1()
88+
}
89+
}
90+
91+
var bloom = new Bloom();
92+
93+
bloom.pathOut.subscribe(function(x) { console.log(x)});
94+
bloom.link.onNext(['a', 'b', 1]);
95+
bloom.link.onNext(['a', 'b', 4]);
96+
bloom.link.onNext(['b', 'c', 1]);
97+
bloom.link.onNext(['c', 'd', 1]);
98+
bloom.link.onNext(['d', 'e', 1]);
99+
bloom.tick();
100+
console.log('----');
101+
bloom.link.onNext(['e', 'f', 1]);
102+
bloom.tick();

rxflow/src/Aggregate.js

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ var Rx = require('rx');
1313
function Aggregate(keyFunction, aggregates) {
1414
'use strict';
1515

16+
var _this = this;
1617
var _aggregators = [];
1718
var _groupKeys = [];
1819
var _keyToArrayIndex = {};
@@ -40,6 +41,12 @@ function Aggregate(keyFunction, aggregates) {
4041
*/
4142
this.input = Rx.Observer.create(updateAggs);
4243

44+
this.output = new Rx.Subject();
45+
46+
this.flush = function() {
47+
this.getCurrentValues().subscribe(_this.output);
48+
};
49+
4350
/**
4451
* Return a stream of groups and values as an Rx observable.
4552
*/
@@ -49,15 +56,14 @@ function Aggregate(keyFunction, aggregates) {
4956
for (var i = 0; i < _aggregators.length; ++i) {
5057
observer.onNext([_groupKeys[i]].concat(_aggregators[i].map(extractValue)));
5158
}
52-
observer.onCompleted();
5359
});
5460
};
5561

5662
/**
5763
* Reset this element by resetting aggregates to their initial values
5864
* and clearing all groups.
5965
*/
60-
this.reset = function() {
66+
this.invalidate = function() {
6167
_aggregators = [];
6268
_groupKeys = [];
6369
_keyToArrayIndex = {};

rxflow/src/ArgMin.js

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
var Rx = require('rx');
2+
var Aggregate = require('./Aggregate');
3+
4+
5+
function ArgMin(keyFunction, orderingFields, orderingFunction) {
6+
'use strict';
7+
8+
var _this = this;
9+
10+
var aggregateFunction = function () {
11+
var value = null;
12+
this.getValue = function() { return value; };
13+
this.next = function(x) {
14+
if (value === null || orderingFunction(orderingFields(x), orderingFields(value))) {
15+
value = x;
16+
}
17+
};
18+
};
19+
var aggregate = new Aggregate(keyFunction, [aggregateFunction]);
20+
21+
this.input = aggregate.input;
22+
this.output = new Rx.Subject();
23+
24+
this.flush = function () {
25+
aggregate.getCurrentValues().map(function(x) { return x[1]; }).subscribe(_this.output);
26+
};
27+
28+
this.invalidate = aggregate.invalidate;
29+
30+
}
31+
module.exports = ArgMin;

rxflow/src/index.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
var rxflow = {};
22
rxflow.Aggregate = require('./Aggregate');
3+
rxflow.ArgMin = require('./ArgMin');
34
rxflow.Buffer = require('./Buffer');
45
rxflow.HashJoin = require('./HashJoin');
56
rxflow.Map = require('./Map');

0 commit comments

Comments
 (0)