Skip to content

Commit b65dbb7

Browse files
committed
createReplaySignal to handle thunk registration
1 parent 075fb4d commit b65dbb7

File tree

3 files changed

+127
-16
lines changed

3 files changed

+127
-16
lines changed

src/fx/replay-signal.ts

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import type { Resolve, Subscription } from "effection";
2+
import { action, resource } from "effection";
3+
4+
export function createReplaySignal<T, TClose>() {
5+
const subscribers = new Set<Subscription<T, TClose>>();
6+
// single shared durable queue storage
7+
const queue = createDurableQueue<T, TClose>();
8+
9+
// each subscriber gets its own iterator over the shared items by
10+
// calling `queue.subscribe()` which returns a Stream
11+
const subscribe = resource<Subscription<T, TClose>>(function* (provide) {
12+
const queued = queue.stream();
13+
subscribers.add(queued);
14+
15+
try {
16+
yield* provide({ next: queued.next });
17+
} finally {
18+
subscribers.delete(queued);
19+
}
20+
});
21+
22+
function send(value: T) {
23+
queue.add(value);
24+
}
25+
26+
function close(value?: TClose) {
27+
queue.close(value);
28+
}
29+
30+
return { ...subscribe, send, close };
31+
}
32+
33+
function createDurableQueue<T, TClose = never>() {
34+
type Item = IteratorResult<T, TClose>;
35+
36+
const items: Item[] = [];
37+
38+
// a set of active subscribers; each subscription has its own iterator
39+
// and its own waiting notifier set
40+
const subscribers = new Set<{
41+
notify: Set<Resolve<Item>>;
42+
}>();
43+
44+
function enqueue(item: Item) {
45+
items.push(item);
46+
for (const sub of subscribers) {
47+
if (sub.notify.size > 0) {
48+
const [resolve] = sub.notify;
49+
// use resolve from yield* action to notify waiting subscribers
50+
resolve(item);
51+
}
52+
}
53+
}
54+
55+
function stream(): Subscription<T, TClose> {
56+
const iter = items[Symbol.iterator]();
57+
const notify = new Set<Resolve<Item>>();
58+
const sub = { notify };
59+
subscribers.add(sub);
60+
61+
return {
62+
*next() {
63+
const item = iter.next().value;
64+
// item will be `undefined` when we've iterated to the end of the
65+
// current `items` array; in that case we wait for new items to be
66+
// enqueued and the resolve will be called with the new `Item`.
67+
if (item !== undefined) {
68+
return item;
69+
}
70+
return yield* action<Item>((resolve) => {
71+
notify.add(resolve);
72+
return () => notify.delete(resolve);
73+
});
74+
},
75+
};
76+
}
77+
78+
return {
79+
add: (value: T) => enqueue({ done: false, value }),
80+
close: (value?: TClose) => enqueue({ done: true, value: value as TClose }),
81+
stream,
82+
};
83+
}

src/query/thunk.ts

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import {
44
type Operation,
55
type Scope,
66
createContext,
7-
createSignal,
87
each,
98
ensure,
109
useScope,
@@ -15,6 +14,7 @@ import { supervise } from "../fx/index.js";
1514
import { createKey } from "./create-key.js";
1615
import { isFn, isObject } from "./util.js";
1716

17+
import { createReplaySignal } from "../fx/replay-signal.js";
1818
import { IdContext } from "../store/store.js";
1919
import type { ActionWithPayload, AnyAction, Next, Payload } from "../types.js";
2020
import type {
@@ -139,10 +139,9 @@ export function createThunks<Ctx extends ThunkCtx = ThunkCtx<any>>(
139139
} = { supervisor: takeEvery },
140140
): ThunksApi<Ctx> {
141141
const storeRegistration = new Set();
142-
const watch = createSignal<Visors>();
142+
const watch = createReplaySignal<Visors>();
143143

144144
const middleware: Middleware<Ctx>[] = [];
145-
const visors: { [key: string]: Visors } = {};
146145
const middlewareMap: { [key: string]: Middleware<Ctx> } = {};
147146
let dynamicMiddlewareMap: { [key: string]: Middleware<Ctx> } = {};
148147
const actionMap: {
@@ -174,7 +173,7 @@ export function createThunks<Ctx extends ThunkCtx = ThunkCtx<any>>(
174173
}
175174

176175
function create(name: string, ...args: any[]) {
177-
if (visors[name]) {
176+
if (actionMap[name]) {
178177
const msg = `[${name}] already exists, do you have two thunks with the same name?`;
179178
console.warn(msg);
180179
}
@@ -216,10 +215,6 @@ export function createThunks<Ctx extends ThunkCtx = ThunkCtx<any>>(
216215
function* curVisor(): Operation<void> {
217216
yield* tt(type, onApi);
218217
}
219-
220-
// maintains a history for any future registration
221-
visors[name] = () => supervise(curVisor);
222-
// signals for any stores already listening
223218
watch.send(() => supervise(curVisor));
224219

225220
const errMsg = `[${name}] is being called before its thunk has been registered. Run \`store.run(thunks.register)\` where \`thunks\` is the name of your \`createThunks\` or \`createApi\` variable.`;
@@ -262,9 +257,6 @@ export function createThunks<Ctx extends ThunkCtx = ThunkCtx<any>>(
262257
return kickoff;
263258
}
264259

265-
// maintains a history for any future registration
266-
visors[name] = curVisor;
267-
// signals for any stores already listening
268260
watch.send(curVisor);
269261

270262
// returns to the user can use this resource from
@@ -285,11 +277,6 @@ export function createThunks<Ctx extends ThunkCtx = ThunkCtx<any>>(
285277
storeRegistration.delete(parentStoreId);
286278
});
287279

288-
// Register any thunks created before listening to signal
289-
for (const created of Object.values(visors)) {
290-
yield* scope.spawn(created(scope));
291-
}
292-
293280
// wait for further thunk create
294281
for (const watched of yield* each(watch)) {
295282
yield* scope.spawn(watched(scope));

src/test/replay-signal.test.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import { createReplaySignal } from "../fx/replay-signal.js";
2+
import { each, run, sleep, spawn } from "../index.js";
3+
import { expect, test } from "../test.js";
4+
5+
test("should call the generator function", async () => {
6+
const thunk = createReplaySignal();
7+
const checkOne = [] as string[];
8+
const checkTwo = [] as string[];
9+
const checkThree = [] as string[];
10+
11+
function* valPush(arrKey: string, arr: string[]) {
12+
for (const val of yield* each(thunk)) {
13+
console.log(arrKey, "received", val);
14+
arr.push(val as string);
15+
yield* each.next();
16+
}
17+
}
18+
19+
await run(function* () {
20+
console.log("spawning consumers");
21+
const v1 = yield* spawn(() => valPush("checkOne", checkOne));
22+
yield* sleep(0); // allow spawns to start
23+
thunk.send("first");
24+
const v2 = yield* spawn(() => valPush("checkTwo", checkTwo));
25+
yield* sleep(0); // allow spawns to start
26+
thunk.send("second");
27+
const v3 = yield* spawn(() => valPush("checkThree", checkThree));
28+
yield* sleep(0); // allow spawns to start
29+
thunk.send("third");
30+
yield* sleep(0); // allow spawns to start
31+
console.log("sending values");
32+
thunk.close();
33+
yield* v1;
34+
yield* v2;
35+
yield* v3;
36+
});
37+
38+
expect(checkOne).toHaveLength(3);
39+
expect(checkTwo).toHaveLength(3);
40+
expect(checkThree).toHaveLength(3);
41+
});

0 commit comments

Comments
 (0)