@@ -77,25 +77,25 @@ class asio_connection
77
77
friend class asio_connection_pool ;
78
78
friend class asio_client ;
79
79
public:
80
- asio_connection (boost::asio::io_service& io_service, bool start_with_ssl, const std::function<void (boost::asio::ssl::context&)>& ssl_context_callback) :
81
- m_socket (io_service),
82
- m_ssl_context_callback (ssl_context_callback),
83
- m_pool_timer (io_service),
84
- m_is_reused (false ),
85
- m_keep_alive (true )
80
+ asio_connection (
81
+ boost::asio::io_service& io_service,
82
+ const std::string &pool_key,
83
+ bool start_with_ssl,
84
+ const std::function<void (boost::asio::ssl::context&)>& ssl_context_callback)
85
+ : m_socket(io_service),
86
+ m_ssl_context_callback (ssl_context_callback),
87
+ m_pool_timer(io_service),
88
+ m_is_reused(false ),
89
+ m_keep_alive(true ),
90
+ m_pool_key(pool_key),
91
+ m_epoch(0 )
86
92
{
87
93
if (start_with_ssl)
88
94
{
89
95
upgrade_to_ssl ();
90
96
}
91
97
}
92
98
93
- asio_connection (boost::asio::io_service& io_service, const std::string &pool_key, bool start_with_ssl, const std::function<void (boost::asio::ssl::context&)>& ssl_context_callback) :
94
- asio_connection (io_service, start_with_ssl, ssl_context_callback)
95
- {
96
- m_pool_key = pool_key;
97
- }
98
-
99
99
~asio_connection ()
100
100
{
101
101
close ();
@@ -146,8 +146,7 @@ class asio_connection
146
146
bool keep_alive () const { return m_keep_alive; }
147
147
bool is_ssl () const { return m_ssl_stream ? true : false ; }
148
148
const std::string &pool_key () const { return m_pool_key; }
149
- const std::string &nonce () const { return m_nonce; }
150
- void generate_nonce () { m_nonce = m_nonce_generator.generate (); }
149
+ uint32_t epoch () const { return m_epoch; }
151
150
152
151
template <typename Iterator, typename Handler>
153
152
void async_connect (const Iterator &begin, const Handler &handler)
@@ -240,7 +239,7 @@ class asio_connection
240
239
{
241
240
cancel_pool_timer ();
242
241
m_is_reused = true ;
243
- generate_nonce () ;
242
+ m_epoch++ ;
244
243
}
245
244
246
245
// Guards concurrent access to socket/ssl::stream. This is necessary
@@ -256,18 +255,71 @@ class asio_connection
256
255
bool m_is_reused;
257
256
bool m_keep_alive;
258
257
std::string m_pool_key;
259
- std::string m_nonce;
260
- utility::nonce_generator m_nonce_generator;
258
+ uint32_t m_epoch;
259
+ };
260
+
261
+ class asio_shared_connection_pool
262
+ {
263
+ public:
264
+ asio_shared_connection_pool (boost::asio::io_service& io_service) :
265
+ m_io_service (io_service)
266
+ {}
267
+
268
+ ~asio_shared_connection_pool ()
269
+ {
270
+ std::lock_guard<std::mutex> lock (m_connections_mutex);
271
+ // Cancel the pool timer for all connections.
272
+ for (auto & connection : m_connections)
273
+ {
274
+ connection.second ->cancel_pool_timer ();
275
+ }
276
+ }
277
+
278
+ void release (const std::shared_ptr<asio_connection>& connection)
279
+ {
280
+ if (connection->keep_alive () && (m_timeout_secs > 0 ))
281
+ {
282
+ connection->cancel ();
283
+
284
+ std::lock_guard<std::mutex> lock (m_connections_mutex);
285
+ auto it = m_connections.insert (std::make_pair (connection->pool_key (), connection));
286
+
287
+ // This will destroy and remove the connection from pool after the set timeout.
288
+ // We use 'this' because async calls to timer handler only occur while the pool exists.
289
+ auto connection_weak = std::weak_ptr<asio_connection>(connection);
290
+ auto epoch = connection->epoch ();
291
+ connection->start_pool_timer (m_timeout_secs, [this , connection_weak, epoch](const boost::error_code& ec) {
292
+ this ->free_shared_connection (ec, connection_weak, epoch);
293
+ });
294
+ }
295
+ }
296
+
297
+ private:
298
+ void free_connection (const boost::system::error_code& ec, const std::weak_ptr<asio_connection> &connection, unsigned int epoch)
299
+ {
300
+ auto connection_shared = connection.lock ();
301
+ if (!connection_shared)
302
+ return ;
303
+
304
+ std::lock_guard<std::mutex> lock (m_connections_mutex);
305
+ auto it = m_connections.find (connection_shared);
306
+ if (it == m_connections.end ())
307
+ // The connection was acquired while this callback was firing
308
+ return ;
309
+
310
+ // The epoch is used to ensure the connection was not quickly acquired and released while this callback was firing.
311
+ // Every acquisition increments the epoch.
312
+ if (epoch != (*it)->epoch ())
313
+ m_connections.erase (it);
314
+ }
261
315
};
262
316
263
317
class asio_connection_pool
264
318
{
265
319
public:
266
320
267
- asio_connection_pool (boost::asio::io_service& io_service, const std::chrono::seconds &idle_timeout, bool is_shared) :
268
- m_io_service (io_service),
269
- m_timeout_secs (static_cast <int >(idle_timeout.count())),
270
- m_is_shared (is_shared)
321
+ asio_connection_pool (boost::asio::io_service& io_service) :
322
+ m_io_service (io_service)
271
323
{}
272
324
273
325
~asio_connection_pool ()
@@ -282,29 +334,15 @@ class asio_connection_pool
282
334
283
335
void release (const std::shared_ptr<asio_connection> &connection)
284
336
{
285
- if (connection->keep_alive () && (m_timeout_secs > 0 ))
286
- {
287
- connection->cancel ();
337
+ connection->cancel ();
288
338
289
- if (m_is_shared)
290
- {
291
- std::lock_guard<std::mutex> lock (m_connections_mutex);
292
- auto it = m_shared_connections.insert (std::make_pair (connection->pool_key (), connection));
293
- // This will destroy and remove the connection from pool after the set timeout.
294
- // We use 'this' because async calls to timer handler only occur while the pool exists.
295
- connection->start_pool_timer (m_timeout_secs, boost::bind (&asio_connection_pool::free_shared_connection, this , boost::asio::placeholders::error, it, std::weak_ptr<asio_connection>(connection), connection->nonce ()));
296
- }
297
- else
298
- {
299
- std::lock_guard<std::mutex> lock (m_connections_mutex);
300
- auto pair = m_connections.insert (connection);
301
- if (pair.second )
302
- {
303
- // This will destroy and remove the connection from pool after the set timeout.
304
- // We use 'this' because async calls to timer handler only occur while the pool exists.
305
- connection->start_pool_timer (m_timeout_secs, boost::bind (&asio_connection_pool::free_connection, this , boost::asio::placeholders::error, pair.first , std::weak_ptr<asio_connection>(connection), connection->nonce ()));
306
- }
307
- }
339
+ if (connection->keep_alive ())
340
+ {
341
+ std::lock_guard<std::mutex> lock (m_connections_mutex);
342
+ // This will destroy and remove the connection from pool after the set timeout.
343
+ // We use 'this' because async calls to timer handler only occur while the pool exists.
344
+ connection->start_pool_timer (s_timeout_secs.count (), boost::bind (&asio_connection_pool::free_connection, this , boost::asio::placeholders::error, pair.first , std::weak_ptr<asio_connection>(connection), connection->nonce ()));
345
+ m_connections.push_back (connection);
308
346
}
309
347
// Otherwise connection is not put to the pool and it will go out of scope.
310
348
}
@@ -377,26 +415,33 @@ class asio_connection_pool
377
415
}
378
416
379
417
// Using weak_ptr here ensures bind() to this handler will not prevent the connection object from going out of scope.
380
- void free_connection (const boost::system::error_code& ec, std::set<std::shared_ptr<asio_connection>>::iterator it, const std::weak_ptr<asio_connection> &connection, const std::string &nonce )
418
+ void free_connection (const boost::system::error_code& ec, const std::weak_ptr<asio_connection> &connection, uint32_t epoch )
381
419
{
382
420
if (!ec)
383
421
{
384
422
auto connection_shared = connection.lock ();
385
- // Compare nonce here to ensure the iterator is valid, the connection not been reused.
386
- if (connection_shared && (connection_shared->nonce () == nonce))
387
- {
388
- std::lock_guard<std::mutex> lock (m_connections_mutex);
423
+ if (!connection_shared)
424
+ return ;
425
+
426
+ std::lock_guard<std::mutex> lock (m_connections_mutex);
427
+ auto it = m_connections.find (connection_shared);
428
+ if (it == m_connections.end ())
429
+ // The connection was acquired while this callback was firing
430
+ return ;
431
+
432
+ // The epoch is used to ensure the connection was not quickly acquired and released while this callback was firing.
433
+ // Every acquisition increments the epoch.
434
+ if (epoch != (*it)->epoch ())
389
435
m_connections.erase (it);
390
- }
391
436
}
392
437
}
393
438
394
439
boost::asio::io_service& m_io_service;
395
- const int m_timeout_secs;
396
- bool m_is_shared;
397
- std::multimap<std::string, std::shared_ptr<asio_connection>> m_shared_connections;
398
- std::set<std::shared_ptr<asio_connection>> m_connections;
440
+
399
441
std::mutex m_connections_mutex;
442
+ std::set<std::shared_ptr<asio_connection>> m_connections;
443
+
444
+ static const std::chrono::seconds s_timeout_secs = 30 ;
400
445
};
401
446
402
447
std::shared_ptr<asio_connection_pool> asio_connection_pool::shared_instance ()
0 commit comments