From 3a3cfd044cbb4e9385543c7ccdbac9423d692b42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Emil=20Gj=C3=B8rup?= Date: Wed, 4 Sep 2019 11:11:13 +0200 Subject: [PATCH 1/2] push triggers pull, fix glitches --- src/behavior.ts | 128 ++++++++++++++++++++------------------------ src/common.ts | 8 ++- src/placeholder.ts | 37 ++++++++----- test/behavior.ts | 34 ++++++++---- test/placeholder.ts | 2 +- 5 files changed, 114 insertions(+), 95 deletions(-) diff --git a/src/behavior.ts b/src/behavior.ts index 9195dac..e812664 100644 --- a/src/behavior.ts +++ b/src/behavior.ts @@ -1,5 +1,5 @@ import { cons, DoubleLinkedList, Node, fromArray, nil } from "./datastructures"; -import { combine } from "./index"; +import { combine, isPlaceholder } from "./index"; import { State, Reactive, Time, BListener, Parent, SListener } from "./common"; import { Future, BehaviorFuture } from "./future"; import * as F from "./future"; @@ -69,14 +69,13 @@ export abstract class Behavior extends Reactive } abstract update(t: number): A; pushB(t: number): void { - if (this.state === State.Push) { - const newValue = this.update(t); - this.pulledAt = t; - if (this.last !== newValue) { - this.changedAt = t; - this.last = newValue; - pushToChildren(t, this); - } + if (this.changedAt === t) { + // This prevents a second push with same timestamp, to be further pushed + return; + } + this.pull(t); + if (this.changedAt === t && this.state === State.Push) { + pushToChildren(t, this); } } pull(t: number): void { @@ -84,12 +83,11 @@ export abstract class Behavior extends Reactive this.pulledAt = t; let shouldRefresh = this.changedAt === undefined; for (const parent of this.parents) { - if (isBehavior(parent)) { - if (parent.state !== State.Push && parent.pulledAt !== t) { - parent.pull(t); - } - shouldRefresh = shouldRefresh || parent.changedAt > this.changedAt; + if (!isBehavior(parent)) { + continue; } + parent.pull(t); + shouldRefresh = shouldRefresh || this.changedAt < parent.changedAt; } if (shouldRefresh) { refresh(this, t); @@ -128,39 +126,44 @@ export function pushToChildren(t: number, b: Behavior): void { } } -function refresh(b: Behavior, t: number): void { +function refresh(b: Behavior, t: number) { const newValue = b.update(t); - if (newValue !== b.last) { - b.changedAt = t; - b.last = newValue; + if (newValue === b.last) { + return; } + b.changedAt = t; + b.last = newValue; } export function isBehavior(b: any): b is Behavior { - return typeof b === "object" && "at" in b; + return ( + (typeof b === "object" && "at" in b && !isPlaceholder(b)) || + (isPlaceholder(b) && (b.source === undefined || isBehavior(b.source))) + ); } export abstract class ProducerBehavior extends Behavior { newValue(a: A): void { - const changed = a !== this.last; - if (changed) { - const t = tick(); - this.last = a; - this.changedAt = t; - if (this.state === State.Push) { - this.pulledAt = t; - pushToChildren(t, this); - } + if (a === this.last) { + return; + } + const t = tick(); + this.last = a; + this.changedAt = t; + if (this.state === State.Push) { + this.pulledAt = t; + pushToChildren(t, this); } } - pull(t: number): void { - this.last = this.update(t); + pull(t: number) { + refresh(this, t); } - activate(): void { + activate(t: Time): void { if (this.state === State.Inactive) { this.activateProducer(); } this.state = State.Push; + this.changedAt = t; } deactivate(): void { this.state = State.Inactive; @@ -284,17 +287,6 @@ class FlatMapBehavior extends Behavior { super(); this.parents = cons(this.outer); } - pushB(t: number): void { - const newValue = this.update(t); - this.pulledAt = t; - if (this.last !== newValue) { - this.changedAt = t; - this.last = newValue; - if (this.state === State.Push) { - pushToChildren(t, this); - } - } - } update(t: number): B { const outerChanged = this.outer.changedAt > this.changedAt; if (outerChanged || this.changedAt === undefined) { @@ -337,38 +329,34 @@ export function when(b: Behavior): Now> { } class SnapshotBehavior extends Behavior> implements SListener { - private afterFuture: boolean; private node: Node = new Node(this); - constructor(private parent: Behavior, future: Future) { + constructor(private parent: Behavior, private future: Future) { super(); if (future.state === State.Done) { // Future has occurred at some point in the past - this.afterFuture = true; this.state = parent.state; this.parents = cons(parent); this.last = Future.of(at(parent)); } else { - this.afterFuture = false; this.state = State.Push; + this.parents = nil; this.last = F.sinkFuture(); future.addListener(this.node, tick()); } } pushS(t: number, val: A): void { - if (this.afterFuture === false) { - // The push is coming from the Future, it has just occurred. - this.afterFuture = true; - this.last.resolve(at(this.parent)); - this.parent.addListener(this.node, t); + this.last.resolve(this.parent.at(t), t); + this.parents = cons(this.parent); + this.changeStateDown(this.state); + this.parent.addListener(this.node, t); + } + update(t: Time): Future { + if (this.future.state === State.Done) { + return Future.of(this.parent.at(t)); } else { - // We are receiving an update from `parent` after `future` has - // occurred. - this.last = Future.of(val); + return this.last; } } - update(_t: Time): Future { - return this.last; - } } export function snapshotAt( @@ -402,12 +390,14 @@ export class FunctionBehavior extends ActiveBehavior { constructor(private f: (t: Time) => A) { super(); this.state = State.Pull; + this.parents = nil; } - pull(t: Time): void { - if (this.pulledAt !== t) { - refresh(this, t); - this.pulledAt = t; + pull(t: Time) { + if (this.pulledAt === t) { + return; } + this.pulledAt = t; + refresh(this, t); } update(t: Time): A { return this.f(t); @@ -435,7 +425,7 @@ class SwitcherBehavior extends ActiveBehavior this.last = b.last; next.addListener(this.nNode, t); } - update(_t: number): A { + update(t: Time): A { return this.b.last; } pushS(t: number, value: Behavior): void { @@ -446,11 +436,11 @@ class SwitcherBehavior extends ActiveBehavior this.b = newB; this.parents = cons(newB); newB.addListener(this.bNode, t); - const newState = newB.state; - if (newState !== this.state) { - this.changeStateDown(newState); + this.changeStateDown(newB.state); + refresh(this, t); + if (this.changedAt === t && this.state === State.Push) { + pushToChildren(t, this); } - this.pushB(t); } } @@ -532,7 +522,7 @@ class ActiveAccumBehavior extends ActiveBehavior pushToChildren(t, this); } } - pull(_t: number): void {} + pull(_t: number) {} update(_t: number): B { throw new Error("Update should never be called."); } @@ -553,7 +543,7 @@ export class AccumBehavior extends ActiveBehavior> { update(t: number): Behavior { return new ActiveAccumBehavior(this.f, this.initial, this.source, t); } - pull(t: Time): void { + pull(t: Time) { this.last = this.update(t); this.changedAt = t; this.pulledAt = t; @@ -685,7 +675,7 @@ class MomentBehavior extends Behavior { } } } - pull(t: number): void { + pull(t: number) { this.pulledAt = t; refresh(this, t); } diff --git a/src/common.ts b/src/common.ts index 881e1ef..7e5afb8 100644 --- a/src/common.ts +++ b/src/common.ts @@ -49,7 +49,9 @@ export class PushOnlyObserver implements BListener, SListener { } } pushB(t: number): void { - this.callback((this.source as any).last); + const b = >this.source; + b.pull(t); + this.callback(b.last); } pushS(t: number, value: A): void { this.callback(value); @@ -145,7 +147,9 @@ export class CbObserver implements BListener, SListener { } } pushB(t: number): void { - this.callback((this.source as any).last); + const b = >this.source; + b.pull(t); + this.callback(b.last); } pushS(t: number, value: A): void { this.callback(value); diff --git a/src/placeholder.ts b/src/placeholder.ts index 234b43d..89caed9 100644 --- a/src/placeholder.ts +++ b/src/placeholder.ts @@ -1,4 +1,4 @@ -import { Reactive, State, SListener, BListener } from "./common"; +import { Reactive, State, SListener, BListener, Time } from "./common"; import { Behavior, isBehavior, MapBehavior, pushToChildren } from "./behavior"; import { Node, cons } from "./datastructures"; import { Stream, MapToStream } from "./stream"; @@ -17,12 +17,13 @@ export class Placeholder extends Behavior { source: Reactive | SListener | BListener>; private node: Node = new Node(this); replaceWith( - parent: Reactive | SListener | BListener> + parent: Reactive | SListener | BListener>, + t?: number ): void { this.source = parent; this.parents = cons(parent); if (this.children.head !== undefined) { - const t = tick(); + t = t !== undefined ? t : tick(); this.activate(t); if (isBehavior(parent) && this.state === State.Push) { pushToChildren(t, this); @@ -34,18 +35,14 @@ export class Placeholder extends Behavior { (child).pushS(t, a); } } - pull(t: number): void { + pull(t: number) { if (this.source === undefined) { throw new SamplePlaceholderError(this); } else if (isBehavior(this.source)) { + this.source.pull(t); this.pulledAt = t; - if (this.source.pulledAt !== t) { - this.source.pull(t); - } - if (this.last !== this.source.last) { - this.changedAt = t; - this.last = this.source.last; - } + this.changedAt = t; + this.last = this.source.last; } else { throw new Error("Unsupported pulling on placeholder"); } @@ -57,6 +54,7 @@ export class Placeholder extends Behavior { if (this.source !== undefined) { this.source.addListener(this.node, t); if (isBehavior(this.source)) { + this.pull(t); this.last = this.source.last; this.changedAt = this.source.changedAt; this.pulledAt = this.source.pulledAt; @@ -78,6 +76,10 @@ export class Placeholder extends Behavior { } } +export function isPlaceholder(p): p is Placeholder { + return typeof p === "object" && "replaceWith" in p; +} + class MapPlaceholder extends MapBehavior { pushS(t: number, a: A): void { // @ts-ignore @@ -86,11 +88,18 @@ class MapPlaceholder extends MapBehavior { } class MapToPlaceholder extends MapToStream { - last: B; - update(): B { + changedAt; + constructor(parent, public last: B) { + super(parent, last); + } + update(_t): B { return (this).b; } - pull(): void {} + pull(t) { + if (this.changedAt === undefined) { + this.changedAt = t; + } + } } function install(target: Function, source: Function): void { diff --git a/test/behavior.ts b/test/behavior.ts index 0da7a16..bd42d6b 100644 --- a/test/behavior.ts +++ b/test/behavior.ts @@ -54,6 +54,7 @@ describe("behavior", () => { assert.isFalse(Behavior.is(1)); }); }); + describe("producer", () => { it("activates and deactivates", () => { const activate = spy(); @@ -122,13 +123,13 @@ describe("behavior", () => { it("pulls from time varying functions", () => { let time = 0; const b = H.fromFunction(() => time); - assert.equal(H.at(b, 1), 0); + assert.equal(H.at(b), 0); time = 1; - assert.equal(H.at(b, 2), 1); + assert.equal(H.at(b), 1); time = 2; - assert.equal(H.at(b, 3), 2); + assert.equal(H.at(b), 2); time = 3; - assert.equal(H.at(b, 4), 3); + assert.equal(H.at(b), 3); }); it("does not recompute when pulling with same timestamp", () => { let callCount = 0; @@ -241,15 +242,15 @@ describe("behavior", () => { const applied = H.ap(fnB, numE); const cb = spy(); applied.observe(cb, (pull) => { - pull(1); + pull(); push(add(2), fnB); - pull(2); + pull(); n = 4; - pull(3); + pull(); push(double, fnB); - pull(4); + pull(); n = 8; - pull(5); + pull(); return () => {}; }); assert.deepEqual(cb.args, [[6], [3], [6], [8], [16]]); @@ -283,6 +284,20 @@ describe("behavior", () => { ); assert.strictEqual(H.at(b), 18); }); + it("handles diamond dependency", () => { + // p + // / \ + // b1 b2 + // \ / + // b3 + const p = sinkBehavior(3); + const b1 = p.map((n) => n * n); + const b2 = p.map((n) => n + 4); + const b3 = H.lift((n, m) => n + m, b1, b2); + const cb = subscribeSpy(b3); + p.newValue(4); + assert.deepEqual(cb.args, [[16], [24]]); + }); }); }); describe("flatMap", () => { @@ -740,6 +755,7 @@ describe("Behavior and Future", () => { const frozenBehavior = runNow(H.freezeAt(b, f)); frozenBehavior.subscribe(cb); b.push("b"); + assert.deepEqual(cb.args, [["a"], ["b"]]); f.resolve("c"); b.push("d"); assert.deepEqual(cb.args, [["a"], ["b"]]); diff --git a/test/placeholder.ts b/test/placeholder.ts index eebb16a..7b7767f 100644 --- a/test/placeholder.ts +++ b/test/placeholder.ts @@ -219,7 +219,7 @@ describe("placeholder", () => { sink.push(1); assert.deepEqual(cb.args, [[1]]); }); - it.skip("handles diamond dependency", () => { + it("handles diamond dependency", () => { // p // / \ // b1 b2 From f22b3af8a36f14098d7fb89df5a81b063226b812 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Emil=20Gj=C3=B8rup?= Date: Wed, 4 Sep 2019 16:43:36 +0200 Subject: [PATCH 2/2] cleanup and refactor to follow code conventions --- src/behavior.ts | 19 ++++++++----------- src/common.ts | 8 ++------ src/placeholder.ts | 1 - 3 files changed, 10 insertions(+), 18 deletions(-) diff --git a/src/behavior.ts b/src/behavior.ts index e812664..f1f561b 100644 --- a/src/behavior.ts +++ b/src/behavior.ts @@ -69,13 +69,11 @@ export abstract class Behavior extends Reactive } abstract update(t: number): A; pushB(t: number): void { - if (this.changedAt === t) { - // This prevents a second push with same timestamp, to be further pushed - return; - } - this.pull(t); - if (this.changedAt === t && this.state === State.Push) { - pushToChildren(t, this); + if (this.pulledAt !== t) { + this.pull(t); + if (this.changedAt === t && this.state === State.Push) { + pushToChildren(t, this); + } } } pull(t: number): void { @@ -83,11 +81,10 @@ export abstract class Behavior extends Reactive this.pulledAt = t; let shouldRefresh = this.changedAt === undefined; for (const parent of this.parents) { - if (!isBehavior(parent)) { - continue; + if (isBehavior(parent)) { + parent.pull(t); + shouldRefresh = shouldRefresh || this.changedAt < parent.changedAt; } - parent.pull(t); - shouldRefresh = shouldRefresh || this.changedAt < parent.changedAt; } if (shouldRefresh) { refresh(this, t); diff --git a/src/common.ts b/src/common.ts index 7e5afb8..f6eb7eb 100644 --- a/src/common.ts +++ b/src/common.ts @@ -49,9 +49,7 @@ export class PushOnlyObserver implements BListener, SListener { } } pushB(t: number): void { - const b = >this.source; - b.pull(t); - this.callback(b.last); + this.callback((>this.source).last); } pushS(t: number, value: A): void { this.callback(value); @@ -147,9 +145,7 @@ export class CbObserver implements BListener, SListener { } } pushB(t: number): void { - const b = >this.source; - b.pull(t); - this.callback(b.last); + this.callback((>this.source).last); } pushS(t: number, value: A): void { this.callback(value); diff --git a/src/placeholder.ts b/src/placeholder.ts index 89caed9..c511278 100644 --- a/src/placeholder.ts +++ b/src/placeholder.ts @@ -54,7 +54,6 @@ export class Placeholder extends Behavior { if (this.source !== undefined) { this.source.addListener(this.node, t); if (isBehavior(this.source)) { - this.pull(t); this.last = this.source.last; this.changedAt = this.source.changedAt; this.pulledAt = this.source.pulledAt;