Skip to content

Commit f67c2fa

Browse files
committed
ensure persisted messages are sent when no assignments (#4618)
1 parent b40af94 commit f67c2fa

File tree

2 files changed

+15
-12
lines changed

2 files changed

+15
-12
lines changed

packages/cluster/src/Runners.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import * as Effect from "effect/Effect"
1111
import * as Exit from "effect/Exit"
1212
import * as FiberRef from "effect/FiberRef"
1313
import * as Layer from "effect/Layer"
14+
import * as Option from "effect/Option"
1415
import * as RcMap from "effect/RcMap"
1516
import * as Schema from "effect/Schema"
1617
import type { Scope } from "effect/Scope"
@@ -74,7 +75,7 @@ export class Runners extends Context.Tag("@effect/cluster/Runners")<Runners, {
7475
*/
7576
readonly notify: <R extends Rpc.Any>(
7677
options: {
77-
readonly address: RunnerAddress
78+
readonly address: Option.Option<RunnerAddress>
7879
readonly message: Message.Outgoing<R>
7980
readonly discard: boolean
8081
}
@@ -497,8 +498,11 @@ export const makeRpc: Effect.Effect<
497498
})
498499
},
499500
notify({ address, message }) {
501+
if (Option.isNone(address)) {
502+
return Effect.void
503+
}
500504
const envelope = message.envelope
501-
return RcMap.get(clients, address).pipe(
505+
return RcMap.get(clients, address.value).pipe(
502506
Effect.flatMap((client) => client.Notify({ envelope })),
503507
Effect.scoped,
504508
Effect.ignore

packages/cluster/src/Sharding.ts

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -668,19 +668,18 @@ export const make = Effect.gen(function*() {
668668
Effect.suspend(() => {
669669
const address = message.envelope.address
670670
const maybeRunner = MutableHashMap.get(shardAssignments, address.shardId)
671-
if (Option.isNone(maybeRunner)) {
672-
return Effect.fail(new EntityNotManagedByRunner({ address }))
673-
}
674-
const runner = maybeRunner.value
675-
const rpc = message.rpc as any as Rpc.AnyWithProps
676-
if (storageEnabled && Context.get(rpc.annotations, Persisted)) {
677-
return isLocalRunner(runner)
671+
const isPersisted = storageEnabled && Context.get(message.rpc.annotations, Persisted)
672+
const runnerIsLocal = Option.isSome(maybeRunner) && isLocalRunner(maybeRunner.value)
673+
if (isPersisted) {
674+
return runnerIsLocal
678675
? notifyLocal(message, discard)
679-
: runners.notify({ address: runner, message, discard })
676+
: runners.notify({ address: maybeRunner, message, discard })
677+
} else if (Option.isNone(maybeRunner)) {
678+
return Effect.fail(new EntityNotManagedByRunner({ address }))
680679
}
681-
return isLocalRunner(runner)
680+
return runnerIsLocal
682681
? sendLocal(message)
683-
: runners.send({ address: runner, message })
682+
: runners.send({ address: maybeRunner.value, message })
684683
}),
685684
isTransientError,
686685
(error) => {

0 commit comments

Comments
 (0)