Skip to content

Commit f278c06

Browse files
committed
Start to add end-of-round punctuations.
1 parent ee64c4c commit f278c06

31 files changed

+262
-92
lines changed

compiler/src/test/resources/hand-generated-outputs/shortest-paths-rxflow.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@ function Bloom () {
7979
function tickStratum0() {
8080
var tuplesFlushed = 0;
8181
tuplesFlushed += elements[2].flush();
82-
tuplesFlushed += elements[4].flush();
8382
return tuplesFlushed;
8483
}
8584

compiler/src/test/resources/hand-generated-outputs/transitive-closure-rxflow.js

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ function Bloom () {
1212
this.link = inputs["link"];
1313

1414
var outputs = {
15-
"pathOut": new rx.Subject() /* output pathOut, [from: string, to: string, nxt: string, cost: int] */
15+
"pathOut": new rxflow.ObservableSink() /* output pathOut, [from: string, to: string, nxt: string, cost: int] */
1616
};
17-
this.pathOut = outputs["pathOut"];
17+
this.pathOut = outputs["pathOut"].output;
1818

1919
var elements = {
2020
6: new rxflow.HashJoin(
@@ -48,7 +48,7 @@ function Bloom () {
4848
elements[2].output.subscribe(elements[0].input);
4949
elements[4].output.subscribe(elements[5].rightInput);
5050
elements[4].output.subscribe(elements[6].rightInput);
51-
elements[4].output.subscribe(outputs["pathOut"]);
51+
elements[4].output.subscribe(outputs["pathOut"].input);
5252

5353
// Join outputs:
5454
elements[5].output.subscribe(elements[7].input);
@@ -61,7 +61,6 @@ function Bloom () {
6161
function tickInternal() {
6262
var tuplesFlushed = 0;
6363
tuplesFlushed += elements[2].flush();
64-
tuplesFlushed += elements[4].flush();
6564
return tuplesFlushed;
6665
}
6766

rxflow/benchmark/AggregateBenchmark.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,10 @@ module.exports = {
6060
var group = function(data) { return data[column]; };
6161
var aggregate = new global.rxflow.Aggregate(group, [Count]);
6262
var data = global.Rx.Observable.fromArray(testData);
63+
var observableScanner = new global.rxflow.ObservableScanner(data);
6364
var sink = new global.rxflow.ObservableSink();
6465
aggregate.output.subscribe(sink.input);
65-
data.subscribe(aggregate.input);
66+
observableScanner.output.subscribe(aggregate.input);
6667
var results = [];
6768
sink.output.forEach(function (x) { results.push(x); });
6869
aggregate.flush();

rxflow/src/Aggregate.js

Lines changed: 2 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rxflow/src/Aggregate.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
1-
/// <reference path="../typings/rx.js/rx.d.ts" />
2-
3-
import Rx = require('rx');
41
import DataflowElement = require('./DataflowElement');
52
import InputPort = require('./InputPort');
63
import OutputPort = require('./OutputPort');
@@ -64,9 +61,9 @@ class Aggregate<T> extends DataflowElement {
6461
/**
6562
* An input stream of elements to be aggregated.
6663
*/
67-
input = new InputPort(x => this.updateAggs(x));
64+
input = new InputPort(x => this.updateAggs(x), this);
6865

69-
output = new OutputPort();
66+
output = new OutputPort(this);
7067

7168
flush() {
7269
for (var i = 0; i < this.aggregators.length; ++i) {

rxflow/src/ArgMin.js

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rxflow/src/ArgMin.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ class ArgMin<T> extends DataflowElement {
88

99
private aggregate: Aggregate<T>;
1010
input: InputPort<T>;
11-
output = new OutputPort();
11+
output = new OutputPort(this);
1212

1313
constructor(keyFunction, orderingFields, orderingFunction) {
1414
super();
@@ -23,7 +23,7 @@ class ArgMin<T> extends DataflowElement {
2323
};
2424
this.aggregate = new Aggregate(keyFunction, [aggregateFunction]);
2525
this.input = this.aggregate.input;
26-
var outputProjector = new InputPort(x => this.output.onNext(x[1]));
26+
var outputProjector = new InputPort(x => this.output.onNext(x[1]), null);
2727
this.aggregate.output.subscribe(outputProjector);
2828
}
2929

rxflow/src/Buffer.js

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rxflow/src/Buffer.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ class Buffer<T> extends DataflowElement {
66

77
private buffer: Array<T> = [];
88

9-
input = new InputPort<T>(x => this.buffer.push(x));
10-
output = new OutputPort<T>();
9+
input = new InputPort<T>(x => this.buffer.push(x), this);
10+
output = new OutputPort<T>(this);
1111

1212

1313
invalidate() {

rxflow/src/DataflowElement.js

Lines changed: 30 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)