diff --git a/src/behavior.ts b/src/behavior.ts
index 9195dac..f1f561b 100644
--- a/src/behavior.ts
+++ b/src/behavior.ts
@@ -1,5 +1,5 @@
import { cons, DoubleLinkedList, Node, fromArray, nil } from "./datastructures";
-import { combine } from "./index";
+import { combine, isPlaceholder } from "./index";
import { State, Reactive, Time, BListener, Parent, SListener } from "./common";
import { Future, BehaviorFuture } from "./future";
import * as F from "./future";
@@ -69,12 +69,9 @@ export abstract class Behavior extends Reactive
}
abstract update(t: number): A;
pushB(t: number): void {
- if (this.state === State.Push) {
- const newValue = this.update(t);
- this.pulledAt = t;
- if (this.last !== newValue) {
- this.changedAt = t;
- this.last = newValue;
+ if (this.pulledAt !== t) {
+ this.pull(t);
+ if (this.changedAt === t && this.state === State.Push) {
pushToChildren(t, this);
}
}
@@ -85,10 +82,8 @@ export abstract class Behavior extends Reactive
let shouldRefresh = this.changedAt === undefined;
for (const parent of this.parents) {
if (isBehavior(parent)) {
- if (parent.state !== State.Push && parent.pulledAt !== t) {
- parent.pull(t);
- }
- shouldRefresh = shouldRefresh || parent.changedAt > this.changedAt;
+ parent.pull(t);
+ shouldRefresh = shouldRefresh || this.changedAt < parent.changedAt;
}
}
if (shouldRefresh) {
@@ -128,39 +123,44 @@ export function pushToChildren(t: number, b: Behavior): void {
}
}
-function refresh(b: Behavior, t: number): void {
+function refresh(b: Behavior, t: number) {
const newValue = b.update(t);
- if (newValue !== b.last) {
- b.changedAt = t;
- b.last = newValue;
+ if (newValue === b.last) {
+ return;
}
+ b.changedAt = t;
+ b.last = newValue;
}
export function isBehavior(b: any): b is Behavior {
- return typeof b === "object" && "at" in b;
+ return (
+ (typeof b === "object" && "at" in b && !isPlaceholder(b)) ||
+ (isPlaceholder(b) && (b.source === undefined || isBehavior(b.source)))
+ );
}
export abstract class ProducerBehavior extends Behavior {
newValue(a: A): void {
- const changed = a !== this.last;
- if (changed) {
- const t = tick();
- this.last = a;
- this.changedAt = t;
- if (this.state === State.Push) {
- this.pulledAt = t;
- pushToChildren(t, this);
- }
+ if (a === this.last) {
+ return;
+ }
+ const t = tick();
+ this.last = a;
+ this.changedAt = t;
+ if (this.state === State.Push) {
+ this.pulledAt = t;
+ pushToChildren(t, this);
}
}
- pull(t: number): void {
- this.last = this.update(t);
+ pull(t: number) {
+ refresh(this, t);
}
- activate(): void {
+ activate(t: Time): void {
if (this.state === State.Inactive) {
this.activateProducer();
}
this.state = State.Push;
+ this.changedAt = t;
}
deactivate(): void {
this.state = State.Inactive;
@@ -284,17 +284,6 @@ class FlatMapBehavior extends Behavior {
super();
this.parents = cons(this.outer);
}
- pushB(t: number): void {
- const newValue = this.update(t);
- this.pulledAt = t;
- if (this.last !== newValue) {
- this.changedAt = t;
- this.last = newValue;
- if (this.state === State.Push) {
- pushToChildren(t, this);
- }
- }
- }
update(t: number): B {
const outerChanged = this.outer.changedAt > this.changedAt;
if (outerChanged || this.changedAt === undefined) {
@@ -337,38 +326,34 @@ export function when(b: Behavior): Now> {
}
class SnapshotBehavior extends Behavior> implements SListener {
- private afterFuture: boolean;
private node: Node = new Node(this);
- constructor(private parent: Behavior, future: Future) {
+ constructor(private parent: Behavior, private future: Future) {
super();
if (future.state === State.Done) {
// Future has occurred at some point in the past
- this.afterFuture = true;
this.state = parent.state;
this.parents = cons(parent);
this.last = Future.of(at(parent));
} else {
- this.afterFuture = false;
this.state = State.Push;
+ this.parents = nil;
this.last = F.sinkFuture();
future.addListener(this.node, tick());
}
}
pushS(t: number, val: A): void {
- if (this.afterFuture === false) {
- // The push is coming from the Future, it has just occurred.
- this.afterFuture = true;
- this.last.resolve(at(this.parent));
- this.parent.addListener(this.node, t);
+ this.last.resolve(this.parent.at(t), t);
+ this.parents = cons(this.parent);
+ this.changeStateDown(this.state);
+ this.parent.addListener(this.node, t);
+ }
+ update(t: Time): Future {
+ if (this.future.state === State.Done) {
+ return Future.of(this.parent.at(t));
} else {
- // We are receiving an update from `parent` after `future` has
- // occurred.
- this.last = Future.of(val);
+ return this.last;
}
}
- update(_t: Time): Future {
- return this.last;
- }
}
export function snapshotAt(
@@ -402,12 +387,14 @@ export class FunctionBehavior extends ActiveBehavior {
constructor(private f: (t: Time) => A) {
super();
this.state = State.Pull;
+ this.parents = nil;
}
- pull(t: Time): void {
- if (this.pulledAt !== t) {
- refresh(this, t);
- this.pulledAt = t;
+ pull(t: Time) {
+ if (this.pulledAt === t) {
+ return;
}
+ this.pulledAt = t;
+ refresh(this, t);
}
update(t: Time): A {
return this.f(t);
@@ -435,7 +422,7 @@ class SwitcherBehavior extends ActiveBehavior
this.last = b.last;
next.addListener(this.nNode, t);
}
- update(_t: number): A {
+ update(t: Time): A {
return this.b.last;
}
pushS(t: number, value: Behavior): void {
@@ -446,11 +433,11 @@ class SwitcherBehavior extends ActiveBehavior
this.b = newB;
this.parents = cons(newB);
newB.addListener(this.bNode, t);
- const newState = newB.state;
- if (newState !== this.state) {
- this.changeStateDown(newState);
+ this.changeStateDown(newB.state);
+ refresh(this, t);
+ if (this.changedAt === t && this.state === State.Push) {
+ pushToChildren(t, this);
}
- this.pushB(t);
}
}
@@ -532,7 +519,7 @@ class ActiveAccumBehavior extends ActiveBehavior
pushToChildren(t, this);
}
}
- pull(_t: number): void {}
+ pull(_t: number) {}
update(_t: number): B {
throw new Error("Update should never be called.");
}
@@ -553,7 +540,7 @@ export class AccumBehavior extends ActiveBehavior> {
update(t: number): Behavior {
return new ActiveAccumBehavior(this.f, this.initial, this.source, t);
}
- pull(t: Time): void {
+ pull(t: Time) {
this.last = this.update(t);
this.changedAt = t;
this.pulledAt = t;
@@ -685,7 +672,7 @@ class MomentBehavior extends Behavior {
}
}
}
- pull(t: number): void {
+ pull(t: number) {
this.pulledAt = t;
refresh(this, t);
}
diff --git a/src/common.ts b/src/common.ts
index 881e1ef..f6eb7eb 100644
--- a/src/common.ts
+++ b/src/common.ts
@@ -49,7 +49,7 @@ export class PushOnlyObserver implements BListener, SListener {
}
}
pushB(t: number): void {
- this.callback((this.source as any).last);
+ this.callback((>this.source).last);
}
pushS(t: number, value: A): void {
this.callback(value);
@@ -145,7 +145,7 @@ export class CbObserver implements BListener, SListener {
}
}
pushB(t: number): void {
- this.callback((this.source as any).last);
+ this.callback((>this.source).last);
}
pushS(t: number, value: A): void {
this.callback(value);
diff --git a/src/placeholder.ts b/src/placeholder.ts
index 234b43d..c511278 100644
--- a/src/placeholder.ts
+++ b/src/placeholder.ts
@@ -1,4 +1,4 @@
-import { Reactive, State, SListener, BListener } from "./common";
+import { Reactive, State, SListener, BListener, Time } from "./common";
import { Behavior, isBehavior, MapBehavior, pushToChildren } from "./behavior";
import { Node, cons } from "./datastructures";
import { Stream, MapToStream } from "./stream";
@@ -17,12 +17,13 @@ export class Placeholder extends Behavior {
source: Reactive | SListener | BListener>;
private node: Node = new Node(this);
replaceWith(
- parent: Reactive | SListener | BListener>
+ parent: Reactive | SListener | BListener>,
+ t?: number
): void {
this.source = parent;
this.parents = cons(parent);
if (this.children.head !== undefined) {
- const t = tick();
+ t = t !== undefined ? t : tick();
this.activate(t);
if (isBehavior(parent) && this.state === State.Push) {
pushToChildren(t, this);
@@ -34,18 +35,14 @@ export class Placeholder extends Behavior {
(child).pushS(t, a);
}
}
- pull(t: number): void {
+ pull(t: number) {
if (this.source === undefined) {
throw new SamplePlaceholderError(this);
} else if (isBehavior(this.source)) {
+ this.source.pull(t);
this.pulledAt = t;
- if (this.source.pulledAt !== t) {
- this.source.pull(t);
- }
- if (this.last !== this.source.last) {
- this.changedAt = t;
- this.last = this.source.last;
- }
+ this.changedAt = t;
+ this.last = this.source.last;
} else {
throw new Error("Unsupported pulling on placeholder");
}
@@ -78,6 +75,10 @@ export class Placeholder extends Behavior {
}
}
+export function isPlaceholder(p): p is Placeholder {
+ return typeof p === "object" && "replaceWith" in p;
+}
+
class MapPlaceholder extends MapBehavior {
pushS(t: number, a: A): void {
// @ts-ignore
@@ -86,11 +87,18 @@ class MapPlaceholder extends MapBehavior {
}
class MapToPlaceholder extends MapToStream {
- last: B;
- update(): B {
+ changedAt;
+ constructor(parent, public last: B) {
+ super(parent, last);
+ }
+ update(_t): B {
return (this).b;
}
- pull(): void {}
+ pull(t) {
+ if (this.changedAt === undefined) {
+ this.changedAt = t;
+ }
+ }
}
function install(target: Function, source: Function): void {
diff --git a/test/behavior.ts b/test/behavior.ts
index 0da7a16..bd42d6b 100644
--- a/test/behavior.ts
+++ b/test/behavior.ts
@@ -54,6 +54,7 @@ describe("behavior", () => {
assert.isFalse(Behavior.is(1));
});
});
+
describe("producer", () => {
it("activates and deactivates", () => {
const activate = spy();
@@ -122,13 +123,13 @@ describe("behavior", () => {
it("pulls from time varying functions", () => {
let time = 0;
const b = H.fromFunction(() => time);
- assert.equal(H.at(b, 1), 0);
+ assert.equal(H.at(b), 0);
time = 1;
- assert.equal(H.at(b, 2), 1);
+ assert.equal(H.at(b), 1);
time = 2;
- assert.equal(H.at(b, 3), 2);
+ assert.equal(H.at(b), 2);
time = 3;
- assert.equal(H.at(b, 4), 3);
+ assert.equal(H.at(b), 3);
});
it("does not recompute when pulling with same timestamp", () => {
let callCount = 0;
@@ -241,15 +242,15 @@ describe("behavior", () => {
const applied = H.ap(fnB, numE);
const cb = spy();
applied.observe(cb, (pull) => {
- pull(1);
+ pull();
push(add(2), fnB);
- pull(2);
+ pull();
n = 4;
- pull(3);
+ pull();
push(double, fnB);
- pull(4);
+ pull();
n = 8;
- pull(5);
+ pull();
return () => {};
});
assert.deepEqual(cb.args, [[6], [3], [6], [8], [16]]);
@@ -283,6 +284,20 @@ describe("behavior", () => {
);
assert.strictEqual(H.at(b), 18);
});
+ it("handles diamond dependency", () => {
+ // p
+ // / \
+ // b1 b2
+ // \ /
+ // b3
+ const p = sinkBehavior(3);
+ const b1 = p.map((n) => n * n);
+ const b2 = p.map((n) => n + 4);
+ const b3 = H.lift((n, m) => n + m, b1, b2);
+ const cb = subscribeSpy(b3);
+ p.newValue(4);
+ assert.deepEqual(cb.args, [[16], [24]]);
+ });
});
});
describe("flatMap", () => {
@@ -740,6 +755,7 @@ describe("Behavior and Future", () => {
const frozenBehavior = runNow(H.freezeAt(b, f));
frozenBehavior.subscribe(cb);
b.push("b");
+ assert.deepEqual(cb.args, [["a"], ["b"]]);
f.resolve("c");
b.push("d");
assert.deepEqual(cb.args, [["a"], ["b"]]);
diff --git a/test/placeholder.ts b/test/placeholder.ts
index eebb16a..7b7767f 100644
--- a/test/placeholder.ts
+++ b/test/placeholder.ts
@@ -219,7 +219,7 @@ describe("placeholder", () => {
sink.push(1);
assert.deepEqual(cb.args, [[1]]);
});
- it.skip("handles diamond dependency", () => {
+ it("handles diamond dependency", () => {
// p
// / \
// b1 b2