-
Notifications
You must be signed in to change notification settings - Fork 74
Distributed singleton should be more careful with handover #1127
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -86,7 +86,7 @@ internal distributed actor ClusterSingletonBoss<Act: ClusterSingleton>: ClusterS | |
self.singletonFactory = singletonFactory | ||
self.buffer = RemoteCallBuffer(capacity: settings.bufferCapacity) | ||
|
||
self.wellKnownName = "$singletonBoss-\(settings.name)" | ||
self.wellKnownName = Self.makeSingletonBossWellKnownName(settings) | ||
|
||
if system.settings.enabled { | ||
self.clusterEventsSubscribeTask = Task { | ||
|
@@ -109,6 +109,10 @@ internal distributed actor ClusterSingletonBoss<Act: ClusterSingleton>: ClusterS | |
self.clusterEventsSubscribeTask?.cancel() | ||
} | ||
|
||
private static func makeSingletonBossWellKnownName(_ settings: ClusterSingletonSettings) -> String { | ||
"$singletonBoss-\(settings.name)" | ||
} | ||
|
||
private func receiveClusterEvent(_ event: Cluster.Event) async throws { | ||
// Feed the event to `AllocationStrategy` then forward the result to `updateTargetNode`, | ||
// which will determine if `targetNode` has changed and react accordingly. | ||
|
@@ -130,11 +134,13 @@ internal distributed actor ClusterSingletonBoss<Act: ClusterSingleton>: ClusterS | |
try await self.takeOver(from: previousTargetNode) | ||
default: | ||
if previousTargetNode == selfNode { | ||
self.handOver(to: node) | ||
await self.handOver(to: node) | ||
} | ||
|
||
// TODO: await here for the handover? | ||
|
||
// Update `singleton` regardless | ||
try self.updateSingleton(node: node) | ||
try await self.updateSingleton(node: node) | ||
} | ||
} | ||
|
||
|
@@ -155,50 +161,87 @@ internal distributed actor ClusterSingletonBoss<Act: ClusterSingleton>: ClusterS | |
self.updateSingleton(singleton) | ||
} | ||
|
||
internal func handOver(to: Cluster.Node?) { | ||
self.log.debug("Hand over singleton [\(self.settings.name)] to [\(String(describing: to))]", metadata: self.metadata()) | ||
|
||
internal func handOver(to: Cluster.Node?) async { | ||
guard let instance = self.targetSingleton else { | ||
return // we're done, we never allocated it at all | ||
} | ||
|
||
self.log.debug("Hand over singleton [\(self.settings.name)] to [\(String(describing: to))]", metadata: self.metadata()) | ||
|
||
Task { | ||
// we ask the singleton to passivate, it may want to flush some writes or similar. | ||
// TODO: potentially do some timeout on this? | ||
if __isLocalActor(instance) { | ||
self.log.debug("Passivating singleton \(instance.id)...") | ||
} | ||
await instance.whenLocal { __secretlyKnownToBeLocal in | ||
// TODO: should have some timeout | ||
await __secretlyKnownToBeLocal.passivateSingleton() | ||
await instance.whenLocal { [log] in | ||
log.debug("Passivating singleton \(instance.id)...") | ||
// TODO: potentially do some timeout on this? | ||
await $0.passivateSingleton() | ||
} | ||
|
||
// TODO: (optimization) tell `to` node that this node is handing off (https://github.com/apple/swift-distributed-actors/issues/329) | ||
// Finally, release the singleton -- it should not have been refered to strongly by anyone else, | ||
// causing the instance to be released. TODO: we could assert that we have released it soon after here (it's ID must be resigned). | ||
self.actorSystem.releaseWellKnownActorID(instance.id) | ||
self.updateSingleton(nil) | ||
await self.updateSingleton(nil) | ||
} | ||
} | ||
|
||
private func updateSingleton(node: Cluster.Node?) throws { | ||
/// Update on which ``Cluster/Node`` this boss considers the singleton to be hosted. | ||
func updateSingleton(node: Cluster.Node?) async throws { | ||
switch node { | ||
case .some(let node) where node == self.actorSystem.cluster.node: | ||
// This must have been a result of an activate() and the singleton must be stored locally | ||
precondition(self.targetSingleton?.id.node == self.actorSystem.cluster.node) | ||
return | ||
|
||
case .some(let otherNode): | ||
var targetSingletonID = ActorID(remote: otherNode, type: Act.self, incarnation: .wellKnown) | ||
targetSingletonID.metadata.wellKnown = self.settings.name // FIXME: rather, use the BOSS as the target | ||
targetSingletonID.path = self.id.path | ||
var targetSingletonBossID = ActorID(remote: otherNode, type: Self.self, incarnation: .wellKnown) | ||
// targetSingletonID.metadata.wellKnown = self.settings.name // FIXME: rather, use the BOSS as the target | ||
targetSingletonBossID.metadata.wellKnown = Self.makeSingletonBossWellKnownName(self.settings) | ||
targetSingletonBossID.path = self.id.path | ||
let targetSingletonBoss = try Self.resolve(id: targetSingletonBossID, using: self.actorSystem) | ||
|
||
let targetSingleton: Act = try await Backoff.exponential( | ||
initialInterval: settings.locateActiveSingletonBackoff.initialInterval, | ||
multiplier: settings.locateActiveSingletonBackoff.multiplier, | ||
capInterval: settings.locateActiveSingletonBackoff.capInterval, | ||
randomFactor: settings.locateActiveSingletonBackoff.randomFactor, | ||
maxAttempts: settings.locateActiveSingletonBackoff.maxAttempts | ||
).attempt { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
// confirm tha the boss is hosting the singleton, if not we may have to wait and try again | ||
do { | ||
guard ((try? await targetSingletonBoss.hasActiveSingleton()) ?? false) else { | ||
throw SingletonNotFoundNoExpectedNode(id: self.settings.name, node) | ||
} | ||
} catch { | ||
throw error | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This do/catch is just here so we can add debug logging, will do shortly |
||
|
||
var targetSingletonID = ActorID(remote: otherNode, type: Self.self, incarnation: .wellKnown) | ||
targetSingletonID.metadata.wellKnown = self.settings.name | ||
targetSingletonID.path = self.id.path | ||
|
||
return try Act.resolve(id: targetSingletonID, using: self.actorSystem) | ||
} | ||
self.updateSingleton(targetSingleton) | ||
|
||
let singleton = try Act.resolve(id: targetSingletonID, using: self.actorSystem) | ||
self.updateSingleton(singleton) | ||
case .none: | ||
self.updateSingleton(nil) | ||
} | ||
} | ||
|
||
// FIXME: would like to return `Act?` but can't via the generic combining with Codable: rdar://111664985 & https://github.com/apple/swift/issues/67090 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Solution incoming here: swiftlang/swift#67117 |
||
distributed func hasActiveSingleton() -> Bool { | ||
guard let targetSingleton = self.targetSingleton else { | ||
self.log.debug("Was checked for active singleton. Not active.") | ||
return false | ||
} | ||
guard targetSingleton.id.node == self.selfNode else { | ||
self.log.debug("Was checked for active singleton. Active on different node.") | ||
return false | ||
} | ||
self.log.debug("Was checked for active singleton. Active on this node.") | ||
return true | ||
} | ||
|
||
private func updateSingleton(_ newSingleton: Act?) { | ||
self.log.debug("Update singleton from [\(String(describing: self.targetSingleton))] to [\(newSingleton?.id.description ?? "nil")], with \(self.buffer.count) remote calls pending", metadata: self.metadata()) | ||
self.targetSingleton = newSingleton | ||
|
@@ -257,7 +300,7 @@ internal distributed actor ClusterSingletonBoss<Act: ClusterSingleton>: ClusterS | |
} | ||
} | ||
|
||
log.info("FOUND SINGLETON: \(found) local:\(__isLocalActor(found))", metadata: self.metadata()) | ||
log.info("Found singleton: \(found) local:\(__isLocalActor(found))", metadata: self.metadata()) | ||
return found | ||
} | ||
|
||
|
@@ -267,8 +310,8 @@ internal distributed actor ClusterSingletonBoss<Act: ClusterSingleton>: ClusterS | |
try await singletonFactory(self.actorSystem) | ||
} | ||
|
||
await singleton.whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): this is annoying, we must track "known to be local" in | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ❤️ |
||
await __secretlyKnownToBeLocal.activateSingleton() | ||
await singleton.whenLocal { | ||
await $0.activateSingleton() | ||
} | ||
|
||
self.log.trace("Activated singleton instance: \(singleton.id.fullDescription)", metadata: self.metadata()) | ||
|
@@ -278,9 +321,9 @@ internal distributed actor ClusterSingletonBoss<Act: ClusterSingleton>: ClusterS | |
} | ||
|
||
nonisolated func stop() async { | ||
await self.whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): this is annoying, we must track "known to be local" in typesystem instead | ||
await self.whenLocal { | ||
// TODO: perhaps we can figure out where `to` is next and hand over gracefully? | ||
__secretlyKnownToBeLocal.handOver(to: nil) | ||
await $0.handOver(to: nil) | ||
} | ||
} | ||
|
||
|
@@ -407,39 +450,50 @@ extension ClusterSingletonBoss { | |
where Err: Error, | ||
Res: Codable | ||
{ | ||
let singleton = try await self.findSingleton(target: target) | ||
self.log.trace( | ||
"Forwarding call to \(target)", | ||
metadata: self.metadata([ | ||
"remoteCall/target": "\(target)", | ||
"remoteCall/invocation": "\(invocation)", | ||
]) | ||
) | ||
|
||
var invocation = invocation // can't be inout param | ||
if targetNode == selfNode, | ||
let singleton = self.targetSingleton | ||
{ | ||
assert( | ||
singleton.id.node == selfNode, | ||
"Target singleton node and targetNode were not the same! TargetNode: \(targetNode)," + | ||
" singleton.id.node: \(singleton.id.node)" | ||
do { | ||
let singleton = try await self.findSingleton(target: target) | ||
self.log.trace( | ||
"Forwarding call to \(target)", | ||
metadata: self.metadata([ | ||
"remoteCall/target": "\(target)", | ||
"remoteCall/invocation": "\(invocation)", | ||
]) | ||
) | ||
return try await singleton.actorSystem.localCall( | ||
|
||
var invocation = invocation // can't be inout param | ||
if targetNode == selfNode, | ||
let singleton = self.targetSingleton | ||
{ | ||
assert( | ||
singleton.id.node == selfNode, | ||
"Target singleton node and targetNode were not the same! TargetNode: \(targetNode)," + | ||
" singleton.id.node: \(singleton.id.node)" | ||
) | ||
return try await singleton.actorSystem.localCall( | ||
on: singleton, | ||
target: target, invocation: &invocation, | ||
throwing: throwing, | ||
returning: returning | ||
) | ||
} | ||
|
||
return try await singleton.actorSystem.remoteCall( | ||
on: singleton, | ||
target: target, invocation: &invocation, | ||
target: target, | ||
invocation: &invocation, | ||
throwing: throwing, | ||
returning: returning | ||
) | ||
} catch { | ||
log.warning( | ||
"Failed forwarding call to \(target)", | ||
metadata: [ | ||
"remoteCall/target": "\(target)", | ||
"remoteCall/invocation": "\(invocation)", | ||
] | ||
) | ||
throw error // FIXME: if dead letter then keep stashed? | ||
} | ||
|
||
return try await singleton.actorSystem.remoteCall( | ||
on: singleton, | ||
target: target, | ||
invocation: &invocation, | ||
throwing: throwing, | ||
returning: returning | ||
) | ||
} | ||
|
||
/// Handles the incoming message by either stashing or forwarding to the singleton. | ||
|
@@ -536,3 +590,13 @@ struct ClusterSingletonRemoteCallInterceptor<Singleton: ClusterSingleton>: Remot | |
} | ||
} | ||
} | ||
|
||
public struct SingletonNotFoundNoExpectedNode: Error { | ||
let id: String | ||
let node: Cluster.Node? | ||
|
||
init(id: String, _ node: Cluster.Node?) { | ||
self.id = id | ||
self.node = node | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not intended to keep here I think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!