Skip to content

Commit c1792da

Browse files
tim-smarteffect-bot
authored andcommitted
add {FiberHandle,FiberSet,FiberMap}.awaitEmpty apis (#4337)
1 parent a1213ff commit c1792da

File tree

7 files changed

+162
-104
lines changed

7 files changed

+162
-104
lines changed

.changeset/tough-cars-invite.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"effect": minor
3+
---
4+
5+
add {FiberHandle,FiberSet,FiberMap}.awaitEmpty apis

packages/effect/src/FiberHandle.ts

Lines changed: 36 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -339,44 +339,31 @@ export const run: {
339339
} = function() {
340340
const self = arguments[0] as FiberHandle
341341
if (Effect.isEffect(arguments[1])) {
342-
const effect = arguments[1]
343-
const options = arguments[2] as {
344-
readonly onlyIfMissing?: boolean
345-
readonly propagateInterruption?: boolean | undefined
346-
} | undefined
347-
return Effect.suspend(() => {
348-
if (self.state._tag === "Closed") {
349-
return Effect.interrupt
350-
} else if (self.state.fiber !== undefined && options?.onlyIfMissing === true) {
351-
return Effect.sync(constInterruptedFiber)
352-
}
353-
return Effect.uninterruptibleMask((restore) =>
354-
Effect.tap(
355-
restore(Effect.forkDaemon(effect)),
356-
(fiber) => set(self, fiber, options)
357-
)
358-
)
359-
}) as any
342+
return runImpl(self, arguments[1], arguments[2]) as any
360343
}
361-
const options = arguments[1] as {
344+
const options = arguments[1]
345+
return (effect: Effect.Effect<unknown, unknown, any>) => runImpl(self, effect, options)
346+
}
347+
348+
const runImpl = <A, E, R, XE extends E, XA extends A>(
349+
self: FiberHandle<A, E>,
350+
effect: Effect.Effect<XA, XE, R>,
351+
options?: {
362352
readonly onlyIfMissing?: boolean
363353
readonly propagateInterruption?: boolean | undefined
364-
} | undefined
365-
return (effect: Effect.Effect<unknown, unknown, any>) =>
366-
Effect.suspend(() => {
367-
if (self.state._tag === "Closed") {
368-
return Effect.interrupt
369-
} else if (self.state.fiber !== undefined && options?.onlyIfMissing === true) {
370-
return Effect.sync(constInterruptedFiber)
371-
}
372-
return Effect.uninterruptibleMask((restore) =>
373-
Effect.tap(
374-
restore(Effect.forkDaemon(effect)),
375-
(fiber) => set(self, fiber, options)
376-
)
377-
)
378-
})
379-
}
354+
}
355+
): Effect.Effect<Fiber.RuntimeFiber<XA, XE>, never, R> =>
356+
Effect.fiberIdWith((fiberId) => {
357+
if (self.state._tag === "Closed") {
358+
return Effect.interrupt
359+
} else if (self.state.fiber !== undefined && options?.onlyIfMissing === true) {
360+
return Effect.sync(constInterruptedFiber)
361+
}
362+
return Effect.tap(
363+
Effect.forkDaemon(effect),
364+
(fiber) => unsafeSet(self, fiber, { ...options, interruptAs: fiberId })
365+
)
366+
})
380367

381368
/**
382369
* Capture a Runtime and use it to fork Effect's, adding the forked fibers to the FiberHandle.
@@ -470,3 +457,17 @@ export const runtime: <A, E>(
470457
*/
471458
export const join = <A, E>(self: FiberHandle<A, E>): Effect.Effect<void, E> =>
472459
Deferred.await(self.deferred as Deferred.Deferred<void, E>)
460+
461+
/**
462+
* Wait for the fiber in the FiberHandle to complete.
463+
*
464+
* @since 3.13.0
465+
* @categories combinators
466+
*/
467+
export const awaitEmpty = <A, E>(self: FiberHandle<A, E>): Effect.Effect<void, E> =>
468+
Effect.suspend(() => {
469+
if (self.state._tag === "Closed" || self.state.fiber === undefined) {
470+
return Effect.void
471+
}
472+
return Fiber.await(self.state.fiber)
473+
})

