Skip to content

Commit f42a51d

Browse files
authored
Provide way to set cluster consistency (#8)
* Provide way to set cluster consistency rdar://102086631 * Run swiftformat * Change swiftformat rule to put access control keyword on declaration rather than extension * Clean up configuration init * Document default consistency levels
1 parent a39d71d commit f42a51d

File tree

11 files changed

+226
-238
lines changed

11 files changed

+226
-238
lines changed

.swiftformat

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
--ifdef no-indent
1010
--indent 4
11+
--extensionacl on-declarations
1112
--patternlet inline
1213
--self insert
1314
--stripunusedargs closure-only

Sources/CassandraClient/CassandraClient.swift

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public class CassandraClient: CassandraSession {
9090
/// - logger: If `nil`, the client's default `Logger` is used.
9191
///
9292
/// - Returns: The resulting ``Rows``.
93-
public func execute(statement: Statement, on eventLoop: EventLoop?, logger: Logger? = nil) -> EventLoopFuture<Rows> {
93+
public func execute(statement: Statement, on eventLoop: EventLoop?, logger: Logger? = .none) -> EventLoopFuture<Rows> {
9494
self.defaultSession.execute(statement: statement, on: eventLoop, logger: logger)
9595
}
9696

@@ -105,7 +105,7 @@ public class CassandraClient: CassandraSession {
105105
/// - logger: If `nil`, the client's default `Logger` is used.
106106
///
107107
/// - Returns: The ``PaginatedRows``.
108-
public func execute(statement: Statement, pageSize: Int32, on eventLoop: EventLoop?, logger: Logger? = nil) -> EventLoopFuture<PaginatedRows> {
108+
public func execute(statement: Statement, pageSize: Int32, on eventLoop: EventLoop?, logger: Logger? = .none) -> EventLoopFuture<PaginatedRows> {
109109
self.defaultSession.execute(statement: statement, pageSize: pageSize, on: eventLoop, logger: logger)
110110
}
111111

@@ -116,7 +116,7 @@ public class CassandraClient: CassandraSession {
116116
/// - logger: If `nil`, the client's default `Logger` is used.
117117
///
118118
/// - Returns: The newly created session.
119-
public func makeSession(keyspace: String?, logger: Logger? = nil) -> CassandraSession {
119+
public func makeSession(keyspace: String?, logger: Logger? = .none) -> CassandraSession {
120120
var configuration = self.configuration
121121
configuration.keyspace = keyspace
122122
let logger = logger ?? self.logger
@@ -129,7 +129,7 @@ public class CassandraClient: CassandraSession {
129129
/// - keyspace: If `nil`, the client's default keyspace is used.
130130
/// - logger: If `nil`, the client's default `Logger` is used.
131131
/// - handler: The closure to invoke, passing in the newly created session.
132-
public func withSession(keyspace: String?, logger: Logger? = nil, handler: (CassandraSession) throws -> Void) rethrows {
132+
public func withSession(keyspace: String?, logger: Logger? = .none, handler: (CassandraSession) throws -> Void) rethrows {
133133
let session = self.makeSession(keyspace: keyspace, logger: logger)
134134
defer {
135135
do {
@@ -149,7 +149,7 @@ public class CassandraClient: CassandraSession {
149149
/// - handler: The closure to invoke, passing in the newly created session.
150150
///
151151
/// - Returns: The resulting `EventLoopFuture` of the closure.
152-
public func withSession<T>(keyspace: String?, logger: Logger? = nil, handler: (CassandraSession) -> EventLoopFuture<T>) -> EventLoopFuture<T> {
152+
public func withSession<T>(keyspace: String?, logger: Logger? = .none, handler: (CassandraSession) -> EventLoopFuture<T>) -> EventLoopFuture<T> {
153153
let session = self.makeSession(keyspace: keyspace, logger: logger)
154154
return handler(session).always { _ in
155155
do {
@@ -175,7 +175,7 @@ public class CassandraClient: CassandraSession {
175175
}
176176

177177
#if compiler(>=5.5) && canImport(_Concurrency)
178-
public extension CassandraClient {
178+
extension CassandraClient {
179179
/// Execute a ``Statement`` using the default ``CassandraSession``.
180180
///
181181
/// **All** rows are returned.
@@ -186,7 +186,7 @@ public extension CassandraClient {
186186
///
187187
/// - Returns: The resulting ``Rows``.
188188
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
189-
func execute(statement: Statement, logger: Logger? = nil) async throws -> Rows {
189+
public func execute(statement: Statement, logger: Logger? = .none) async throws -> Rows {
190190
try await self.defaultSession.execute(statement: statement, logger: logger)
191191
}
192192

@@ -201,7 +201,7 @@ public extension CassandraClient {
201201
///
202202
/// - Returns: The ``PaginatedRows``.
203203
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
204-
func execute(statement: Statement, pageSize: Int32, logger: Logger? = nil) async throws -> PaginatedRows {
204+
public func execute(statement: Statement, pageSize: Int32, logger: Logger? = .none) async throws -> PaginatedRows {
205205
try await self.defaultSession.execute(statement: statement, pageSize: pageSize, logger: logger)
206206
}
207207

@@ -212,7 +212,7 @@ public extension CassandraClient {
212212
/// - logger: If `nil`, the client's default `Logger` is used.
213213
/// - closure: The closure to invoke, passing in the newly created session.
214214
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
215-
func withSession(keyspace: String?, logger: Logger? = nil, closure: (CassandraSession) async throws -> Void) async throws {
215+
public func withSession(keyspace: String?, logger: Logger? = .none, closure: (CassandraSession) async throws -> Void) async throws {
216216
let session = self.makeSession(keyspace: keyspace, logger: logger)
217217
defer {
218218
do {
@@ -233,7 +233,7 @@ public extension CassandraClient {
233233
///
234234
/// - Returns: The result of the closure.
235235
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
236-
func withSession<T>(keyspace: String?, logger: Logger? = nil, handler: (CassandraSession) async throws -> T) async throws -> T {
236+
public func withSession<T>(keyspace: String?, logger: Logger? = .none, handler: (CassandraSession) async throws -> T) async throws -> T {
237237
let session = self.makeSession(keyspace: keyspace, logger: logger)
238238
defer {
239239
do {

Sources/CassandraClient/Configuration.swift

Lines changed: 31 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,15 @@
1616
import NIO
1717

1818
// TODO: add more config option per C++ cluster impl
19-
public extension CassandraClient {
19+
extension CassandraClient {
2020
/// Configuration for the ``CassandraClient``.
21-
struct Configuration: CustomStringConvertible {
21+
public struct Configuration: CustomStringConvertible {
2222
public typealias ContactPoints = [String]
2323

2424
/// Provides the initial `ContactPoints` of the Cassandra cluster.
2525
/// This can be a subset since each Cassandra instance is capable of discovering its peers.
2626
public var contactPointsProvider: (@escaping (Result<ContactPoints, Swift.Error>) -> Void) -> Void
27+
2728
public var port: Int32
2829
public var protocolVersion: ProtocolVersion
2930
public var username: String?
@@ -37,7 +38,7 @@ public extension CassandraClient {
3738
public var coreConnectionsPerHost: UInt32?
3839
public var tcpNodelay: Bool?
3940
public var tcpKeepalive: Bool?
40-
public var tcpKeepaliveDelaySeconds: UInt32
41+
public var tcpKeepaliveDelaySeconds: UInt32 = 0
4142
public var connectionHeartbeatInterval: UInt32?
4243
public var connectionIdleTimeout: UInt32?
4344
public var schema: Bool?
@@ -47,6 +48,9 @@ public extension CassandraClient {
4748
public var prepareStrategy: PrepareStrategy?
4849
public var compact: Bool?
4950

51+
/// Sets the cluster's consistency level. Default is `.localOne`.
52+
public var consistency: CassandraClient.Consistency?
53+
5054
public enum SpeculativeExecutionPolicy {
5155
case constant(delayInMillseconds: Int64, maxExecutions: Int32)
5256
case disabled
@@ -68,51 +72,11 @@ public extension CassandraClient {
6872
public init(
6973
contactPointsProvider: @escaping (@escaping (Result<ContactPoints, Swift.Error>) -> Void) -> Void,
7074
port: Int32,
71-
protocolVersion: ProtocolVersion,
72-
username: String? = nil,
73-
password: String? = nil,
74-
ssl: SSL? = nil,
75-
keyspace: String? = nil,
76-
numIOThreads: UInt32? = nil,
77-
connectTimeoutMillis: UInt32? = nil,
78-
requestTimeoutMillis: UInt32? = nil,
79-
resolveTimeoutMillis: UInt32? = nil,
80-
coreConnectionsPerHost: UInt32? = nil,
81-
tcpNodelay: Bool? = nil,
82-
tcpKeepalive: Bool? = nil,
83-
tcpKeepaliveDelaySeconds: UInt32 = 0,
84-
connectionHeartbeatInterval: UInt32? = nil,
85-
connectionIdleTimeout: UInt32? = nil,
86-
schema: Bool? = nil,
87-
hostnameResolution: Bool? = nil,
88-
randomizedContactPoints: Bool? = nil,
89-
speculativeExecutionPolicy: SpeculativeExecutionPolicy? = nil,
90-
prepareStrategy: PrepareStrategy? = nil,
91-
compact: Bool? = nil
75+
protocolVersion: ProtocolVersion
9276
) {
9377
self.contactPointsProvider = contactPointsProvider
9478
self.port = port
9579
self.protocolVersion = protocolVersion
96-
self.username = username
97-
self.password = password
98-
self.ssl = ssl
99-
self.keyspace = keyspace
100-
self.numIOThreads = numIOThreads
101-
self.connectTimeoutMillis = connectTimeoutMillis
102-
self.requestTimeoutMillis = requestTimeoutMillis
103-
self.resolveTimeoutMillis = resolveTimeoutMillis
104-
self.coreConnectionsPerHost = coreConnectionsPerHost
105-
self.tcpNodelay = tcpNodelay
106-
self.tcpKeepalive = tcpKeepalive
107-
self.tcpKeepaliveDelaySeconds = tcpKeepaliveDelaySeconds
108-
self.connectionHeartbeatInterval = connectionHeartbeatInterval
109-
self.connectionIdleTimeout = connectionIdleTimeout
110-
self.schema = schema
111-
self.hostnameResolution = hostnameResolution
112-
self.randomizedContactPoints = randomizedContactPoints
113-
self.speculativeExecutionPolicy = speculativeExecutionPolicy
114-
self.prepareStrategy = prepareStrategy
115-
self.compact = compact
11680
}
11781

11882
internal func makeCluster(on eventLoop: EventLoop) -> EventLoopFuture<Cluster> {
@@ -167,40 +131,40 @@ public extension CassandraClient {
167131
if let ssl = self.ssl {
168132
try cluster.setSSL(try ssl.makeSSLContext())
169133
}
170-
if let value = numIOThreads {
134+
if let value = self.numIOThreads {
171135
try cluster.setNumThreadsIO(value)
172136
}
173-
if let value = connectTimeoutMillis {
137+
if let value = self.connectTimeoutMillis {
174138
try cluster.setConnectTimeout(value)
175139
}
176-
if let value = requestTimeoutMillis {
140+
if let value = self.requestTimeoutMillis {
177141
try cluster.setRequestTimeout(value)
178142
}
179-
if let value = resolveTimeoutMillis {
143+
if let value = self.resolveTimeoutMillis {
180144
try cluster.setResolveTimeout(value)
181145
}
182-
if let value = coreConnectionsPerHost {
146+
if let value = self.coreConnectionsPerHost {
183147
try cluster.setCoreConnectionsPerHost(value)
184148
}
185-
if let value = tcpNodelay {
149+
if let value = self.tcpNodelay {
186150
try cluster.setTcpNodelay(value)
187151
}
188-
if let value = tcpKeepalive {
152+
if let value = self.tcpKeepalive {
189153
try cluster.setTcpKeepalive(value, delayInSeconds: self.tcpKeepaliveDelaySeconds)
190154
}
191-
if let value = connectionHeartbeatInterval {
155+
if let value = self.connectionHeartbeatInterval {
192156
try cluster.setConnectionHeartbeatInterval(value)
193157
}
194-
if let value = connectionIdleTimeout {
158+
if let value = self.connectionIdleTimeout {
195159
try cluster.setConnectionIdleTimeout(value)
196160
}
197-
if let value = schema {
161+
if let value = self.schema {
198162
try cluster.setUseSchema(value)
199163
}
200-
if let value = hostnameResolution {
164+
if let value = self.hostnameResolution {
201165
try cluster.setUseHostnameResolution(value)
202166
}
203-
if let value = randomizedContactPoints {
167+
if let value = self.randomizedContactPoints {
204168
try cluster.setUseRandomizedContactPoints(value)
205169
}
206170
switch self.speculativeExecutionPolicy {
@@ -219,9 +183,12 @@ public extension CassandraClient {
219183
case .none:
220184
break
221185
}
222-
if let value = compact {
186+
if let value = self.compact {
223187
try cluster.setNoCompact(!value)
224188
}
189+
if let value = self.consistency {
190+
try cluster.setConsistency(value.cassConsistency)
191+
}
225192

226193
return cluster
227194
}
@@ -338,6 +305,10 @@ internal final class Cluster {
338305
try self.checkResult { cass_cluster_set_no_compact(self.rawPointer, enabled ? cass_true : cass_false) }
339306
}
340307

308+
func setConsistency(_ consistency: CassConsistency) throws {
309+
try self.checkResult { cass_cluster_set_consistency(self.rawPointer, consistency) }
310+
}
311+
341312
func setSSL(_ ssl: SSLContext) throws {
342313
cass_cluster_set_ssl(self.rawPointer, ssl.rawPointer)
343314
}
@@ -352,8 +323,8 @@ internal final class Cluster {
352323

353324
// MARK: - SSL
354325

355-
public extension CassandraClient.Configuration {
356-
struct SSL {
326+
extension CassandraClient.Configuration {
327+
public struct SSL {
357328
public var trustedCertificates: [String]?
358329
public var verifyFlag: VerifyFlag?
359330
public var cert: String?
@@ -373,9 +344,7 @@ public extension CassandraClient.Configuration {
373344
case peerIdentityDNS
374345
}
375346

376-
public init(trustedCertificates: [String]?) {
377-
self.trustedCertificates = trustedCertificates
378-
}
347+
public init() {}
379348

380349
func makeSSLContext() throws -> SSLContext {
381350
let sslContext = SSLContext()
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift Cassandra Client open source project
4+
//
5+
// Copyright (c) 2022 Apple Inc. and the Swift Cassandra Client project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of Swift Cassandra Client project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
@_implementationOnly import CDataStaxDriver
16+
17+
extension CassandraClient {
18+
/// Consistency levels
19+
public enum Consistency {
20+
case any
21+
case one
22+
case two
23+
case three
24+
case quorum
25+
case all
26+
case localQuorum
27+
case eachQuorum
28+
case serial
29+
case localSerial
30+
case localOne
31+
32+
var cassConsistency: CassConsistency {
33+
switch self {
34+
case .any:
35+
return CASS_CONSISTENCY_ANY
36+
case .one:
37+
return CASS_CONSISTENCY_ONE
38+
case .two:
39+
return CASS_CONSISTENCY_TWO
40+
case .three:
41+
return CASS_CONSISTENCY_THREE
42+
case .quorum:
43+
return CASS_CONSISTENCY_QUORUM
44+
case .all:
45+
return CASS_CONSISTENCY_ALL
46+
case .localQuorum:
47+
return CASS_CONSISTENCY_LOCAL_QUORUM
48+
case .eachQuorum:
49+
return CASS_CONSISTENCY_EACH_QUORUM
50+
case .serial:
51+
return CASS_CONSISTENCY_SERIAL
52+
case .localSerial:
53+
return CASS_CONSISTENCY_LOCAL_SERIAL
54+
case .localOne:
55+
return CASS_CONSISTENCY_LOCAL_ONE
56+
}
57+
}
58+
}
59+
}

0 commit comments

Comments
 (0)