Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix config parser and reformat #337

Merged
merged 2 commits into from
Dec 3, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 63 additions & 98 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,18 @@

namespace CurrentMetrics
{
extern const Metric Revision;
extern const Metric Revision;
}

namespace DB
{

namespace ErrorCodes
{
extern const int NO_ELEMENTS_IN_CONFIG;
extern const int SUPPORT_IS_DISABLED;
extern const int ARGUMENT_OUT_OF_BOUND;
}
extern const int NO_ELEMENTS_IN_CONFIG;
extern const int SUPPORT_IS_DISABLED;
extern const int ARGUMENT_OUT_OF_BOUND;
} // namespace ErrorCodes


static std::string getCanonicalPath(std::string && path)
Expand All @@ -84,10 +84,7 @@ void Server::initialize(Poco::Util::Application & self)
logger().information("starting up");
}

std::string Server::getDefaultCorePath() const
{
return getCanonicalPath(config().getString("path")) + "cores";
}
std::string Server::getDefaultCorePath() const { return getCanonicalPath(config().getString("path")) + "cores"; }

int Server::main(const std::vector<std::string> & /*args*/)
{
Expand All @@ -114,8 +111,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
{
auto old_configuration = loaded_config.configuration;
ConfigProcessor config_processor(config_path);
loaded_config = config_processor.loadConfigWithZooKeeperIncludes(
main_config_zk_node_cache, /* fallback_to_preprocessed = */ true);
loaded_config = config_processor.loadConfigWithZooKeeperIncludes(main_config_zk_node_cache, /* fallback_to_preprocessed = */ true);
config_processor.savePreprocessedConfig(loaded_config);
config().removeConfiguration(old_configuration.get());
config().add(loaded_config.configuration.duplicate(), PRIO_DEFAULT, false);
Expand All @@ -128,7 +124,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
Poco::trimInPlace(fast_paths);
if (!fast_paths.empty())
{
Poco::StringTokenizer string_tokens(fast_paths, ";");
Poco::StringTokenizer string_tokens(fast_paths, ",");
for (auto it = string_tokens.begin(); it != string_tokens.end(); it++)
{
all_fast_path.emplace_back(getCanonicalPath(std::string(*it)));
Expand All @@ -141,7 +137,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
Poco::trimInPlace(paths);
if (paths.empty())
throw Exception("path configuration parameter is empty");
Poco::StringTokenizer string_tokens(paths, ";");
Poco::StringTokenizer string_tokens(paths, ",");
for (auto it = string_tokens.begin(); it != string_tokens.end(); it++)
{
all_normal_path.emplace_back(getCanonicalPath(std::string(*it)));
Expand Down Expand Up @@ -188,9 +184,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
int rc = setrlimit(RLIMIT_NOFILE, &rlim);
if (rc != 0)
LOG_WARNING(log,
"Cannot set max number of file descriptors to " << rlim.rlim_cur
<< ". Try to specify max_open_files according to your system limits. error: "
<< strerror(errno));
"Cannot set max number of file descriptors to "
<< rlim.rlim_cur << ". Try to specify max_open_files according to your system limits. error: " << strerror(errno));
else
LOG_DEBUG(log, "Set max number of file descriptors to " << rlim.rlim_cur << " (was " << old << ").");
}
Expand Down Expand Up @@ -269,8 +264,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto main_config_reloader = std::make_unique<ConfigReloader>(config_path,
include_from_path,
std::move(main_config_zk_node_cache),
[&](ConfigurationPtr config)
{
[&](ConfigurationPtr config) {
buildLoggers(*config);
global_context->setClustersConfig(config);
global_context->setMacros(std::make_unique<Macros>(*config, "macros"));
Expand Down Expand Up @@ -344,8 +338,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
/// Load raft related configs ahead of loading metadata, as TMT storage relies on TMT context, which needs these configs.
bool need_raft_service = false;
std::vector<std::string> pd_addrs;
std::string learner_key;
std::string learner_value;
const std::string learner_key = "engine";
const std::string learner_value = "tiflash";
std::unordered_set<std::string> ignore_databases{"system"};
std::string kvstore_path = path + "kvstore/";
String flash_server_addr = config().getString("flash.service_addr", "0.0.0.0:3930");
Expand All @@ -357,7 +351,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
if (config().has("raft.pd_addr"))
{
String pd_service_addrs = config().getString("raft.pd_addr");
Poco::StringTokenizer string_tokens(pd_service_addrs, ";");
Poco::StringTokenizer string_tokens(pd_service_addrs, ",");
for (auto it = string_tokens.begin(); it != string_tokens.end(); it++)
{
pd_addrs.push_back(*it);
Expand All @@ -369,24 +363,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_INFO(log, "Not found pd addrs.");
}

if (config().has("raft.learner_key"))
{
learner_key = config().getString("raft.learner_key");
}
else
{
learner_key = "zone";
}

if (config().has("raft.learner_value"))
{
learner_value = config().getString("raft.learner_value");
}
else
{
learner_value = "engine";
}

if (config().has("raft.ignore_databases"))
{
String ignore_dbs = config().getString("raft.ignore_databases");
Expand Down Expand Up @@ -420,7 +396,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->setCurrentDatabase(default_database);

/// Then, sync schemas with TiDB, and initialize schema sync service.
for (int i = 0; i < 180 ; i++) // retry for 3 mins
for (int i = 0; i < 180; i++) // retry for 3 mins
{
try
{
Expand All @@ -429,7 +405,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
}
catch (Poco::Exception & e)
{
LOG_ERROR(log, "Bootstrap failed because sync schema error: " << e.displayText() << "\n We will sleep 3 seconds and try again.");
LOG_ERROR(
log, "Bootstrap failed because sync schema error: " << e.displayText() << "\n We will sleep 3 seconds and try again.");
::sleep(1);
}
}
Expand Down Expand Up @@ -531,8 +508,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
listen_try = true;
}

auto make_socket_address = [&](const std::string & host, UInt16 port)
{
auto make_socket_address = [&](const std::string & host, UInt16 port) {
Poco::Net::SocketAddress socket_address;
try
{
Expand All @@ -545,39 +521,39 @@ int Server::main(const std::vector<std::string> & /*args*/)
#if defined(EAI_ADDRFAMILY)
|| code == EAI_ADDRFAMILY
#endif
)
)
{
LOG_ERROR(log,
"Cannot resolve listen_host (" << host << "), error " << e.code() << ": " << e.message() << ". "
"If it is an IPv6 address and your host has disabled IPv6, then consider to "
"specify IPv4 address to listen in <listen_host> element of configuration "
"file. Example: <listen_host>0.0.0.0</listen_host>");
"Cannot resolve listen_host (" << host << "), error " << e.code() << ": " << e.message()
<< ". "
"If it is an IPv6 address and your host has disabled IPv6, then consider to "
"specify IPv4 address to listen in <listen_host> element of configuration "
"file. Example: <listen_host>0.0.0.0</listen_host>");
}

throw;
}
return socket_address;
};

auto socket_bind_listen = [&](auto & socket, const std::string & host, UInt16 port, bool secure = 0)
{
auto address = make_socket_address(host, port);
auto socket_bind_listen = [&](auto & socket, const std::string & host, UInt16 port, bool secure = 0) {
auto address = make_socket_address(host, port);
#if !POCO_CLICKHOUSE_PATCH || POCO_VERSION <= 0x02000000 // TODO: fill correct version
if (secure)
/// Bug in old poco, listen() after bind() with reusePort param will fail because have no implementation in SecureServerSocketImpl
/// https://github.com/pocoproject/poco/pull/2257
socket.bind(address, /* reuseAddress = */ true);
else
if (secure)
/// Bug in old poco, listen() after bind() with reusePort param will fail because have no implementation in SecureServerSocketImpl
/// https://github.com/pocoproject/poco/pull/2257
socket.bind(address, /* reuseAddress = */ true);
else
#endif
#if POCO_VERSION < 0x01080000
socket.bind(address, /* reuseAddress = */ true);
socket.bind(address, /* reuseAddress = */ true);
#else
socket.bind(address, /* reuseAddress = */ true, /* reusePort = */ config().getBool("listen_reuse_port", false));
socket.bind(address, /* reuseAddress = */ true, /* reusePort = */ config().getBool("listen_reuse_port", false));
#endif

socket.listen(/* backlog = */ config().getUInt("listen_backlog", 64));
socket.listen(/* backlog = */ config().getUInt("listen_backlog", 64));

return address;
return address;
};

for (const auto & listen_host : listen_hosts)
Expand All @@ -592,11 +568,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socket_bind_listen(socket, listen_host, config().getInt("http_port"));
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(new Poco::Net::HTTPServer(
new HTTPHandlerFactory(*this, "HTTPHandler-factory"),
server_pool,
socket,
http_params));
servers.emplace_back(
new Poco::Net::HTTPServer(new HTTPHandlerFactory(*this, "HTTPHandler-factory"), server_pool, socket, http_params));

LOG_INFO(log, "Listening http://" + address.toString());
}
Expand All @@ -611,11 +584,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socket_bind_listen(socket, listen_host, config().getInt("https_port"), /* secure = */ true);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(new Poco::Net::HTTPServer(
new HTTPHandlerFactory(*this, "HTTPSHandler-factory"),
server_pool,
socket,
http_params));
servers.emplace_back(
new Poco::Net::HTTPServer(new HTTPHandlerFactory(*this, "HTTPSHandler-factory"), server_pool, socket, http_params));

LOG_INFO(log, "Listening https://" + address.toString());
#else
Expand All @@ -632,11 +602,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socket_bind_listen(socket, listen_host, config().getInt("tcp_port"));
socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(new Poco::Net::TCPServer(
new TCPHandlerFactory(*this),
server_pool,
socket,
new Poco::Net::TCPServerParams));
servers.emplace_back(
new Poco::Net::TCPServer(new TCPHandlerFactory(*this), server_pool, socket, new Poco::Net::TCPServerParams));

LOG_INFO(log, "Listening tcp: " + address.toString());
}
Expand All @@ -650,10 +617,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(new Poco::Net::TCPServer(
new TCPHandlerFactory(*this, /* secure= */ true ),
server_pool,
socket,
new Poco::Net::TCPServerParams));
new TCPHandlerFactory(*this, /* secure= */ true), server_pool, socket, new Poco::Net::TCPServerParams));
LOG_INFO(log, "Listening tcp_secure: " + address.toString());
#else
throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.",
Expand All @@ -673,29 +637,28 @@ int Server::main(const std::vector<std::string> & /*args*/)
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(new Poco::Net::HTTPServer(
new InterserverIOHTTPHandlerFactory(*this, "InterserverIOHTTPHandler-factory"),
server_pool,
socket,
http_params));
new InterserverIOHTTPHandlerFactory(*this, "InterserverIOHTTPHandler-factory"), server_pool, socket, http_params));