packages/effect/src/FiberMap.ts

Lines changed: 38 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import * as Effect from "./Effect.js"
88
import * as Exit from "./Exit.js"
99
import * as Fiber from "./Fiber.js"
1010
import * as FiberId from "./FiberId.js"
11-
import { constFalse, dual } from "./Function.js"
11+
import { constFalse, constVoid, dual } from "./Function.js"
1212
import * as HashSet from "./HashSet.js"
1313
import * as Inspectable from "./Inspectable.js"
1414
import * as Iterable from "./Iterable.js"
@@ -438,49 +438,35 @@ export const run: {
438438
} | undefined
439439
): Effect.Effect<Fiber.RuntimeFiber<XA, XE>, never, R>
440440
} = function() {
441+
const self = arguments[0]
441442
if (Effect.isEffect(arguments[2])) {
442-
const self = arguments[0] as FiberMap<any>
443-
const key = arguments[1]
444-
const effect = arguments[2] as Effect.Effect<any, any, any>
445-
const options = arguments[3] as {
446-
readonly onlyIfMissing?: boolean
447-
readonly propagateInterruption?: boolean | undefined
448-
} | undefined
449-
return Effect.suspend(() => {
450-
if (self.state._tag === "Closed") {
451-
return Effect.interrupt
452-
} else if (options?.onlyIfMissing === true && unsafeHas(self, key)) {
453-
return Effect.sync(constInterruptedFiber)
454-
}
455-
return Effect.uninterruptibleMask((restore) =>
456-
Effect.tap(
457-
restore(Effect.forkDaemon(effect)),
458-
(fiber) => set(self, key, fiber, options)
459-
)
460-
)
461-
}) as any
443+
return runImpl(self, arguments[1], arguments[2], arguments[3]) as any
462444
}
463-
const self = arguments[0] as FiberMap<any>
464445
const key = arguments[1]
465-
const options = arguments[2] as {
446+
const options = arguments[2]
447+
return (effect: Effect.Effect<any, any, any>) => runImpl(self, key, effect, options)
448+
}
449+
450+
const runImpl = <K, A, E, R, XE extends E, XA extends A>(
451+
self: FiberMap<K, A, E>,
452+
key: K,
453+
effect: Effect.Effect<XA, XE, R>,
454+
options?: {
466455
readonly onlyIfMissing?: boolean
467456
readonly propagateInterruption?: boolean | undefined
468-
} | undefined
469-
return (effect: Effect.Effect<any, any, any>) =>
470-
Effect.suspend(() => {
471-
if (self.state._tag === "Closed") {
472-
return Effect.interrupt
473-
} else if (options?.onlyIfMissing === true && unsafeHas(self, key)) {
474-
return Effect.sync(constInterruptedFiber)
475-
}
476-
return Effect.uninterruptibleMask((restore) =>
477-
Effect.tap(
478-
restore(Effect.forkDaemon(effect)),
479-
(fiber) => set(self, key, fiber, options)
480-
)
481-
)
482-
})
483-
}
457+
}
458+
) =>
459+
Effect.fiberIdWith((fiberId) => {
460+
if (self.state._tag === "Closed") {
461+
return Effect.interrupt
462+
} else if (options?.onlyIfMissing === true && unsafeHas(self, key)) {
463+
return Effect.sync(constInterruptedFiber)
464+
}
465+
return Effect.tap(
466+
Effect.forkDaemon(effect),
467+
(fiber) => unsafeSet(self, key, fiber, { ...options, interruptAs: fiberId })
468+
)
469+
})
484470

