Skip to content

Commit

Permalink
add feature and fix bugs (apache#148)
Browse files Browse the repository at this point in the history
Add new features:
1. plugins of Ambari and k8s deploy
2. specified config 'priority_network' to solve some ip problems

Fix bugs:
fix bugs that rebalance does not work in some case.
fix count(*) from union stmt bug
fix some union stmt bugs
fix bugs when try to schema change a clone replica
  • Loading branch information
morningman authored Nov 30, 2017
1 parent 8909f53 commit 585c21f
Show file tree
Hide file tree
Showing 68 changed files with 2,798 additions and 979 deletions.
7 changes: 6 additions & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ namespace config {
CONF_Int32(be_port, "9060");
CONF_Int32(be_rpc_port, "10060");

// Declare a selection strategy for those servers have many ips.
// Note that there should at most one ip match this list.
// this is a list in semicolon-delimited format, in CIDR notation, e.g. 10.10.10.0/24
// If no ip match this rule, will choose one randomly.
CONF_String(priority_networks, "")

////
//// tcmalloc gc parameter
////
Expand Down Expand Up @@ -314,7 +320,6 @@ namespace config {

// to forward compatibility, will be removed later
CONF_Bool(enable_token_check, "true");

} // namespace config

} // namespace palo
Expand Down
8 changes: 4 additions & 4 deletions be/src/runtime/export_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,22 +85,22 @@ Status ExportSink::open(RuntimeState* state) {
}

Status ExportSink::send(RuntimeState* state, RowBatch* batch) {
LOG(ERROR) << "debug: export_sink send batch: " << print_batch(batch);
VLOG_ROW << "debug: export_sink send batch: " << print_batch(batch);
SCOPED_TIMER(_profile->total_time_counter());
int num_rows = batch->num_rows();
std::stringstream ss;
for (int i = 0; i < num_rows; ++i) {
ss.str("");
RETURN_IF_ERROR(gen_row_buffer(batch->get_row(i), &ss));
LOG(ERROR) << "debug: export_sink send row: " << ss.str();
VLOG_ROW << "debug: export_sink send row: " << ss.str();
const std::string& buf = ss.str();
size_t written_len = 0;

SCOPED_TIMER(_write_timer);
// TODO(lingbin): for broker writer, we should not send rpc each row.
_file_writer->write(reinterpret_cast<const uint8_t*>(buf.c_str()),
RETURN_IF_ERROR(_file_writer->write(reinterpret_cast<const uint8_t*>(buf.c_str()),
buf.size(),
&written_len);
&written_len));
COUNTER_UPDATE(_bytes_written_counter, buf.size());
}
COUNTER_UPDATE(_rows_written_counter, num_rows);
Expand Down
65 changes: 58 additions & 7 deletions be/src/service/backend_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,46 +15,97 @@

#include "service/backend_options.h"

#include <algorithm>

#include <boost/algorithm/string.hpp>

#include "common/logging.h"
#include "common/status.h"
#include "util/network_util.h"
#include "util/cidr.h"

namespace palo {

std::string BackendOptions::_localhost;
static const std::string PRIORITY_CIDR_SEPARATOR = ";";

void BackendOptions::init() {
std::string BackendOptions::_s_localhost;
std::vector<CIDR> BackendOptions::_s_priority_cidrs;

bool BackendOptions::init() {
if (!analyze_priority_cidrs()) {
return false;
}
std::vector<InetAddress> hosts;
Status status = get_hosts_v4(&hosts);

if (!status.ok()) {
LOG(FATAL) << status.get_error_msg();
return false;
}

if (hosts.empty()) {
LOG(FATAL) << "failed to get host";
return false;
}

std::string loopback;
std::vector<InetAddress>::iterator addr_it = hosts.begin();
for (; addr_it != hosts.end(); ++addr_it) {
if ((*addr_it).is_address_v4()) {
VLOG(2) << "check ip=" << addr_it->get_host_address_v4();
if ((*addr_it).is_loopback_v4()) {
loopback = (*addr_it).get_host_address_v4();
loopback = addr_it->get_host_address_v4();
} else if (!_s_priority_cidrs.empty()) {
if (is_in_prior_network(addr_it->get_host_address_v4())) {
_s_localhost = addr_it->get_host_address_v4();
break;
}
} else {
_localhost = (*addr_it).get_host_address_v4();
_s_localhost = addr_it->get_host_address_v4();
break;
}
}
}

if (_localhost.empty()) {
_localhost = loopback;
if (_s_localhost.empty()) {
LOG(INFO) << "fail to find one valid non-loopback address, use loopback address.";
_s_localhost = loopback;
}
LOG(INFO) << "local host ip=" << _s_localhost;
return true;
}

std::string BackendOptions::get_localhost() {
return _localhost;
return _s_localhost;
}

bool BackendOptions::analyze_priority_cidrs() {
if (config::priority_networks == "") {
return true;
}
LOG(INFO) << "priority cidrs in conf: " << config::priority_networks;

std::vector<std::string> cidr_strs;
boost::split(cidr_strs, config::priority_networks, boost::is_any_of(PRIORITY_CIDR_SEPARATOR));

for (auto& cidr_str : cidr_strs) {
CIDR cidr;
if (!cidr.reset(cidr_str)) {
LOG(FATAL) << "wrong cidr format. cidr_str=" << cidr_str;
return false;
}
_s_priority_cidrs.push_back(cidr);
}
return true;
}

bool BackendOptions::is_in_prior_network(const std::string& ip) {
for (auto& cidr : _s_priority_cidrs) {
if (cidr.contains(ip)) {
return true;
}
}
return false;
}

}
12 changes: 10 additions & 2 deletions be/src/service/backend_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,24 @@
#define BDG_PALO_BE_SERVICE_BACKEND_OPTIONS_H

#include <string>
#include <vector>
#include <gutil/macros.h>

namespace palo {

class CIDR;

class BackendOptions {
public:
static void init();
static bool init();
static std::string get_localhost();

private:
static std::string _localhost;
static bool analyze_priority_cidrs();
static bool is_in_prior_network(const std::string& ip);

static std::string _s_localhost;
static std::vector<CIDR> _s_priority_cidrs;

DISALLOW_COPY_AND_ASSIGN(BackendOptions);
};
Expand Down
10 changes: 6 additions & 4 deletions be/src/service/palo_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ int main(int argc, char** argv) {
exit(-1);
}

string conffile = string(getenv("PALO_HOME")) + "/conf/be.conf";
string conffile = string(getenv("PALO_HOME")) + "/conf/be.conf";
if (!palo::config::init(conffile.c_str(), false)) {
fprintf(stderr, "error read config file. \n");
return -1;
Expand All @@ -104,7 +104,9 @@ int main(int argc, char** argv) {
palo::init_daemon(argc, argv);

palo::ResourceTls::init();
palo::BackendOptions::init();
if (!palo::BackendOptions::init()) {
exit(-1);
}

// initialize storage
if (0 != palo::olap_main(argc, argv)) {
Expand All @@ -119,8 +121,8 @@ int main(int argc, char** argv) {
palo::ThriftServer* be_server = nullptr;

EXIT_IF_ERROR(palo::BackendService::create_service(
&exec_env,
palo::config::be_port,
&exec_env,
palo::config::be_port,
&be_server));
Status status = be_server->start();
if (!status.ok()) {
Expand Down
1 change: 1 addition & 0 deletions be/src/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ add_library(Util STATIC
load_error_hub.cpp
mysql_load_error_hub.cpp
null_load_error_hub.cpp
cidr.cpp
)

#ADD_BE_TEST(integer-array-test)
Expand Down
102 changes: 102 additions & 0 deletions be/src/util/cidr.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "util/cidr.h"

#include <arpa/inet.h>

#include <boost/algorithm/string.hpp>

#include "common/logging.h"

namespace palo {

CIDR::CIDR() : _address(0), _netmask(0xffffffff) {
}

void CIDR::reset() {
_address = 0;
_netmask = 0xffffffff;
}

bool CIDR::reset(const std::string& cidr_str) {
reset();

// check if have mask
std::string cidr_format_str = cidr_str;
int32_t have_mask = cidr_str.find("/");
if (have_mask == -1) {
cidr_format_str.assign(cidr_str + "/32");
}
VLOG(2) << "cidr format str: " << cidr_format_str;

std::vector<std::string> cidr_items;
boost::split(cidr_items, cidr_format_str, boost::is_any_of("/"));
if (cidr_items.size() != 2) {
LOG(ERROR) << "wrong CIDR format. network=" << cidr_str;
return false;
}

if (cidr_items[1].empty()) {
LOG(ERROR) << "wrong CIDR mask format. network=" << cidr_str;
return false;
}

char* endptr = nullptr;
int32_t mask_length = strtol(cidr_items[1].c_str(), &endptr, 10);
if (errno != 0 || mask_length <= 0 || mask_length > 32) {
LOG(ERROR) << "wrong CIDR mask format. network=" << cidr_str;
return false;
}

uint32_t address = 0;
if (!ip_to_int(cidr_items[0], &address)) {
LOG(ERROR) << "wrong CIDR IP value. network=" << cidr_str;
return false;
}
_address = address;

_netmask = 0xffffffff;
_netmask = _netmask << (32 - mask_length);
return true;
}

bool CIDR::ip_to_int(const std::string& ip_str, uint32_t* value) {
struct in_addr addr;
int flag = inet_aton(ip_str.c_str(), &addr);
if (flag == 0) {
return false;
}
*value = ntohl(addr.s_addr);
return true;
}

bool CIDR::contains(uint32_t ip_int) {
if ((_address & _netmask) == (ip_int & _netmask)) {
return true;
}
return false;
}

bool CIDR::contains(const std::string& ip) {
uint32_t ip_int = 0;
if (!ip_to_int(ip, &ip_int)) {
return false;
}

return contains(ip_int);
}

} // end namespace palo
41 changes: 41 additions & 0 deletions be/src/util/cidr.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef BDG_PALO_BE_SRC_COMMON_UTIL_CIDR_H
#define BDG_PALO_BE_SRC_COMMON_UTIL_CIDR_H

#include <string>

namespace palo {

// Classless Inter-Domain Routing
class CIDR {
public:
CIDR();
void reset();
bool reset(const std::string& cidr_str);
bool contains(const std::string& ip);

private:
bool ip_to_int(const std::string& ip_str, uint32_t* value);
bool contains(uint32_t ip_int);

uint32_t _address;
uint32_t _netmask;
};

} // end namespace palo

#endif // BDG_PALO_BE_SRC_COMMON_UTIL_CIDR_H
1 change: 0 additions & 1 deletion be/src/util/thrift_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@

namespace palo {


// Helper class that starts a server in a separate thread, and handles
// the inter-thread communication to monitor whether it started
// correctly.
Expand Down
1 change: 1 addition & 0 deletions be/test/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ ADD_BE_TEST(count_down_latch_test)
ADD_BE_TEST(lru_cache_util_test)
ADD_BE_TEST(filesystem_util_test)
ADD_BE_TEST(internal_queue_test)
ADD_BE_TEST(cidr_test)
Loading

0 comments on commit 585c21f

Please sign in to comment.