LOG_INFO(log, "Listening interserver http: " + address.toString());
}
}
catch (const Poco::Net::NetException & e)
{
if (listen_try)
LOG_ERROR(log, "Listen [" << listen_host << "]: " << e.code() << ": " << e.what() << ": " << e.message()
<< " If it is an IPv6 or IPv4 address and your host has disabled IPv6 or IPv4, then consider to "
"specify not disabled IPv4 or IPv6 address to listen in <listen_host> element of configuration "
"file. Example for disabled IPv6: <listen_host>0.0.0.0</listen_host> ."
" Example for disabled IPv4: <listen_host>::</listen_host>");
LOG_ERROR(log,
"Listen [" << listen_host << "]: " << e.code() << ": " << e.what() << ": " << e.message()
<< " If it is an IPv6 or IPv4 address and your host has disabled IPv6 or IPv4, then consider to "
"specify not disabled IPv4 or IPv6 address to listen in <listen_host> element of configuration "
"file. Example for disabled IPv6: <listen_host>0.0.0.0</listen_host> ."
" Example for disabled IPv4: <listen_host>::</listen_host>");
else
throw;
}
}

if (servers.empty())
throw Exception("No servers started (add valid listen_host and 'tcp_port' or 'http_port' to configuration file.)", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
throw Exception("No servers started (add valid listen_host and 'tcp_port' or 'http_port' to configuration file.)",
ErrorCodes::NO_ELEMENTS_IN_CONFIG);

for (auto & server : servers)
server->start();
Expand All @@ -706,9 +669,10 @@ int Server::main(const std::vector<std::string> & /*args*/)
{
std::stringstream message;
message << "Available RAM = " << formatReadableSizeWithBinarySuffix(getMemoryAmount()) << ";"
<< " physical cores = " << getNumberOfPhysicalCPUCores() << ";"
// on ARM processors it can show only enabled at current moment cores
<< " threads = " << std::thread::hardware_concurrency() << ".";
<< " physical cores = " << getNumberOfPhysicalCPUCores()
<< ";"
// on ARM processors it can show only enabled at current moment cores
<< " threads = " << std::thread::hardware_concurrency() << ".";
LOG_INFO(log, message.str());
}

Expand Down Expand Up @@ -748,9 +712,11 @@ int Server::main(const std::vector<std::string> & /*args*/)
}
}