485471
/**
486472
* Capture a Runtime and use it to fork Effect's, adding the forked fibers to the FiberMap.
@@ -581,3 +567,16 @@ export const size = <K, A, E>(self: FiberMap<K, A, E>): Effect.Effect<number> =>
581567
*/
582568
export const join = <K, A, E>(self: FiberMap<K, A, E>): Effect.Effect<void, E> =>
583569
Deferred.await(self.deferred as Deferred.Deferred<void, E>)
570+
571+
/**
572+
* Wait for the FiberMap to be empty.
573+
*
574+
* @since 3.13.0
575+
* @categories combinators
576+
*/
577+
export const awaitEmpty = <K, A, E>(self: FiberMap<K, A, E>): Effect.Effect<void, E> =>
578+
Effect.whileLoop({
579+
while: () => self.state._tag === "Open" && MutableHashMap.size(self.state.backing) > 0,
580+
body: () => Fiber.await(Iterable.unsafeHead(self)[1]),
581+
step: constVoid
582+
})

packages/effect/src/FiberSet.ts

Lines changed: 35 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import * as Effect from "./Effect.js"
77
import * as Exit from "./Exit.js"
88
import * as Fiber from "./Fiber.js"
99
import * as FiberId from "./FiberId.js"
10-
import { constFalse, dual } from "./Function.js"
10+
import { constFalse, constVoid, dual } from "./Function.js"
1111
import * as HashSet from "./HashSet.js"
1212
import * as Inspectable from "./Inspectable.js"
1313
import * as Iterable from "./Iterable.js"
@@ -291,34 +291,32 @@ export const run: {
291291
} = function() {
292292
const self = arguments[0] as FiberSet<any, any>
293293
if (!Effect.isEffect(arguments[1])) {
294-
const options = arguments[1] as { readonly propagateInterruption?: boolean | undefined } | undefined
295-
return (effect: Effect.Effect<any, any, any>) =>
296-
Effect.suspend(() => {
297-
if (self.state._tag === "Closed") {
298-
return Effect.interrupt
299-
}
300-
return Effect.uninterruptibleMask((restore) =>
301-
Effect.tap(
302-
restore(Effect.forkDaemon(effect)),
303-
(fiber) => add(self, fiber, options)
304-
)
305-
)
306-
})
294+
const options = arguments[1]
295+
return (effect: Effect.Effect<any, any, any>) => runImpl(self, effect, options)
307296
}
308-
const effect = arguments[1]
309-
const options = arguments[2] as { readonly propagateInterruption?: boolean | undefined } | undefined
310-
return Effect.suspend(() => {
297+
return runImpl(self, arguments[1], arguments[2]) as any
298+
}
299+
300+
const runImpl = <A, E, R, XE extends E, XA extends A>(
301+
self: FiberSet<A, E>,
302+
effect: Effect.Effect<XA, XE, R>,
303+
options?: {
304+
readonly propagateInterruption?: boolean | undefined
305+
}
306+
): Effect.Effect<Fiber.RuntimeFiber<XA, XE>, never, R> =>
307+
Effect.fiberIdWith((fiberId) => {
311308
if (self.state._tag === "Closed") {
312309
return Effect.interrupt
313310
}
314-
return Effect.uninterruptibleMask((restore) =>
315-
Effect.tap(
316-
restore(Effect.forkDaemon(effect)),
317-
(fiber) => add(self, fiber, options)
318-
)
311+
return Effect.tap(
312+
Effect.forkDaemon(effect),
313+
(fiber) =>
314+
unsafeAdd(self, fiber, {
315+
...options,
316+
interruptAs: fiberId
317+
})
319318
)
320-
}) as any
321-
}
319+
})
322320

323321
/**
324322
* Capture a Runtime and use it to fork Effect's, adding the forked fibers to the FiberSet.
@@ -405,3 +403,16 @@ export const size = <A, E>(self: FiberSet<A, E>): Effect.Effect<number> =>
405403
*/
406404
export const join = <A, E>(self: FiberSet<A, E>): Effect.Effect<void, E> =>
407405
Deferred.await(self.deferred as Deferred.Deferred<void, E>)
406+
407+
/**
408+
* Wait until the fiber set is empty.
409+
*
410+
* @since 3.13.0
411+
* @categories combinators
412+
*/
413+
export const awaitEmpty = <A, E>(self: FiberSet<A, E>): Effect.Effect<void> =>
414+
Effect.whileLoop({
415+
while: () => self.state._tag === "Open" && self.state.backing.size > 0,
416+
body: () => Fiber.await(Iterable.unsafeHead(self)),
417+
step: constVoid
418+
})

