Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 33 additions & 11 deletions Sources/Valkey/Cluster/ValkeyClusterClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,9 @@ public final class ValkeyClusterClient: Sendable {
@inlinable
public func execute<Command: ValkeyCommand>(_ command: Command) async throws -> Command.Response {
let hashSlot = try self.hashSlot(for: command.keysAffected)
let nodeSelection = getNodeSelection(readOnly: command.isReadOnly)
var clientSelector: () async throws -> ValkeyNodeClient = {
try await self.nodeClient(for: hashSlot.map { [$0] } ?? [])
try await self.nodeClient(for: hashSlot.map { [$0] } ?? [], nodeSelection: nodeSelection)
}

var asking = false
Expand Down Expand Up @@ -252,9 +253,11 @@ public final class ValkeyClusterClient: Sendable {
_ commands: [any ValkeyCommand]
) async -> [Result<RESPToken, any Error>] {
guard commands.count > 0 else { return [] }
let readOnlyCommand = commands.reduce(true) { $0 && $1.isReadOnly }
let nodeSelection = getNodeSelection(readOnly: readOnlyCommand)
// get a list of nodes and the commands that should be run on them
do {
let nodes = try await self.splitCommandsAcrossNodes(commands: commands)
let nodes = try await self.splitCommandsAcrossNodes(commands: commands, nodeSelection: nodeSelection)
// if this list has one element, then just run the pipeline on that single node
if nodes.count == 1 {
do {
Expand Down Expand Up @@ -340,9 +343,10 @@ public final class ValkeyClusterClient: Sendable {
_ commands: Commands
) async throws -> [Result<RESPToken, Error>] where Commands.Element == any ValkeyCommand {
let hashSlot = try self.hashSlot(for: commands.flatMap { $0.keysAffected })

let readOnlyCommand = commands.reduce(true) { $0 && $1.isReadOnly }
let nodeSelection = getNodeSelection(readOnly: readOnlyCommand)
var clientSelector: () async throws -> ValkeyNodeClient = {
try await self.nodeClient(for: hashSlot.map { [$0] } ?? [])
try await self.nodeClient(for: hashSlot.map { [$0] } ?? [], nodeSelection: nodeSelection)
}

var asking = false
Expand Down Expand Up @@ -458,20 +462,32 @@ public final class ValkeyClusterClient: Sendable {
///
/// - Parameters:
/// - keys: Keys affected by operation. This is used to choose the cluster node
/// - readOnly: Is this connection only going to be used with readonly commands
/// - isolation: Actor isolation
/// - operation: Closure handling Valkey connection
/// - Returns: Value returned by closure
@inlinable
public func withConnection<Value>(
forKeys keys: some Collection<ValkeyKey>,
readOnly: Bool = false,
isolation: isolated (any Actor)? = #isolation,
operation: (ValkeyConnection) async throws -> sending Value
) async throws -> Value {
let hashSlots = keys.compactMap { HashSlot(key: $0) }
let node = try await self.nodeClient(for: hashSlots)
let nodeSelection = getNodeSelection(readOnly: readOnly)
let node = try await self.nodeClient(for: hashSlots, nodeSelection: nodeSelection)
return try await node.withConnection(isolation: isolation, operation: operation)
}

@inlinable
/* private */ func getNodeSelection(readOnly: Bool) -> ValkeyClusterNodeSelection {
if readOnly {
self.clientConfiguration.readOnlyCommandNodeSelection.clusterNodeSelection
} else {
.primary
}
}

/// Starts running the cluster client.
///
/// This method initiates:
Expand Down Expand Up @@ -557,7 +573,10 @@ public final class ValkeyClusterClient: Sendable {
/// These array of indices are then used to create collections of commands to
/// run on each node
@usableFromInline
func splitCommandsAcrossNodes(commands: [any ValkeyCommand]) async throws -> [ValkeyServerAddress: NodeAndCommands].Values {
func splitCommandsAcrossNodes(
commands: [any ValkeyCommand],
nodeSelection: ValkeyClusterNodeSelection
) async throws -> [ValkeyServerAddress: NodeAndCommands].Values {
var nodeMap: [ValkeyServerAddress: NodeAndCommands] = [:]
var index = commands.startIndex
var prevAddress: ValkeyServerAddress? = nil
Expand All @@ -570,7 +589,7 @@ public final class ValkeyClusterClient: Sendable {
// Get hash slot for key and add all the commands you have iterated through so far to the
// node associated with that key and break out of loop
let hashSlot = try self.hashSlot(for: keysAffected)
let node = try await self.nodeClient(for: hashSlot.map { [$0] } ?? [])
let node = try await self.nodeClient(for: hashSlot.map { [$0] } ?? [], nodeSelection: nodeSelection)
let address = node.serverAddress
let nodeAndCommands = NodeAndCommands(node: node, commandIndices: .init(commands.startIndex..<index))
nodeMap[address] = nodeAndCommands
Expand All @@ -586,7 +605,7 @@ public final class ValkeyClusterClient: Sendable {
if keysAffected.count > 0 {
// If command affects a key get hash slot for key and add command to the node associated with that key
let hashSlot = try self.hashSlot(for: keysAffected)
let node = try await self.nodeClient(for: hashSlot.map { [$0] } ?? [])
let node = try await self.nodeClient(for: hashSlot.map { [$0] } ?? [], nodeSelection: nodeSelection)
prevAddress = node.serverAddress
nodeMap[prevAddress, default: .init(node: node, commandIndices: [])].commandIndices.append(index)
} else {
Expand All @@ -597,7 +616,7 @@ public final class ValkeyClusterClient: Sendable {
}
} else {
// if none of the commands affect any keys then choose a random node
let node = try await self.nodeClient(for: [])
let node = try await self.nodeClient(for: [], nodeSelection: nodeSelection)
let address = node.serverAddress
let nodeAndCommands = NodeAndCommands(node: node, commandIndices: .init(commands.startIndex..<index))
nodeMap[address] = nodeAndCommands
Expand Down Expand Up @@ -854,14 +873,17 @@ public final class ValkeyClusterClient: Sendable {
/// - `ValkeyClusterError.clusterIsUnavailable` if no healthy nodes are available
/// - `ValkeyClusterError.clusterIsMissingSlotAssignment` if the slot assignment cannot be determined
@inlinable
package func nodeClient(for slots: some (Collection<HashSlot> & Sendable)) async throws -> ValkeyNodeClient {
package func nodeClient(
for slots: some (Collection<HashSlot> & Sendable),
nodeSelection: ValkeyClusterNodeSelection
) async throws -> ValkeyNodeClient {
var retries = 0
while retries < 3 {
defer { retries += 1 }

do {
return try self.stateLock.withLock { state -> ValkeyNodeClient in
try state.poolFastPath(for: slots)
try state.poolFastPath(for: slots, nodeSelection: nodeSelection)
}
} catch let error as ValkeyClusterError where error == .clusterIsUnavailable {
let waiterID = self.nextRequestID()
Expand Down
11 changes: 8 additions & 3 deletions Sources/Valkey/Cluster/ValkeyClusterClientStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -532,14 +532,18 @@ where
}

@inlinable
package func poolFastPath(for slots: some Collection<HashSlot>) throws(ValkeyClusterError) -> ConnectionPool {
package func poolFastPath(
for slots: some Collection<HashSlot>,
nodeSelection: ValkeyClusterNodeSelection
) throws(ValkeyClusterError) -> ConnectionPool {
switch self.clusterState {
case .unavailable:
throw ValkeyClusterError.clusterIsUnavailable

case .degraded(let context):
let shardID = try context.hashSlotShardMap.nodeID(for: slots)
if let pool = self.runningClients[shardID.primary]?.pool {
let nodeID = nodeSelection.select(nodeIDs: shardID)
if let pool = self.runningClients[nodeID]?.pool {
return pool
}
// If we don't have a node for a shard, that means that this shard got created from
Expand All @@ -549,7 +553,8 @@ where

case .healthy(let context):
let shardID = try context.hashSlotShardMap.nodeID(for: slots)
if let pool = self.runningClients[shardID.primary]?.pool {
let nodeID = nodeSelection.select(nodeIDs: shardID)
if let pool = self.runningClients[nodeID]?.pool {
return pool
}
// If we don't have a node for a shard, that means that this shard got created from
Expand Down
53 changes: 53 additions & 0 deletions Sources/Valkey/Cluster/ValkeyClusterNodeSelection.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
//
// This source file is part of the valkey-swift project
// Copyright (c) 2025 the valkey-swift project authors
//
// See LICENSE.txt for license information
// SPDX-License-Identifier: Apache-2.0
//

@usableFromInline
package enum ValkeyClusterNodeSelection: Sendable {
case primary
case cycleReplicas(Int)
case cycleAllNodes(Int)

/// Select node from node ids
/// - Parameter nodeIDs: Primary and replica nodes
/// - Returns: ID of selected node
@usableFromInline
func select(nodeIDs: ValkeyShardNodeIDs) -> ValkeyNodeID {
switch self {
case .primary:
return nodeIDs.primary
case .cycleReplicas(let index):
guard nodeIDs.replicas.count > 0 else { return nodeIDs.primary }
return nodeIDs.replicas[index % nodeIDs.replicas.count]
case .cycleAllNodes(let index):
let index = index % (nodeIDs.replicas.count + 1)
if index == 0 {
return nodeIDs.primary
} else {
return nodeIDs.replicas[index - 1]
}
}
}
}

@available(valkeySwift 1.0, *)
extension ValkeyClientConfiguration.ReadOnlyCommandNodeSelection {
/// Convert from ``ValkeyClientConfiguration/ReadOnlyCommandNodeSelection`` to node selection
@usableFromInline
var clusterNodeSelection: ValkeyClusterNodeSelection {
switch self.value {
case .primary:
.primary
case .cycleReplicas:
.cycleReplicas(Self.idGenerator.next())
case .cycleAllNodes:
.cycleAllNodes(Self.idGenerator.next())
}
}

static let idGenerator: IDGenerator = .init()
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ extension ValkeyClusterClient {
isolation: isolated (any Actor)? = #isolation,
_ operation: (ValkeyConnection) async throws -> sending Value
) async throws -> sending Value {
let node = try await self.nodeClient(for: [])
let node = try await self.nodeClient(for: [], nodeSelection: .primary)
let id = node.subscriptionConnectionIDGenerator.next()

let connection = try await withTaskCancellationHandler {
Expand Down
31 changes: 30 additions & 1 deletion Sources/Valkey/ValkeyClientConfiguration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,24 @@ public struct ValkeyClientConfiguration: Sendable {
}
}

/// Determine how nodes are chosen for readonly commands
public struct ReadOnlyCommandNodeSelection: Sendable {
enum _Internal {
case primary
case cycleReplicas
case cycleAllNodes
}

let value: _Internal

/// Always use the primary node
public static var primary: Self { .init(value: .primary) }
/// Cycle through replicas
public static var cycleReplicas: Self { .init(value: .cycleReplicas) }
/// Cycle through primary and replicas
public static var cycleAllNodes: Self { .init(value: .cycleAllNodes) }
}

/// The authentication credentials for the connection.
public var authentication: Authentication?
/// The connection pool configuration.
Expand All @@ -174,6 +192,14 @@ public struct ValkeyClientConfiguration: Sendable {
/// Database Number to use for the Valkey Connection
public var databaseNumber: Int = 0

/// Determine how we chose nodes for readonly commands
///
/// Cluster by default will redirect commands from replica nodes to the primary node.
/// Setting this value to something other than ``ReadOnlyCommandNodeSelection/primary``
/// will allow replicas to run readonly commands. This will reduce load on your primary
/// nodes but there is a chance you will receive stale data as the replica is not up to date.
public var readOnlyCommandNodeSelection: ReadOnlyCommandNodeSelection

#if DistributedTracingSupport
/// The distributed tracing configuration to use for the Valkey connection.
/// Defaults to using the globally bootstrapped tracer with OpenTelemetry semantic conventions.
Expand All @@ -191,6 +217,7 @@ public struct ValkeyClientConfiguration: Sendable {
/// - blockingCommandTimeout: The timeout for a blocking command response.
/// - tls: The TLS configuration.
/// - databaseNumber: The Valkey Database number.
/// - readOnlyCommandNodeSelection: How we choose a node when processing readonly commands
public init(
authentication: Authentication? = nil,
connectionPool: ConnectionPool = .init(),
Expand All @@ -199,7 +226,8 @@ public struct ValkeyClientConfiguration: Sendable {
commandTimeout: Duration = .seconds(30),
blockingCommandTimeout: Duration = .seconds(120),
tls: TLS = .disable,
databaseNumber: Int = 0
databaseNumber: Int = 0,
readOnlyCommandNodeSelection: ReadOnlyCommandNodeSelection = .primary
) {
self.authentication = authentication
self.connectionPool = connectionPool
Expand All @@ -209,5 +237,6 @@ public struct ValkeyClientConfiguration: Sendable {
self.blockingCommandTimeout = blockingCommandTimeout
self.tls = tls
self.databaseNumber = databaseNumber
self.readOnlyCommandNodeSelection = readOnlyCommandNodeSelection
}
}
Loading
Loading