LOG_DEBUG(
log, "Closed connections." << (current_connections ? " But " + toString(current_connections) + " remains."
" Tip: To increase wait time add to config: <shutdown_wait_unfinished>60</shutdown_wait_unfinished>" : ""));
LOG_DEBUG(log,
"Closed connections." << (current_connections ? " But " + toString(current_connections)
+ " remains."
" Tip: To increase wait time add to config: <shutdown_wait_unfinished>60</shutdown_wait_unfinished>"
: ""));

main_config_reloader.reset();
users_config_reloader.reset();
Expand Down Expand Up @@ -778,8 +744,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
std::vector<std::unique_ptr<MetricsTransmitter>> metrics_transmitters;
for (const auto & graphite_key : DB::getMultipleKeysFromConfig(config(), "", "graphite"))
{
metrics_transmitters.emplace_back(std::make_unique<MetricsTransmitter>(
*global_context, async_metrics, graphite_key));
metrics_transmitters.emplace_back(std::make_unique<MetricsTransmitter>(*global_context, async_metrics, graphite_key));
}

SessionCleaner session_cleaner(*global_context);
Expand All @@ -789,7 +754,7 @@ int Server::main(const std::vector<std::string> & /*args*/)

return Application::EXIT_OK;
}
}
} // namespace DB

int mainEntryClickHouseServer(int argc, char ** argv)
{
Expand Down