packages/effect/test/FiberHandle.test.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import { describe, it } from "@effect/vitest"
2-
import { Deferred, Effect, Exit, Fiber, FiberHandle, pipe, Ref } from "effect"
1+
import { assert, describe, it } from "@effect/vitest"
2+
import { Deferred, Effect, Exit, Fiber, FiberHandle, pipe, Ref, TestClock } from "effect"
33
import { assertFalse, assertTrue, strictEqual } from "effect/test/util"
44

55
describe("FiberHandle", () => {
@@ -99,4 +99,16 @@ describe("FiberHandle", () => {
9999
)
100100
))
101101
}))
102+
103+
it.scoped("awaitEmpty", () =>
104+
Effect.gen(function*() {
105+
const handle = yield* FiberHandle.make()
106+
yield* FiberHandle.run(handle, Effect.sleep(1000))
107+
108+
const fiber = yield* Effect.fork(FiberHandle.awaitEmpty(handle))
109+
yield* TestClock.adjust(500)
110+
assert.isNull(fiber.unsafePoll())
111+
yield* TestClock.adjust(500)
112+
assert.isDefined(fiber.unsafePoll())
113+
}))
102114
})

packages/effect/test/FiberMap.test.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import { describe, it } from "@effect/vitest"
2-
import { Array, Deferred, Effect, Exit, Fiber, FiberMap, pipe, Ref, Scope } from "effect"
1+
import { assert, describe, it } from "@effect/vitest"
2+
import { Array, Deferred, Effect, Exit, Fiber, FiberMap, pipe, Ref, Scope, TestClock } from "effect"
33
import { assertFalse, assertTrue, strictEqual } from "effect/test/util"
44

55
describe("FiberMap", () => {
@@ -121,4 +121,19 @@ describe("FiberMap", () => {
121121
)
122122
))
123123
}))
124+
125+
it.scoped("awaitEmpty", () =>
126+
Effect.gen(function*() {
127+
const map = yield* FiberMap.make<string>()
128+
yield* FiberMap.run(map, "a", Effect.sleep(1000))
129+
yield* FiberMap.run(map, "b", Effect.sleep(1000))
130+
yield* FiberMap.run(map, "c", Effect.sleep(1000))
131+
yield* FiberMap.run(map, "d", Effect.sleep(1000))
132+
133+
const fiber = yield* Effect.fork(FiberMap.awaitEmpty(map))
134+
yield* TestClock.adjust(500)
135+
assert.isNull(fiber.unsafePoll())
136+
yield* TestClock.adjust(500)
137+
assert.isDefined(fiber.unsafePoll())
138+
}))
124139
})

packages/effect/test/FiberSet.test.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import { describe, it } from "@effect/vitest"
2-
import { Array, Deferred, Effect, Exit, Fiber, FiberSet, pipe, Ref, Scope } from "effect"
1+
import { assert, describe, it } from "@effect/vitest"
2+
import { Array, Deferred, Effect, Exit, Fiber, FiberSet, pipe, Ref, Scope, TestClock } from "effect"
33
import { assertFalse, assertTrue, strictEqual } from "effect/test/util"
44

55
describe("FiberSet", () => {
@@ -93,4 +93,19 @@ describe("FiberSet", () => {
9393
)
9494
))
9595
}))
96+
97+
it.scoped("awaitEmpty", () =>
98+
Effect.gen(function*() {
99+
const set = yield* FiberSet.make()
100+
yield* FiberSet.run(set, Effect.sleep(1000))
101+
yield* FiberSet.run(set, Effect.sleep(1000))
102+
yield* FiberSet.run(set, Effect.sleep(1000))
103+
yield* FiberSet.run(set, Effect.sleep(1000))
104+
105+
const fiber = yield* Effect.fork(FiberSet.awaitEmpty(set))
106+
yield* TestClock.adjust(500)
107+
assert.isNull(fiber.unsafePoll())
108+
yield* TestClock.adjust(500)
109+
assert.isDefined(fiber.unsafePoll())
110+
}))
96111
})

0 commit comments

Comments
 (0)