Skip to content

Commit 2473ad5

Browse files
authored
run platform workers in a Scope, send errors or termination to a CloseLatch (#4429)
1 parent 42ddd5f commit 2473ad5

File tree

13 files changed

+373
-297
lines changed

13 files changed

+373
-297
lines changed

.changeset/ninety-garlics-walk.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
"@effect/platform-browser": minor
3+
"@effect/platform-node": minor
4+
"@effect/platform-bun": minor
5+
"@effect/platform": minor
6+
---
7+
8+
run platform workers in a Scope, send errors or termination to a CloseLatch

packages/platform-browser/src/BrowserWorkerRunner.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,14 @@ import type * as Runner from "@effect/platform/WorkerRunner"
55
import type * as Layer from "effect/Layer"
66
import * as internal from "./internal/workerRunner.js"
77

8+
export {
9+
/**
10+
* @since 1.0.0
11+
* @category re-exports
12+
*/
13+
launch
14+
} from "@effect/platform/WorkerRunner"
15+
816
/**
917
* @since 1.0.0
1018
* @category constructors

packages/platform-browser/src/internal/workerRunner.ts

Lines changed: 94 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import { WorkerError } from "@effect/platform/WorkerError"
22
import * as Runner from "@effect/platform/WorkerRunner"
3+
import * as Cause from "effect/Cause"
34
import * as Context from "effect/Context"
45
import * as Deferred from "effect/Deferred"
56
import * as Effect from "effect/Effect"
67
import * as ExecStrategy from "effect/ExecutionStrategy"
78
import * as Exit from "effect/Exit"
8-
import * as FiberId from "effect/FiberId"
99
import * as FiberSet from "effect/FiberSet"
1010
import { globalValue } from "effect/GlobalValue"
1111
import * as Layer from "effect/Layer"
@@ -24,7 +24,7 @@ if (typeof self !== "undefined" && "onconnect" in self) {
2424
export const make = (self: MessagePort | Window) =>
2525
Runner.PlatformRunner.of({
2626
[Runner.PlatformRunnerTypeId]: Runner.PlatformRunnerTypeId,
27-
start<I, O>() {
27+
start<I, O>(closeLatch: Deferred.Deferred<void, WorkerError>) {
2828
return Effect.sync(() => {
2929
let currentPortId = 0
3030

@@ -36,102 +36,107 @@ export const make = (self: MessagePort | Window) =>
3636
})
3737
})
3838

39-
const run = <A, E, R>(handler: (portId: number, message: I) => Effect.Effect<A, E, R>) =>
40-
Effect.uninterruptibleMask((restore) =>
41-
Effect.gen(function*() {
42-
const scope = yield* Effect.scope
43-
const runtime = (yield* Effect.runtime<R | Scope.Scope>()).pipe(
44-
Runtime.updateContext(Context.omit(Scope.Scope))
45-
) as Runtime.Runtime<R>
46-
const fiberSet = yield* FiberSet.make<any, WorkerError | E>()
47-
const runFork = Runtime.runFork(runtime)
39+
const run = Effect.fnUntraced(
40+
function*<A, E, R>(handler: (portId: number, message: I) => Effect.Effect<A, E, R>) {
41+
const scope = yield* Effect.scope
42+
const runtime = (yield* Effect.interruptible(Effect.runtime<R | Scope.Scope>())).pipe(
43+
Runtime.updateContext(Context.omit(Scope.Scope))
44+
) as Runtime.Runtime<R>
45+
const fiberSet = yield* FiberSet.make<any, WorkerError | E>()
46+
const runFork = Runtime.runFork(runtime)
47+
function onExit(exit: Exit.Exit<any, E>) {
48+
if (exit._tag === "Failure") {
49+
Deferred.unsafeDone(closeLatch, Exit.die(Cause.squash(exit.cause)))
50+
}
51+
}
4852

49-
function onMessage(portId: number) {
50-
return function(event: MessageEvent) {
51-
const message = (event as MessageEvent).data as Runner.BackingRunner.Message<I>
52-
if (message[0] === 0) {
53-
FiberSet.unsafeAdd(fiberSet, runFork(restore(handler(portId, message[1]))))
54-
} else {
55-
const port = ports.get(portId)
56-
if (port) {
57-
Effect.runFork(Scope.close(port[1], Exit.void))
58-
}
59-
ports.delete(portId)
60-
if (ports.size === 0) {
61-
Deferred.unsafeDone(fiberSet.deferred, Exit.interrupt(FiberId.none))
62-
}
53+
function onMessage(portId: number) {
54+
return function(event: MessageEvent) {
55+
const message = (event as MessageEvent).data as Runner.BackingRunner.Message<I>
56+
if (message[0] === 0) {
57+
const fiber = runFork(handler(portId, message[1]))
58+
fiber.addObserver(onExit)
59+
FiberSet.unsafeAdd(fiberSet, fiber)
60+
} else {
61+
const port = ports.get(portId)
62+
if (port) {
63+
Effect.runFork(Scope.close(port[1], Exit.void))
64+
}
65+
ports.delete(portId)
66+
if (ports.size === 0) {
67+
Deferred.unsafeDone(closeLatch, Exit.void)
6368
}
6469
}
6570
}
66-
function onMessageError(error: MessageEvent) {
67-
Deferred.unsafeDone(
68-
fiberSet.deferred,
69-
new WorkerError({ reason: "decode", cause: error.data })
70-
)
71-
}
72-
function onError(error: any) {
73-
Deferred.unsafeDone(
74-
fiberSet.deferred,
75-
new WorkerError({ reason: "unknown", cause: error.data })
76-
)
77-
}
78-
function handlePort(port: MessagePort) {
79-
const fiber = Scope.fork(scope, ExecStrategy.sequential).pipe(
80-
Effect.flatMap((scope) => {
81-
const portId = currentPortId++
82-
ports.set(portId, [port, scope])
83-
const onMsg = onMessage(portId)
84-
port.addEventListener("message", onMsg)
85-
port.addEventListener("messageerror", onMessageError)
86-
if ("start" in port) {
87-
port.start()
88-
}
89-
port.postMessage([0])
90-
return Scope.addFinalizer(
91-
scope,
92-
Effect.sync(() => {
93-
port.removeEventListener("message", onMsg)
94-
port.removeEventListener("messageerror", onError)
95-
port.close()
96-
})
97-
)
98-
}),
99-
runFork
100-
)
101-
FiberSet.unsafeAdd(fiberSet, fiber)
71+
}
72+
function onMessageError(error: MessageEvent) {
73+
Deferred.unsafeDone(
74+
closeLatch,
75+
new WorkerError({ reason: "decode", cause: error.data })
76+
)
77+
}
78+
function onError(error: any) {
79+
Deferred.unsafeDone(
80+
closeLatch,
81+
new WorkerError({ reason: "unknown", cause: error.data })
82+
)
83+
}
84+
function handlePort(port: MessagePort) {
85+
const fiber = Scope.fork(scope, ExecStrategy.sequential).pipe(
86+
Effect.flatMap((scope) => {
87+
const portId = currentPortId++
88+
ports.set(portId, [port, scope])
89+
const onMsg = onMessage(portId)
90+
port.addEventListener("message", onMsg)
91+
port.addEventListener("messageerror", onMessageError)
92+
if ("start" in port) {
93+
port.start()
94+
}
95+
port.postMessage([0])
96+
return Scope.addFinalizer(
97+
scope,
98+
Effect.sync(() => {
99+
port.removeEventListener("message", onMsg)
100+
port.removeEventListener("messageerror", onError)
101+
port.close()
102+
})
103+
)
104+
}),
105+
runFork
106+
)
107+
fiber.addObserver(onExit)
108+
FiberSet.unsafeAdd(fiberSet, fiber)
109+
}
110+
self.addEventListener("error", onError)
111+
let prevOnConnect: unknown | undefined
112+
if ("onconnect" in self) {
113+
prevOnConnect = self.onconnect
114+
self.onconnect = function(event: MessageEvent) {
115+
const port = (event as MessageEvent).ports[0]
116+
handlePort(port)
102117
}
103-
self.addEventListener("error", onError)
104-
let prevOnConnect: unknown | undefined
105-
if ("onconnect" in self) {
106-
prevOnConnect = self.onconnect
107-
self.onconnect = function(event: MessageEvent) {
108-
const port = (event as MessageEvent).ports[0]
109-
handlePort(port)
110-
}
111-
for (const port of cachedPorts) {
112-
handlePort(port)
113-
}
114-
cachedPorts.clear()
115-
yield* Scope.addFinalizer(
116-
scope,
117-
Effect.sync(() => self.close())
118-
)
119-
} else {
120-
handlePort(self as any)
118+
for (const port of cachedPorts) {
119+
handlePort(port)
121120
}
121+
cachedPorts.clear()
122122
yield* Scope.addFinalizer(
123123
scope,
124-
Effect.sync(() => {
125-
self.removeEventListener("error", onError)
126-
if ("onconnect" in self) {
127-
self.onconnect = prevOnConnect
128-
}
129-
})
124+
Effect.sync(() => self.close())
130125
)
131-
132-
return (yield* restore(FiberSet.join(fiberSet))) as never
133-
}).pipe(Effect.scoped)
134-
)
126+
} else {
127+
handlePort(self as any)
128+
}
129+
yield* Scope.addFinalizer(
130+
scope,
131+
Effect.sync(() => {
132+
self.removeEventListener("error", onError)
133+
if ("onconnect" in self) {
134+
self.onconnect = prevOnConnect
135+
}
136+
})
137+
)
138+
}
139+
)
135140

136141
return { run, send }
137142
})

packages/platform-browser/test/fixtures/serializedWorker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,4 @@ const WorkerLive = Runner.layerSerialized(WorkerMessage, {
3535
Layer.provide(BrowserRunner.layer)
3636
)
3737

38-
Effect.runFork(Layer.launch(WorkerLive))
38+
Effect.runFork(Runner.launch(WorkerLive))

packages/platform-browser/test/fixtures/worker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@ const WorkerLive = Runner.layer((n: number) => Stream.range(0, n)).pipe(
66
Layer.provide(BrowserRunner.layer)
77
)
88

9-
Effect.runFork(Layer.launch(WorkerLive))
9+
Effect.runFork(Runner.launch(WorkerLive))

packages/platform-bun/examples/worker/range.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@ const WorkerLive = Effect.gen(function*() {
88
yield* Effect.addFinalizer(() => Effect.log("worker closed"))
99
}).pipe(Layer.scopedDiscard, Layer.provide(BunWorkerRunner.layer))
1010

11-
BunRuntime.runMain(Layer.launch(WorkerLive))
11+
BunRuntime.runMain(Runner.launch(WorkerLive))

packages/platform-bun/src/BunWorkerRunner.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,14 @@ import type * as Runner from "@effect/platform/WorkerRunner"
55
import type * as Layer from "effect/Layer"
66
import * as internal from "./internal/workerRunner.js"
77

8+
export {
9+
/**
10+
* @since 1.0.0
11+
* @category re-exports
12+
*/
13+
launch
14+
} from "@effect/platform/WorkerRunner"
15+
816
/**
917
* @since 1.0.0
1018
* @category layers

0 commit comments

Comments
 (0)