Skip to content

Commit 6e23e69

Browse files
committed
add host based connection pool map
1 parent 1e0b5c5 commit 6e23e69

File tree

2 files changed

+53
-12
lines changed

2 files changed

+53
-12
lines changed

Release/include/pplx/threadpool.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@
4242
#include "pplx/pplx.h"
4343
#endif
4444

45+
namespace web { namespace http { namespace client { namespace details {
46+
class asio_connection_pool;
47+
}}}}
48+
4549
namespace crossplat {
4650

4751
#if (defined(ANDROID) || defined(__ANDROID__))
@@ -97,6 +101,19 @@ class threadpool
97101
return m_service;
98102
}
99103

104+
std::shared_ptr<web::http::client::details::asio_connection_pool> obtain_connection_pool(const std::string &base_uri, std::function<std::shared_ptr<web::http::client::details::asio_connection_pool>()> connection_pool_generator)
105+
{
106+
std::lock_guard<std::mutex> lg(m_connection_pool_map_mutex);
107+
108+
auto &pool = m_connection_pool_map[base_uri];
109+
if (!pool)
110+
{
111+
pool = connection_pool_generator();
112+
}
113+
114+
return pool;
115+
}
116+
100117
private:
101118
struct _cancel_thread { };
102119

@@ -156,6 +173,9 @@ class threadpool
156173
std::vector<pthread_t> m_threads;
157174
boost::asio::io_service m_service;
158175
boost::asio::io_service::work m_work;
176+
177+
std::mutex m_connection_pool_map_mutex;
178+
std::map<std::string, std::shared_ptr<web::http::client::details::asio_connection_pool>> m_connection_pool_map;
159179
};
160180

161181
}

Release/src/http/client/http_client_asio.cpp

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -325,32 +325,53 @@ class asio_connection_pool
325325
boost::asio::io_service& m_io_service;
326326
const int m_timeout_secs;
327327
const bool m_start_with_ssl;
328-
const std::function<void(boost::asio::ssl::context&)>& m_ssl_context_callback;
328+
std::function<void(boost::asio::ssl::context&)> m_ssl_context_callback;
329329
std::vector<std::shared_ptr<asio_connection> > m_connections;
330330
std::mutex m_connections_mutex;
331331
};
332332

333333

334-
335334
class asio_client : public _http_client_communicator
336335
{
337336
public:
338337
asio_client(http::uri address, http_client_config client_config)
339338
: _http_client_communicator(std::move(address), std::move(client_config))
340-
, m_pool(crossplat::threadpool::shared_instance().service(),
341-
base_uri().scheme() == "https" && !_http_client_communicator::client_config().proxy().is_specified(),
342-
std::chrono::seconds(30), // Unused sockets are kept in pool for 30 seconds.
343-
this->client_config().get_ssl_context_callback())
344339
, m_resolver(crossplat::threadpool::shared_instance().service())
345-
{}
340+
{
341+
std::string host = base_uri().to_string();
342+
343+
auto &credentials = _http_client_communicator::client_config().credentials();
344+
if (credentials.is_set())
345+
{
346+
host.append(credentials.username());
347+
}
348+
349+
auto &proxy = _http_client_communicator::client_config().proxy();
350+
if (proxy.is_specified())
351+
{
352+
host.append(proxy.address().to_string());
353+
if (proxy.credentials().is_set())
354+
{
355+
host.append(proxy.credentials().username());
356+
}
357+
}
358+
359+
m_pool = crossplat::threadpool::shared_instance().obtain_connection_pool(host, [this]()
360+
{
361+
return std::make_shared<asio_connection_pool>(crossplat::threadpool::shared_instance().service(),
362+
base_uri().scheme() == "https" && !_http_client_communicator::client_config().proxy().is_specified(),
363+
std::chrono::seconds(30), // Unused sockets are kept in pool for 30 seconds.
364+
this->client_config().get_ssl_context_callback());
365+
});
366+
}
346367

347368
void send_request(const std::shared_ptr<request_context> &request_ctx) override;
348369

349370
unsigned long open() override { return 0; }
350371

351372
virtual pplx::task<http_response> propagate(http_request request) override;
352373

353-
asio_connection_pool m_pool;
374+
std::shared_ptr<asio_connection_pool> m_pool;
354375
tcp::resolver m_resolver;
355376
};
356377

@@ -375,13 +396,13 @@ class asio_context : public request_context, public std::enable_shared_from_this
375396
{
376397
m_timer.stop();
377398
// Release connection back to the pool. If connection was not closed, it will be put to the pool for reuse.
378-
std::static_pointer_cast<asio_client>(m_http_client)->m_pool.release(m_connection);
399+
std::static_pointer_cast<asio_client>(m_http_client)->m_pool->release(m_connection);
379400
}
380401

381402
static std::shared_ptr<request_context> create_request_context(std::shared_ptr<_http_client_communicator> &client, http_request &request)
382403
{
383404
auto client_cast(std::static_pointer_cast<asio_client>(client));
384-
auto connection(client_cast->m_pool.obtain());
405+
auto connection(client_cast->m_pool->obtain());
385406
auto ctx = std::make_shared<asio_context>(client, request, connection);
386407
ctx->m_timer.set_ctx(std::weak_ptr<asio_context>(ctx));
387408
return ctx;
@@ -458,7 +479,7 @@ class asio_context : public request_context, public std::enable_shared_from_this
458479
m_context->m_timer.reset();
459480
//// Replace the connection. This causes old connection object to go out of scope.
460481
auto client = std::static_pointer_cast<asio_client>(m_context->m_http_client);
461-
m_context->m_connection = client->m_pool.obtain();
482+
m_context->m_connection = client->m_pool->obtain();
462483

463484
auto endpoint = *endpoints;
464485
m_context->m_connection->async_connect(endpoint, boost::bind(&ssl_proxy_tunnel::handle_tcp_connect, shared_from_this(), boost::asio::placeholders::error, ++endpoints));
@@ -811,7 +832,7 @@ class asio_context : public request_context, public std::enable_shared_from_this
811832
{
812833
// Replace the connection. This causes old connection object to go out of scope.
813834
auto client = std::static_pointer_cast<asio_client>(m_http_client);
814-
m_connection = client->m_pool.obtain();
835+
m_connection = client->m_pool->obtain();
815836

816837
auto endpoint = *endpoints;
817838
m_connection->async_connect(endpoint, boost::bind(&asio_context::handle_connect, shared_from_this(), boost::asio::placeholders::error, ++endpoints));

0 commit comments

Comments
 (0)