Skip to content

Distributed singleton should be more careful with handover #1127

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions Sources/DistributedCluster/Backoff.swift
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,6 @@ public struct ConstantBackoffStrategy: BackoffStrategy {
///
/// - SeeAlso: Also used to configure `_SupervisionStrategy`.
public struct ExponentialBackoffStrategy: BackoffStrategy {
// TODO: clock + limit "max total wait time" etc

/// Default values for the backoff parameters.
public enum Defaults {
public static let initialInterval: Duration = .milliseconds(200)
Expand Down Expand Up @@ -208,6 +206,38 @@ public struct ExponentialBackoffStrategy: BackoffStrategy {
}
}

/// Attempt to execute the passed `operation` at most `maxAttempts` times while applying the expected backoff strategy.
///
/// - Parameters:
/// - operation: The operation to run, potentially multiple times until successful or maxAttempts were made
/// - Throws: the last error thrown by `operation`
public func attempt<Value>(_ operation: () async throws -> Value) async throws -> Value {
var backoff = self
var lastError: Error?
defer {
pprint("RETURNING NOW singleton")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not intended to keep here I think?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

}

do {
return try await operation()
} catch {
lastError = error

while let backoffDuration = backoff.next() {
try await Task.sleep(for: backoffDuration)
do {
return try await operation()
} catch {
lastError = error
// and try again, if remaining tries are left...
}
}
}

// If we ended up here, there must have been an error thrown and stored, re-throw it
throw lastError!
}

private mutating func prepareNextInterval() {
// overflow protection
if self.currentBaseInterval >= (self.capInterval / self.multiplier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import Logging
///
/// To host a distributed cluster singleton, use the ``ClusterSingletonPlugin/host(_:name:settings:makeInstance:)`` method.
///
public protocol ClusterSingleton: DistributedActor where ActorSystem == ClusterSystem {
public protocol ClusterSingleton: Codable, DistributedActor where ActorSystem == ClusterSystem {
/// The singleton is now active, and should perform its duties.
///
/// Invoked by the cluster singleton boss after it has created this instance of the singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ internal distributed actor ClusterSingletonBoss<Act: ClusterSingleton>: ClusterS
self.singletonFactory = singletonFactory
self.buffer = RemoteCallBuffer(capacity: settings.bufferCapacity)

self.wellKnownName = "$singletonBoss-\(settings.name)"
self.wellKnownName = Self.makeSingletonBossWellKnownName(settings)

if system.settings.enabled {
self.clusterEventsSubscribeTask = Task {
Expand All @@ -109,6 +109,10 @@ internal distributed actor ClusterSingletonBoss<Act: ClusterSingleton>: ClusterS
self.clusterEventsSubscribeTask?.cancel()
}

/// Builds the well-known name of the singleton boss for the singleton described by `settings`.
///
/// - Parameter settings: settings of the singleton; only `settings.name` is read.
/// - Returns: `settings.name` prefixed with `$singletonBoss-`.
private static func makeSingletonBossWellKnownName(_ settings: ClusterSingletonSettings) -> String {
    let bossWellKnownName = "$singletonBoss-\(settings.name)"
    return bossWellKnownName
}

private func receiveClusterEvent(_ event: Cluster.Event) async throws {
// Feed the event to `AllocationStrategy` then forward the result to `updateTargetNode`,
// which will determine if `targetNode` has changed and react accordingly.
Expand All @@ -130,11 +134,13 @@ internal distributed actor ClusterSingletonBoss<Act: ClusterSingleton>: ClusterS
try await self.takeOver(from: previousTargetNode)
default:
if previousTargetNode == selfNode {
self.handOver(to: node)
await self.handOver(to: node)
}

// TODO: await here for the handover?

// Update `singleton` regardless
try self.updateSingleton(node: node)
try await self.updateSingleton(node: node)
}
}

Expand All @@ -155,50 +161,87 @@ internal distributed actor ClusterSingletonBoss<Act: ClusterSingleton>: ClusterS
self.updateSingleton(singleton)
}

internal func handOver(to: Cluster.Node?) {
self.log.debug("Hand over singleton [\(self.settings.name)] to [\(String(describing: to))]", metadata: self.metadata())

internal func handOver(to: Cluster.Node?) async {
guard let instance = self.targetSingleton else {
return // we're done, we never allocated it at all
}

self.log.debug("Hand over singleton [\(self.settings.name)] to [\(String(describing: to))]", metadata: self.metadata())

Task {
// we ask the singleton to passivate, it may want to flush some writes or similar.
// TODO: potentially do some timeout on this?
if __isLocalActor(instance) {
self.log.debug("Passivating singleton \(instance.id)...")
}
await instance.whenLocal { __secretlyKnownToBeLocal in
// TODO: should have some timeout
await __secretlyKnownToBeLocal.passivateSingleton()
await instance.whenLocal { [log] in
log.debug("Passivating singleton \(instance.id)...")
// TODO: potentially do some timeout on this?
await $0.passivateSingleton()
}

// TODO: (optimization) tell `to` node that this node is handing off (https://github.com/apple/swift-distributed-actors/issues/329)
// Finally, release the singleton -- it should not have been referred to strongly by anyone else,
// causing the instance to be released. TODO: we could assert that we have released it soon after here (its ID must be resigned).
self.actorSystem.releaseWellKnownActorID(instance.id)
self.updateSingleton(nil)
await self.updateSingleton(nil)
}
}

private func updateSingleton(node: Cluster.Node?) throws {
/// Update on which ``Cluster/Node`` this boss considers the singleton to be hosted.
func updateSingleton(node: Cluster.Node?) async throws {
switch node {
case .some(let node) where node == self.actorSystem.cluster.node:
// This must have been a result of an activate() and the singleton must be stored locally
precondition(self.targetSingleton?.id.node == self.actorSystem.cluster.node)
return

case .some(let otherNode):
var targetSingletonID = ActorID(remote: otherNode, type: Act.self, incarnation: .wellKnown)
targetSingletonID.metadata.wellKnown = self.settings.name // FIXME: rather, use the BOSS as the target
targetSingletonID.path = self.id.path
var targetSingletonBossID = ActorID(remote: otherNode, type: Self.self, incarnation: .wellKnown)
// targetSingletonID.metadata.wellKnown = self.settings.name // FIXME: rather, use the BOSS as the target
targetSingletonBossID.metadata.wellKnown = Self.makeSingletonBossWellKnownName(self.settings)
targetSingletonBossID.path = self.id.path
let targetSingletonBoss = try Self.resolve(id: targetSingletonBossID, using: self.actorSystem)

let targetSingleton: Act = try await Backoff.exponential(
initialInterval: settings.locateActiveSingletonBackoff.initialInterval,
multiplier: settings.locateActiveSingletonBackoff.multiplier,
capInterval: settings.locateActiveSingletonBackoff.capInterval,
randomFactor: settings.locateActiveSingletonBackoff.randomFactor,
maxAttempts: settings.locateActiveSingletonBackoff.maxAttempts
).attempt {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Backoff.exponential().attempt {} is very nice, we're finally beginning to reap benefits of async/await inside the cluster :)

// confirm that the boss is hosting the singleton, if not we may have to wait and try again
do {
guard ((try? await targetSingletonBoss.hasActiveSingleton()) ?? false) else {
throw SingletonNotFoundNoExpectedNode(id: self.settings.name, node)
}
} catch {
throw error
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This do/catch is just here so we can add debug logging, will do shortly


var targetSingletonID = ActorID(remote: otherNode, type: Self.self, incarnation: .wellKnown)
targetSingletonID.metadata.wellKnown = self.settings.name
targetSingletonID.path = self.id.path

return try Act.resolve(id: targetSingletonID, using: self.actorSystem)
}
self.updateSingleton(targetSingleton)

let singleton = try Act.resolve(id: targetSingletonID, using: self.actorSystem)
self.updateSingleton(singleton)
case .none:
self.updateSingleton(nil)
}
}

// FIXME: would like to return `Act?` but can't via the generic combining with Codable: rdar://111664985 & https://github.com/apple/swift/issues/67090
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Solution incoming here: swiftlang/swift#67117

/// Reports whether this boss currently tracks a singleton that lives on this boss' own node.
///
/// Every outcome is logged at debug level before returning.
///
/// - Returns: `true` only when `targetSingleton` is set and its ID's node equals `selfNode`;
///   `false` when no singleton is tracked, or when the tracked one is on a different node.
distributed func hasActiveSingleton() -> Bool {
    if let trackedSingleton = self.targetSingleton {
        let isHostedOnThisNode = trackedSingleton.id.node == self.selfNode
        if isHostedOnThisNode {
            self.log.debug("Was checked for active singleton. Active on this node.")
            return true
        }

        // A singleton is known, but it is hosted elsewhere.
        self.log.debug("Was checked for active singleton. Active on different node.")
        return false
    }

    // Nothing is tracked at all.
    self.log.debug("Was checked for active singleton. Not active.")
    return false
}

private func updateSingleton(_ newSingleton: Act?) {
self.log.debug("Update singleton from [\(String(describing: self.targetSingleton))] to [\(newSingleton?.id.description ?? "nil")], with \(self.buffer.count) remote calls pending", metadata: self.metadata())
self.targetSingleton = newSingleton
Expand Down Expand Up @@ -257,7 +300,7 @@ internal distributed actor ClusterSingletonBoss<Act: ClusterSingleton>: ClusterS
}
}

log.info("FOUND SINGLETON: \(found) local:\(__isLocalActor(found))", metadata: self.metadata())
log.info("Found singleton: \(found) local:\(__isLocalActor(found))", metadata: self.metadata())
return found
}

Expand All @@ -267,8 +310,8 @@ internal distributed actor ClusterSingletonBoss<Act: ClusterSingleton>: ClusterS
try await singletonFactory(self.actorSystem)
}

await singleton.whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): this is annoying, we must track "known to be local" in
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

await __secretlyKnownToBeLocal.activateSingleton()
await singleton.whenLocal {
await $0.activateSingleton()
}

self.log.trace("Activated singleton instance: \(singleton.id.fullDescription)", metadata: self.metadata())
Expand All @@ -278,9 +321,9 @@ internal distributed actor ClusterSingletonBoss<Act: ClusterSingleton>: ClusterS
}

nonisolated func stop() async {
await self.whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): this is annoying, we must track "known to be local" in typesystem instead
await self.whenLocal {
// TODO: perhaps we can figure out where `to` is next and hand over gracefully?
__secretlyKnownToBeLocal.handOver(to: nil)
await $0.handOver(to: nil)
}
}

Expand Down Expand Up @@ -407,39 +450,50 @@ extension ClusterSingletonBoss {
where Err: Error,
Res: Codable
{
let singleton = try await self.findSingleton(target: target)
self.log.trace(
"Forwarding call to \(target)",
metadata: self.metadata([
"remoteCall/target": "\(target)",
"remoteCall/invocation": "\(invocation)",
])
)

var invocation = invocation // can't be inout param
if targetNode == selfNode,
let singleton = self.targetSingleton
{
assert(
singleton.id.node == selfNode,
"Target singleton node and targetNode were not the same! TargetNode: \(targetNode)," +
" singleton.id.node: \(singleton.id.node)"
do {
let singleton = try await self.findSingleton(target: target)
self.log.trace(
"Forwarding call to \(target)",
metadata: self.metadata([
"remoteCall/target": "\(target)",
"remoteCall/invocation": "\(invocation)",
])
)
return try await singleton.actorSystem.localCall(

var invocation = invocation // can't be inout param
if targetNode == selfNode,
let singleton = self.targetSingleton
{
assert(
singleton.id.node == selfNode,
"Target singleton node and targetNode were not the same! TargetNode: \(targetNode)," +
" singleton.id.node: \(singleton.id.node)"
)
return try await singleton.actorSystem.localCall(
on: singleton,
target: target, invocation: &invocation,
throwing: throwing,
returning: returning
)
}

return try await singleton.actorSystem.remoteCall(
on: singleton,
target: target, invocation: &invocation,
target: target,
invocation: &invocation,
throwing: throwing,
returning: returning
)
} catch {
log.warning(
"Failed forwarding call to \(target)",
metadata: [
"remoteCall/target": "\(target)",
"remoteCall/invocation": "\(invocation)",
]
)
throw error // FIXME: if dead letter then keep stashed?
}

return try await singleton.actorSystem.remoteCall(
on: singleton,
target: target,
invocation: &invocation,
throwing: throwing,
returning: returning
)
}

/// Handles the incoming message by either stashing or forwarding to the singleton.
Expand Down Expand Up @@ -536,3 +590,13 @@ struct ClusterSingletonRemoteCallInterceptor<Singleton: ClusterSingleton>: Remot
}
}
}

/// Error carrying the name of a singleton together with the node it was looked up on.
///
/// NOTE(review): appears to be thrown when the boss on the expected node does not report
/// an active singleton -- confirm against the throwing call site.
public struct SingletonNotFoundNoExpectedNode: Error {
    /// Name identifying the singleton that could not be found.
    let id: String
    /// Node associated with the failed lookup, if any.
    let node: Cluster.Node?

    /// - Parameters:
    ///   - id: name identifying the singleton.
    ///   - node: node associated with the failed lookup, or `nil`.
    init(id singletonID: String, _ expectedNode: Cluster.Node?) {
        self.node = expectedNode
        self.id = singletonID
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ public struct ClusterSingletonSettings {
/// we stop stashing calls and throw error.
public var allocationTimeout: Duration = .seconds(30)

/// Backoff configuration used when trying to obtain an active singleton reference on a new node.
public var locateActiveSingletonBackoff: LocateActiveSingletonBackoffSettings = .init()
/// Parameters of the exponential backoff used while locating the active singleton.
///
/// The type is public and exposed through a public settings property, so its fields and
/// initializer are public as well; otherwise code outside this module could neither read
/// nor adjust the backoff. All defaults are unchanged, and `.init()` keeps working.
public struct LocateActiveSingletonBackoffSettings {
    /// Value passed as the backoff's `initialInterval`.
    public var initialInterval: Duration
    /// Value passed as the backoff's `multiplier`.
    public var multiplier: Double
    /// Value passed as the backoff's `capInterval`.
    public var capInterval: Duration
    /// Value passed as the backoff's `randomFactor`.
    public var randomFactor: Double
    /// Value passed as the backoff's `maxAttempts`.
    public var maxAttempts: Int

    /// Creates backoff settings; every parameter defaults to the value used so far.
    ///
    /// - Parameters:
    ///   - initialInterval: defaults to 300 milliseconds.
    ///   - multiplier: defaults to `ExponentialBackoffStrategy.Defaults.multiplier`.
    ///   - capInterval: defaults to `ExponentialBackoffStrategy.Defaults.capInterval`.
    ///   - randomFactor: defaults to `ExponentialBackoffStrategy.Defaults.randomFactor`.
    ///   - maxAttempts: defaults to 5.
    public init(
        initialInterval: Duration = .milliseconds(300),
        multiplier: Double = ExponentialBackoffStrategy.Defaults.multiplier,
        capInterval: Duration = ExponentialBackoffStrategy.Defaults.capInterval,
        randomFactor: Double = ExponentialBackoffStrategy.Defaults.randomFactor,
        maxAttempts: Int = 5
    ) {
        self.initialInterval = initialInterval
        self.multiplier = multiplier
        self.capInterval = capInterval
        self.randomFactor = randomFactor
        self.maxAttempts = maxAttempts
    }
}

public init() {}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ import AsyncAlgorithms
import Distributed
import DistributedActorsTestKit
@testable import DistributedCluster
import Logging
import XCTest

final class ClusterSingletonPluginClusteredTests: ClusteredActorSystemsXCTestCase {
override func configureLogCapture(settings: inout LogCapture.Settings) {
settings.excludeActorPaths = [
"/system/swim",
"/system/cluster/swim",
"/system/cluster",
"/system/cluster/gossip",
Expand Down Expand Up @@ -96,9 +98,7 @@ final class ClusterSingletonPluginClusteredTests: ClusteredActorSystemsXCTestCas

// pretend we're handing over to somewhere else:
let boss = await first.singleton._boss(name: name, type: LifecycleTestSingleton.self)!
await boss.whenLocal { __secretlyKnownToBeLocal in
__secretlyKnownToBeLocal.handOver(to: nil)
}
await boss.whenLocal { await $0.handOver(to: nil) }

try probe.expectMessage(prefix: "passivate")
try probe.expectMessage(prefix: "deinit")
Expand Down Expand Up @@ -261,8 +261,15 @@ final class ClusterSingletonPluginClusteredTests: ClusteredActorSystemsXCTestCas
attempt += 1
let message = "\(greetingName) (\(attempt))"
group.addTask {
pnote(" Sending: \(message) -> \(singleton) (it may be terminated/not-re-pointed yet)")
return try await singleton.greet(name: message)
pnote(" Sending: '\(message)' -> [\(singleton)] (it may be terminated/not-re-pointed yet)")
do {
let value = try await singleton.greet(name: message)
pinfo(" Passed '\(message)' -> [\(singleton)]: reply: \(value)")
return value
} catch {
pinfo(" Failed '\(message)' -> [\(singleton)]: error: \(error)")
throw error
}
}
}

Expand Down