Skip to content

Commit de6f56a

Browse files
authored
Merge pull request #310 from MikhailBurdukov/multiple_endpoints
Multiple endpoints for connection.
2 parents 45680f2 + 87804a0 commit de6f56a

File tree

10 files changed

+366
-35
lines changed

10 files changed

+366
-35
lines changed

clickhouse/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ SET ( clickhouse-cpp-lib-src
55
base/platform.cpp
66
base/socket.cpp
77
base/wire_format.cpp
8+
base/endpoints_iterator.cpp
89

910
columns/array.cpp
1011
columns/column.cpp
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#include "endpoints_iterator.h"
2+
#include <clickhouse/client.h>
3+
4+
namespace clickhouse {
5+
6+
RoundRobinEndpointsIterator::RoundRobinEndpointsIterator(const std::vector<Endpoint>& _endpoints)
7+
: endpoints (_endpoints)
8+
, current_index (endpoints.size() - 1ull)
9+
{
10+
}
11+
12+
Endpoint RoundRobinEndpointsIterator::Next()
13+
{
14+
current_index = (current_index + 1ull) % endpoints.size();
15+
return endpoints[current_index];
16+
}
17+
18+
RoundRobinEndpointsIterator::~RoundRobinEndpointsIterator() = default;
19+
20+
}

clickhouse/base/endpoints_iterator.h

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
#pragma once
2+
3+
#include "clickhouse/client.h"
4+
#include <vector>
5+
6+
namespace clickhouse {
7+
8+
struct ClientOptions;
9+
10+
/**
11+
* Base class for iterating through endpoints.
12+
*/
13+
class EndpointsIteratorBase
14+
{
15+
public:
16+
virtual ~EndpointsIteratorBase() = default;
17+
18+
virtual Endpoint Next() = 0;
19+
};
20+
21+
class RoundRobinEndpointsIterator : public EndpointsIteratorBase
22+
{
23+
public:
24+
explicit RoundRobinEndpointsIterator(const std::vector<Endpoint>& opts);
25+
Endpoint Next() override;
26+
27+
~RoundRobinEndpointsIterator() override;
28+
29+
private:
30+
const std::vector<Endpoint>& endpoints;
31+
size_t current_index;
32+
};
33+
34+
}

clickhouse/base/socket.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -390,9 +390,9 @@ std::unique_ptr<OutputStream> Socket::makeOutputStream() const {
390390

391391
NonSecureSocketFactory::~NonSecureSocketFactory() {}
392392

393-
std::unique_ptr<SocketBase> NonSecureSocketFactory::connect(const ClientOptions &opts) {
394-
const auto address = NetworkAddress(opts.host, std::to_string(opts.port));
393+
std::unique_ptr<SocketBase> NonSecureSocketFactory::connect(const ClientOptions &opts, const Endpoint& endpoint) {
395394

395+
const auto address = NetworkAddress(endpoint.host, std::to_string(endpoint.port));
396396
auto socket = doConnect(address, opts);
397397
setSocketOptions(*socket, opts);
398398

clickhouse/base/socket.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include "platform.h"
44
#include "input.h"
55
#include "output.h"
6+
#include "endpoints_iterator.h"
67

78
#include <cstddef>
89
#include <string>
@@ -88,7 +89,7 @@ class SocketFactory {
8889

8990
// TODO: move connection-related options to ConnectionOptions structure.
9091

91-
virtual std::unique_ptr<SocketBase> connect(const ClientOptions& opts) = 0;
92+
virtual std::unique_ptr<SocketBase> connect(const ClientOptions& opts, const Endpoint& endpoint) = 0;
9293

9394
virtual void sleepFor(const std::chrono::milliseconds& duration);
9495
};
@@ -135,7 +136,7 @@ class NonSecureSocketFactory : public SocketFactory {
135136
public:
136137
~NonSecureSocketFactory() override;
137138

138-
std::unique_ptr<SocketBase> connect(const ClientOptions& opts) override;
139+
std::unique_ptr<SocketBase> connect(const ClientOptions& opts, const Endpoint& endpoint) override;
139140

140141
protected:
141142
virtual std::unique_ptr<Socket> doConnect(const NetworkAddress& address, const ClientOptions& opts);

clickhouse/client.cpp

Lines changed: 128 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,12 @@ struct ClientInfo {
6565

6666
std::ostream& operator<<(std::ostream& os, const ClientOptions& opt) {
6767
os << "Client(" << opt.user << '@' << opt.host << ":" << opt.port
68-
<< " ping_before_query:" << opt.ping_before_query
68+
<< "Endpoints :";
69+
for (size_t i = 0; i < opt.endpoints.size(); i++)
70+
os << opt.user << '@' << opt.endpoints[i].host << ":" << opt.endpoints[i].port
71+
<< ((i == opt.endpoints.size() - 1) ? "" : ", ");
72+
73+
os << " ping_before_query:" << opt.ping_before_query
6974
<< " send_retries:" << opt.send_retries
7075
<< " retry_timeout:" << opt.retry_timeout.count()
7176
<< " compression_method:"
@@ -111,6 +116,15 @@ std::unique_ptr<SocketFactory> GetSocketFactory(const ClientOptions& opts) {
111116
return std::make_unique<NonSecureSocketFactory>();
112117
}
113118

119+
std::unique_ptr<EndpointsIteratorBase> GetEndpointsIterator(const ClientOptions& opts) {
120+
if (opts.endpoints.empty())
121+
{
122+
throw ValidationError("The list of endpoints is empty");
123+
}
124+
125+
return std::make_unique<RoundRobinEndpointsIterator>(opts.endpoints);
126+
}
127+
114128
}
115129

116130
class Client::Impl {
@@ -130,8 +144,12 @@ class Client::Impl {
130144

131145
void ResetConnection();
132146

147+
void ResetConnectionEndpoint();
148+
133149
const ServerInfo& GetServerInfo() const;
134150

151+
const std::optional<Endpoint>& GetCurrentEndpoint() const;
152+
135153
private:
136154
bool Handshake();
137155

@@ -155,13 +173,22 @@ class Client::Impl {
155173

156174
void WriteBlock(const Block& block, OutputStream& output);
157175

176+
void CreateConnection();
177+
158178
void InitializeStreams(std::unique_ptr<SocketBase>&& socket);
159179

180+
inline size_t GetConnectionAttempts() const
181+
{
182+
return options_.endpoints.size() * options_.send_retries;
183+
}
184+
160185
private:
161186
/// In case of network errors tries to reconnect to server and
162187
/// call fuc several times.
163188
void RetryGuard(std::function<void()> func);
164189

190+
void RetryConnectToTheEndpoint(std::function<void()>& func);
191+
165192
private:
166193
class EnsureNull {
167194
public:
@@ -194,32 +221,34 @@ class Client::Impl {
194221
std::unique_ptr<InputStream> input_;
195222
std::unique_ptr<OutputStream> output_;
196223
std::unique_ptr<SocketBase> socket_;
224+
std::unique_ptr<EndpointsIteratorBase> endpoints_iterator;
225+
226+
std::optional<Endpoint> current_endpoint_;
197227

198228
ServerInfo server_info_;
199229
};
200230

231+
ClientOptions modifyClientOptions(ClientOptions opts)
232+
{
233+
if (opts.host.empty())
234+
return opts;
235+
236+
Endpoint default_endpoint({opts.host, opts.port});
237+
opts.endpoints.emplace(opts.endpoints.begin(), default_endpoint);
238+
return opts;
239+
}
201240

202241
Client::Impl::Impl(const ClientOptions& opts)
203242
: Impl(opts, GetSocketFactory(opts)) {}
204243

205244
Client::Impl::Impl(const ClientOptions& opts,
206245
std::unique_ptr<SocketFactory> socket_factory)
207-
: options_(opts)
246+
: options_(modifyClientOptions(opts))
208247
, events_(nullptr)
209248
, socket_factory_(std::move(socket_factory))
249+
, endpoints_iterator(GetEndpointsIterator(options_))
210250
{
211-
for (unsigned int i = 0; ; ) {
212-
try {
213-
ResetConnection();
214-
break;
215-
} catch (const std::system_error&) {
216-
if (++i > options_.send_retries) {
217-
throw;
218-
}
219-
220-
socket_factory_->sleepFor(options_.retry_timeout);
221-
}
222-
}
251+
CreateConnection();
223252

224253
if (options_.compression_method != CompressionMethod::None) {
225254
compression_ = CompressionState::Enable;
@@ -329,17 +358,57 @@ void Client::Impl::Ping() {
329358
}
330359

331360
void Client::Impl::ResetConnection() {
332-
InitializeStreams(socket_factory_->connect(options_));
361+
InitializeStreams(socket_factory_->connect(options_, current_endpoint_.value()));
333362

334363
if (!Handshake()) {
335364
throw ProtocolError("fail to connect to " + options_.host);
336365
}
337366
}
338367

368+
void Client::Impl::ResetConnectionEndpoint() {
369+
current_endpoint_.reset();
370+
for (size_t i = 0; i < options_.endpoints.size();)
371+
{
372+
try
373+
{
374+
current_endpoint_ = endpoints_iterator->Next();
375+
ResetConnection();
376+
return;
377+
} catch (const std::system_error&) {
378+
if (++i == options_.endpoints.size())
379+
{
380+
current_endpoint_.reset();
381+
throw;
382+
}
383+
}
384+
}
385+
}
386+
387+
void Client::Impl::CreateConnection() {
388+
for (size_t i = 0; i < options_.send_retries;)
389+
{
390+
try
391+
{
392+
ResetConnectionEndpoint();
393+
return;
394+
} catch (const std::system_error&) {
395+
if (++i == options_.send_retries)
396+
{
397+
throw;
398+
}
399+
}
400+
}
401+
}
402+
339403
const ServerInfo& Client::Impl::GetServerInfo() const {
340404
return server_info_;
341405
}
342406

407+
408+
const std::optional<Endpoint>& Client::Impl::GetCurrentEndpoint() const {
409+
return current_endpoint_;
410+
}
411+
343412
bool Client::Impl::Handshake() {
344413
if (!SendHello()) {
345414
return false;
@@ -859,21 +928,45 @@ bool Client::Impl::ReceiveHello() {
859928
}
860929

861930
void Client::Impl::RetryGuard(std::function<void()> func) {
862-
for (unsigned int i = 0; ; ++i) {
863-
try {
864-
func();
865-
return;
866-
} catch (const std::system_error&) {
867-
bool ok = true;
868931

932+
if (current_endpoint_)
933+
{
934+
for (unsigned int i = 0; ; ++i) {
869935
try {
870-
socket_factory_->sleepFor(options_.retry_timeout);
871-
ResetConnection();
872-
} catch (...) {
873-
ok = false;
936+
func();
937+
return;
938+
} catch (const std::system_error&) {
939+
bool ok = true;
940+
941+
try {
942+
socket_factory_->sleepFor(options_.retry_timeout);
943+
ResetConnection();
944+
} catch (...) {
945+
ok = false;
946+
}
947+
948+
if (!ok && i == options_.send_retries) {
949+
break;
950+
}
874951
}
875-
876-
if (!ok && i == options_.send_retries) {
952+
}
953+
}
954+
// Connectiong with current_endpoint_ are broken.
955+
// Trying to establish with the another one from the list.
956+
size_t connection_attempts_count = GetConnectionAttempts();
957+
for (size_t i = 0; i < connection_attempts_count;)
958+
{
959+
try
960+
{
961+
socket_factory_->sleepFor(options_.retry_timeout);
962+
current_endpoint_ = endpoints_iterator->Next();
963+
ResetConnection();
964+
func();
965+
return;
966+
} catch (const std::system_error&) {
967+
if (++i == connection_attempts_count)
968+
{
969+
current_endpoint_.reset();
877970
throw;
878971
}
879972
}
@@ -936,6 +1029,14 @@ void Client::ResetConnection() {
9361029
impl_->ResetConnection();
9371030
}
9381031

1032+
void Client::ResetConnectionEndpoint() {
1033+
impl_->ResetConnectionEndpoint();
1034+
}
1035+
1036+
const std::optional<Endpoint>& Client::GetCurrentEndpoint() const {
1037+
return impl_->GetCurrentEndpoint();
1038+
}
1039+
9391040
const ServerInfo& Client::GetServerInfo() const {
9401041
return impl_->GetServerInfo();
9411042
}

clickhouse/client.h

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,18 @@ enum class CompressionMethod {
4444
LZ4 = 1,
4545
};
4646

47+
struct Endpoint {
48+
std::string host;
49+
uint16_t port = 9000;
50+
inline bool operator==(const Endpoint& right) const {
51+
return host == right.host && port == right.port;
52+
}
53+
};
54+
55+
enum class EndpointsIterationAlgorithm {
56+
RoundRobin = 0,
57+
};
58+
4759
struct ClientOptions {
4860
// Setter goes first, so it is possible to apply 'deprecated' annotation safely.
4961
#define DECLARE_FIELD(name, type, setter, default_value) \
@@ -56,7 +68,15 @@ struct ClientOptions {
5668
/// Hostname of the server.
5769
DECLARE_FIELD(host, std::string, SetHost, std::string());
5870
/// Service port.
59-
DECLARE_FIELD(port, unsigned int, SetPort, 9000);
71+
DECLARE_FIELD(port, uint16_t, SetPort, 9000);
72+
73+
/** Set endpoints (host+port), only one is used.
74+
* Client tries to connect to those endpoints one by one, on the round-robin basis:
75+
* first default enpoint (set via SetHost() + SetPort()), then each of endpoints, from begin() to end(),
76+
* the first one to establish connection is used for the rest of the session.
77+
* If port isn't specified, default(9000) value will be used.
78+
*/
79+
DECLARE_FIELD(endpoints, std::vector<Endpoint>, SetEndpoints, {});
6080

6181
/// Default database.
6282
DECLARE_FIELD(default_database, std::string, SetDefaultDatabase, "default");
@@ -240,6 +260,12 @@ class Client {
240260

241261
const ServerInfo& GetServerInfo() const;
242262

263+
/// Get current connected endpoint.
264+
/// In case when client is not connected to any endpoint, nullopt will returned.
265+
const std::optional<Endpoint>& GetCurrentEndpoint() const;
266+
267+
// Try to connect to different endpoints one by one only one time. If it doesn't work, throw an exception.
268+
void ResetConnectionEndpoint();
243269
private:
244270
const ClientOptions options_;
245271

0 commit comments

Comments
 (0)