11import CLibMongoC
22import Foundation
3+ import NIO
34import NIOConcurrencyHelpers
45
56/// A connection to the database.
@@ -39,6 +40,8 @@ extension NSCondition {
3940internal class ConnectionPool {
4041 /// Represents the state of a `ConnectionPool`.
4142 private enum State {
43+ /// Indicates the `ConnectionPool` is still starting up in the background.
44+ case opening( future: EventLoopFuture < OpaquePointer > )
4245 /// Indicates that the `ConnectionPool` is open and using the associated pointer to a `mongoc_client_pool_t`.
4346 case open( pool: OpaquePointer )
4447 /// Indicates that the `ConnectionPool` is in the process of closing. Connections can be checked back in, but
@@ -75,23 +78,25 @@ internal class ConnectionPool {
7578 internal static let PoolClosedError = MongoError . LogicError ( message: " ConnectionPool was already closed " )
7679
7780 /// Initializes the pool using the provided `ConnectionString`.
78- internal init ( from connString: ConnectionString ) throws {
79- let pool : OpaquePointer = try connString. withMongocURI { uriPtr in
80- guard let pool = mongoc_client_pool_new ( uriPtr) else {
81- throw MongoError . InternalError ( message: " Failed to initialize libmongoc client pool " )
82- }
83- return pool
84- }
81+ internal init ( from connString: ConnectionString , executor: OperationExecutor ) throws {
82+ let poolFut = executor. execute ( on: nil ) { ( ) -> OpaquePointer in
83+ try connString. withMongocURI { uriPtr in
84+ guard let pool = mongoc_client_pool_new ( uriPtr) else {
85+ throw MongoError . InternalError ( message: " Failed to initialize libmongoc client pool " )
86+ }
8587
86- self . state = . open( pool: pool)
88+ guard mongoc_client_pool_set_error_api ( pool, MONGOC_ERROR_API_VERSION_2) else {
89+ fatalError ( " Could not configure error handling on client pool " )
90+ }
8791
88- guard mongoc_client_pool_set_error_api ( pool, MONGOC_ERROR_API_VERSION_2) else {
89- fatalError ( " Could not configure error handling on client pool " )
92+ // We always set min_heartbeat_frequency because the hard-coded default in the vendored mongoc
93+ // was lowered to 50. Setting it here brings it to whatever was specified, or 500 if it wasn't.
94+ mongoc_client_pool_set_min_heartbeat_frequency_msec ( pool, UInt64 ( connString. minHeartbeatFrequencyMS) )
95+ return pool
96+ }
9097 }
9198
92- // We always set min_heartbeat_frequency because the hard-coded default in the vendored mongoc
93- // was lowered to 50. Setting it here brings it to whatever was specified, or 500 if it wasn't.
94- mongoc_client_pool_set_min_heartbeat_frequency_msec ( pool, UInt64 ( connString. minHeartbeatFrequencyMS) )
99+ self . state = . opening( future: poolFut)
95100 }
96101
97102 deinit {
@@ -101,13 +106,36 @@ internal class ConnectionPool {
101106 }
102107 }
103108
109+ /// Execute the provided closure with the pointer to the created `mongoc_client_pool_t`, potentially waiting
110+ /// for the pool to finish starting up first. The closure will be executed while the stateLock is held, so
111+ /// closure MUST NOT attempt to acquire the lock.
112+ ///
113+ /// Throws an error if the pool is closed or closing.
114+ private func withResolvedPool< T> ( body: ( OpaquePointer ) throws -> T ) throws -> T {
115+ try self . stateLock. withLock {
116+ switch self . state {
117+ case let . open( pool) :
118+ return try body ( pool)
119+ case let . opening( future) :
120+ let pool = try future. wait ( )
121+ self . state = . open( pool: pool)
122+ return try body ( pool)
123+ case . closed, . closing:
124+ throw Self . PoolClosedError
125+ }
126+ }
127+ }
128+
104129 /// Closes the pool, cleaning up underlying resources. **This method blocks until all connections are returned to
105130 /// the pool.**
106131 internal func close( ) throws {
107132 try self . stateLock. withLock {
108133 switch self . state {
109134 case let . open( pool) :
110135 self . state = . closing( pool: pool)
136+ case let . opening( future) :
137+ let pool = try future. wait ( )
138+ self . state = . closing( pool: pool)
111139 case . closing, . closed:
112140 throw Self . PoolClosedError
113141 }
@@ -118,7 +146,7 @@ internal class ConnectionPool {
118146 }
119147
120148 switch self . state {
121- case . open, . closed:
149+ case . open, . closed, . opening :
122150 throw MongoError . InternalError (
123151 message: " ConnectionPool in unexpected state \( self . state) during close() "
124152 )
@@ -133,32 +161,22 @@ internal class ConnectionPool {
133161 /// This method will block until a connection is available. Throws an error if the pool is in the process of
134162 /// closing or has finished closing.
135163 internal func checkOut( ) throws -> Connection {
136- try self . stateLock. withLock {
137- switch self . state {
138- case let . open( pool) :
139- self . connCount += 1
140- return Connection ( clientHandle: mongoc_client_pool_pop ( pool) , pool: self )
141- case . closing, . closed:
142- throw Self . PoolClosedError
143- }
164+ try self . withResolvedPool { pool in
165+ self . connCount += 1
166+ return Connection ( clientHandle: mongoc_client_pool_pop ( pool) , pool: self )
144167 }
145168 }
146169
147170 /// Checks out a connection from the pool, or returns `nil` if none are currently available. Throws an error if the
148171 /// pool is not open. This method may block waiting on the state lock as well as libmongoc's locks and thus must be
149172 // run within the thread pool.
150173 internal func tryCheckOut( ) throws -> Connection ? {
151- try self . stateLock. withLock {
152- switch self . state {
153- case let . open( pool) :
154- guard let handle = mongoc_client_pool_try_pop ( pool) else {
155- return nil
156- }
157- self . connCount += 1
158- return Connection ( clientHandle: handle, pool: self )
159- case . closing, . closed:
160- throw Self . PoolClosedError
174+ try self . withResolvedPool { pool in
175+ guard let handle = mongoc_client_pool_try_pop ( pool) else {
176+ return nil
161177 }
178+ self . connCount += 1
179+ return Connection ( clientHandle: handle, pool: self )
162180 }
163181 }
164182
@@ -178,6 +196,8 @@ internal class ConnectionPool {
178196 self . stateLock. signal ( )
179197 case . closed:
180198 throw Self . PoolClosedError
199+ case . opening:
200+ fatalError ( " ConnectionPool in unexpected state \( self . state) while checking in a connection " )
181201 }
182202 }
183203 }
@@ -230,15 +250,31 @@ internal class ConnectionPool {
230250 /// may only be called before any connections are checked out of the pool.** Ideally this code would just live in
231251 /// `ConnectionPool.init`. However, the client we accept here has to be fully initialized before we can pass it
232252 /// as the context. In order for it to be fully initialized its pool must exist already.
253+ ///
254+ /// This method takes ownership of the provided `mongoc_apm_callbacks_t`, so the pointer MUST NOT be used or freed
255+ /// after it is passed to this method.
233256 internal func setAPMCallbacks( callbacks: OpaquePointer , client: MongoClient ) {
234257 // lock isn't needed as this is called before pool is in use.
235- switch self . state {
236- case let . open( pool) :
237- mongoc_client_pool_set_apm_callbacks ( pool, callbacks, Unmanaged . passUnretained ( client) . toOpaque ( ) )
238- case . closing, . closed:
258+ guard case let . opening( future) = self . state else {
239259 // this method is called via `initializeMonitoring()`, which is called from `MongoClient.init`.
240260 // unless we have a bug it's impossible that the pool is already closed.
241261 fatalError ( " ConnectionPool in unexpected state \( self . state) while setting APM callbacks " )
242262 }
263+
264+ // to ensure the callbacks get set before any waiting on this future becomes unblocked, use
265+ // `map` to create a new future instead of .whenSuccess
266+ let newFut = future. map { pool -> OpaquePointer in
267+ mongoc_client_pool_set_apm_callbacks ( pool, callbacks, Unmanaged . passUnretained ( client) . toOpaque ( ) )
268+ mongoc_apm_callbacks_destroy ( callbacks)
269+ return pool
270+ }
271+
272+ // this shouldn't be reached but just in case we still free the memory
273+ newFut. whenFailure { _ in
274+ mongoc_apm_callbacks_destroy ( callbacks)
275+ }
276+
277+ // update the stored future to refer to the one that sets the callbacks
278+ self . state = . opening( future: newFut)
243279 }
244280}
0 commit comments