Skip to content

Commit

Permalink
merge to 87fd4ebd9977afb1e1193429dd75c7c82caab204 (apache#202)
Browse files Browse the repository at this point in the history
1. ix bugs in query layer.
2. remove some redundant code in BE
3. support specify multi helper node when starting FE
4. add proc 'cluster_load_statistic' to show load balance situation of Palo
  • Loading branch information
morningman authored Jun 8, 2018
1 parent c4be571 commit 9f7b1ea
Show file tree
Hide file tree
Showing 32 changed files with 849 additions and 634 deletions.
10 changes: 5 additions & 5 deletions be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,18 @@ AgentServer::AgentServer(ExecEnv* exec_env,

// clean dpp download dir
_command_executor = new CommandExecutor();
vector<OLAPRootPathStat> root_paths_stat;
_command_executor->get_all_root_path_stat(&root_paths_stat);
for (auto root_path_stat : root_paths_stat) {
vector<RootPathInfo> root_paths_info;
_command_executor->get_all_root_path_info(&root_paths_info);
for (auto root_path_info: root_paths_info) {
try {
string dpp_download_path_str = root_path_stat.root_path + DPP_PREFIX;
string dpp_download_path_str = root_path_info.path + DPP_PREFIX;
boost::filesystem::path dpp_download_path(dpp_download_path_str);
if (boost::filesystem::exists(dpp_download_path)) {
boost::filesystem::remove_all(dpp_download_path);
}
} catch (...) {
OLAP_LOG_WARNING("boost exception when remove dpp download path. [path='%s']",
root_path_stat.root_path.c_str());
root_path_info.path.c_str());
}
}

Expand Down
18 changes: 9 additions & 9 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1541,19 +1541,19 @@ void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this)
}
#endif

vector<OLAPRootPathStat> root_paths_stat;
vector<RootPathInfo> root_paths_info;

worker_pool_this->_command_executor->get_all_root_path_stat(&root_paths_stat);
worker_pool_this->_command_executor->get_all_root_path_info(&root_paths_info);

map<string, TDisk> disks;
for (auto root_path_state : root_paths_stat) {
for (auto root_path_info : root_paths_info) {
TDisk disk;
disk.__set_root_path(root_path_state.root_path);
disk.__set_disk_total_capacity(static_cast<double>(root_path_state.disk_total_capacity));
disk.__set_data_used_capacity(static_cast<double>(root_path_state.data_used_capacity));
disk.__set_disk_available_capacity(static_cast<double>(root_path_state.disk_available_capacity));
disk.__set_used(root_path_state.is_used);
disks[root_path_state.root_path] = disk;
disk.__set_root_path(root_path_info.path);
disk.__set_disk_total_capacity(static_cast<double>(root_path_info.capacity));
disk.__set_data_used_capacity(static_cast<double>(root_path_info.data_used_capacity));
disk.__set_disk_available_capacity(static_cast<double>(root_path_info.available));
disk.__set_used(root_path_info.is_used);
disks[root_path_info.path] = disk;
}
request.__set_disks(disks);

Expand Down
11 changes: 5 additions & 6 deletions be/src/exec/csv_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <thrift/protocol/TDebugProtocol.h>

#include "exec/text_converter.hpp"
#include "exprs/hll_hash_function.h"
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/runtime_state.h"
#include "runtime/row_batch.h"
Expand Down Expand Up @@ -652,19 +653,17 @@ void CsvScanNode::hll_hash(const char* src, int len, std::string* result) {
std::string str(src, len);
if (str != "\\N") {
uint64_t hash = HashUtil::murmur_hash64A(src, len, HashUtil::MURMUR_SEED);
char buf[11];
memset(buf, 0, 11);
char buf[HllHashFunctions::HLL_INIT_EXPLICT_SET_SIZE];
// expliclit set
buf[0] = HLL_DATA_EXPLICIT;
buf[1] = 1;
*((uint64_t*)(buf + 2)) = hash;
*result = std::string(buf, 11);
*result = std::string(buf, sizeof(buf));
} else {
char buf[2];
memset(buf, 0, 2);
char buf[HllHashFunctions::HLL_EMPTY_SET_SIZE];
// empty set
buf[0] = HLL_DATA_EMPTY;
*result = std::string(buf, 2);
*result = std::string(buf, sizeof(buf));
}
}

Expand Down
19 changes: 12 additions & 7 deletions be/src/exprs/hll_hash_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,31 @@ namespace palo {

using palo_udf::BigIntVal;
using palo_udf::StringVal;


const int HllHashFunctions::HLL_INIT_EXPLICT_SET_SIZE = 10;
const int HllHashFunctions::HLL_EMPTY_SET_SIZE = 1;

void HllHashFunctions::init() {
}

StringVal HllHashFunctions::create_string_result(palo_udf::FunctionContext* ctx,
const StringVal& val, const bool is_null) {
std::string result;
StringVal result;
if (is_null) {
// HLL_DATA_EMPTY
result = "0";
char buf[HLL_EMPTY_SET_SIZE];
buf[0] = HLL_DATA_EMPTY;
result = AnyValUtil::from_buffer_temp(ctx, buf, sizeof(buf));
} else {
// HLL_DATA_EXPLHLL_DATA_EXPLICIT
uint64_t hash = HashUtil::murmur_hash64A(val.ptr, val.len, HashUtil::MURMUR_SEED);
char buf[10];
char buf[HLL_INIT_EXPLICT_SET_SIZE];
buf[0] = HLL_DATA_EXPLICIT;
buf[1] = 1;
*((uint64_t*)(buf + 2)) = hash;
result = std::string(buf, 10);
}
return AnyValUtil::from_buffer_temp(ctx, result.c_str(), result.length());
result = AnyValUtil::from_buffer_temp(ctx, buf, sizeof(buf));
}
return result;
}

StringVal HllHashFunctions::hll_hash(palo_udf::FunctionContext* ctx,
Expand Down
3 changes: 3 additions & 0 deletions be/src/exprs/hll_hash_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ class HllHashFunctions {
const palo_udf::StringVal& dest_base);
static StringVal create_string_result(palo_udf::FunctionContext* ctx,
const StringVal& str, const bool is_null);

static const int HLL_INIT_EXPLICT_SET_SIZE;
static const int HLL_EMPTY_SET_SIZE;
};
}

Expand Down
12 changes: 6 additions & 6 deletions be/src/olap/command_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -696,18 +696,18 @@ OLAPStatus CommandExecutor::cancel_delete(const TCancelDeleteDataReq& request) {
return res;
}

OLAPStatus CommandExecutor::get_all_root_path_stat(
std::vector<OLAPRootPathStat>* root_paths_stat) {
OLAP_LOG_INFO("begin to process get all root path stat.");
OLAPStatus CommandExecutor::get_all_root_path_info(
std::vector<RootPathInfo>* root_paths_info) {
OLAP_LOG_INFO("begin to process get all root path info.");
OLAPStatus res = OLAP_SUCCESS;

res = OLAPRootPath::get_instance()->get_all_root_path_stat(root_paths_stat);
res = OLAPRootPath::get_instance()->get_all_root_path_info(root_paths_info);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to process get all root path stat. [res=%d]", res);
OLAP_LOG_WARNING("fail to process get all root path info. [res=%d]", res);
return res;
}

OLAP_LOG_INFO("success to process get all root path stat.");
OLAP_LOG_INFO("success to process get all root path info.");
return res;
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/command_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ class CommandExecutor {
//
// @param root_paths_stat each root path stat including total/used/available capacity
// @return error code
virtual OLAPStatus get_all_root_path_stat(std::vector<OLAPRootPathStat>* root_paths_stat);
virtual OLAPStatus get_all_root_path_info(std::vector<RootPathInfo>* root_paths_info);

private:
// Create initial base and delta version.
Expand Down
40 changes: 20 additions & 20 deletions be/src/olap/olap_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,13 +260,13 @@ OLAPStatus OLAPEngine::init() {
}

// 初始化CE调度器
vector<OLAPRootPathStat> all_root_paths_stat;
OLAPRootPath::get_instance()->get_all_disk_stat(&all_root_paths_stat);
_cumulative_compaction_disk_stat.reserve(all_root_paths_stat.size());
for (uint32_t i = 0; i < all_root_paths_stat.size(); i++) {
const OLAPRootPathStat& stat = all_root_paths_stat[i];
_cumulative_compaction_disk_stat.emplace_back(stat.root_path, i, stat.is_used);
_disk_id_map[stat.root_path] = i;
vector<RootPathInfo> all_root_paths_info;
OLAPRootPath::get_instance()->get_all_root_path_info(&all_root_paths_info);
_cumulative_compaction_disk_stat.reserve(all_root_paths_info.size());
for (uint32_t i = 0; i < all_root_paths_info.size(); i++) {
const RootPathInfo& info = all_root_paths_info[i];
_cumulative_compaction_disk_stat.emplace_back(info.path, i, info.is_used);
_disk_id_map[info.path] = i;
}
int32_t cumulative_compaction_num_threads = config::cumulative_compaction_num_threads;
int32_t base_compaction_num_threads = config::base_compaction_num_threads;
Expand Down Expand Up @@ -1044,11 +1044,11 @@ void OLAPEngine::start_cumulative_priority() {

// determine whether to select candidate or not
bool is_select = false;
vector<OLAPRootPathStat> all_root_paths_stat;
OLAPRootPath::get_instance()->get_all_disk_stat(&all_root_paths_stat);
for (uint32_t i = 0; i < all_root_paths_stat.size(); i++) {
uint32_t disk_id = _disk_id_map[all_root_paths_stat[i].root_path];
_cumulative_compaction_disk_stat[disk_id].is_used = all_root_paths_stat[i].is_used;
vector<RootPathInfo> all_root_paths_info;
OLAPRootPath::get_instance()->get_all_root_path_info(&all_root_paths_info);
for (uint32_t i = 0; i < all_root_paths_info.size(); i++) {
uint32_t disk_id = _disk_id_map[all_root_paths_info[i].path];
_cumulative_compaction_disk_stat[disk_id].is_used = all_root_paths_info[i].is_used;
}

for (auto& disk : _cumulative_compaction_disk_stat) {
Expand Down Expand Up @@ -1126,8 +1126,8 @@ OLAPStatus OLAPEngine::start_trash_sweep(double* usage) {
const uint32_t snapshot_expire = config::snapshot_expire_time_sec;
const uint32_t trash_expire = config::trash_file_expire_time_sec;
const double guard_space = config::disk_capacity_insufficient_percentage / 100.0;
std::vector<OLAPRootPathStat> disks_stat;
res = OLAPRootPath::get_instance()->get_all_disk_stat(&disks_stat);
std::vector<RootPathInfo> root_paths_info;
res = OLAPRootPath::get_instance()->get_all_root_path_info(&root_paths_info);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("failed to get root path stat info when sweep trash.");
return res;
Expand All @@ -1141,25 +1141,25 @@ OLAPStatus OLAPEngine::start_trash_sweep(double* usage) {
}
const time_t local_now = mktime(&local_tm_now); //得到当地日历时间

for (OLAPRootPathStat& stat : disks_stat) {
if (!stat.is_used) {
for (RootPathInfo& info : root_paths_info) {
if (!info.is_used) {
continue;
}

double curr_usage = (stat.disk_total_capacity - stat.disk_available_capacity)
/ (double) stat.disk_total_capacity;
double curr_usage = info.data_used_capacity
/ (double) info.capacity;
*usage = *usage > curr_usage ? *usage : curr_usage;

OLAPStatus curr_res = OLAP_SUCCESS;
string snapshot_path = stat.root_path + SNAPSHOT_PREFIX;
string snapshot_path = info.path + SNAPSHOT_PREFIX;
curr_res = _do_sweep(snapshot_path, local_now, snapshot_expire);
if (curr_res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("failed to sweep snapshot. [path=%s, err_code=%d]",
snapshot_path.c_str(), curr_res);
res = curr_res;
}

string trash_path = stat.root_path + TRASH_PREFIX;
string trash_path = info.path + TRASH_PREFIX;
curr_res = _do_sweep(trash_path, local_now,
curr_usage > guard_space ? 0 : trash_expire);
if (curr_res != OLAP_SUCCESS) {
Expand Down
92 changes: 24 additions & 68 deletions be/src/olap/olap_rootpath.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ OLAPStatus OLAPRootPath::init() {

for (size_t i = 0; i < root_path_vec.size(); ++i) {
RootPathInfo root_path_info;
root_path_info.path = root_path_vec[i];
root_path_info.is_used = true;
root_path_info.capacity = capacity_vec[i];
res = _update_root_path_info(root_path_vec[i], &root_path_info);
Expand Down Expand Up @@ -244,56 +245,27 @@ void OLAPRootPath::get_all_available_root_path(RootPathVec* all_available_root_p
_mutex.unlock();
}

OLAPStatus OLAPRootPath::get_all_disk_stat(vector<OLAPRootPathStat>* disks_stat) {
OLAPStatus OLAPRootPath::get_all_root_path_info(vector<RootPathInfo>* root_paths_info) {
OLAPStatus res = OLAP_SUCCESS;
disks_stat->clear();
root_paths_info->clear();

_mutex.lock();
for (RootPathMap::iterator it = _root_paths.begin(); it != _root_paths.end(); ++it) {
OLAPRootPathStat stat;
stat.root_path = it->first;
stat.is_used = it->second.is_used;

disks_stat->push_back(stat);
}
_mutex.unlock();

for (auto& stat : *disks_stat) {
if (stat.is_used) {
_get_disk_capacity(stat.root_path, &stat.disk_total_capacity, &stat.disk_available_capacity);
} else {
stat.disk_total_capacity = 0;
stat.disk_available_capacity = 0;
stat.data_used_capacity = 0;
}
}

return res;
}

OLAPStatus OLAPRootPath::get_all_root_path_stat(vector<OLAPRootPathStat>* root_paths_stat) {
OLAPStatus res = OLAP_SUCCESS;
root_paths_stat->clear();

_mutex.lock();
for (RootPathMap::iterator it = _root_paths.begin(); it != _root_paths.end(); ++it) {
OLAPRootPathStat stat;
stat.root_path = it->first;
stat.is_used = it->second.is_used;
stat.disk_total_capacity = it->second.capacity;

root_paths_stat->push_back(stat);
RootPathInfo info;
info.path = it->first;
info.is_used = it->second.is_used;
info.capacity = it->second.capacity;
root_paths_info->push_back(info);
}
_mutex.unlock();

for (auto& stat : *root_paths_stat) {
if (stat.is_used) {
_get_disk_capacity(stat.root_path, &stat.disk_total_capacity, &stat.disk_available_capacity);
_get_root_path_capacity(stat.root_path, &stat.data_used_capacity);
for (auto& info: *root_paths_info) {
if (info.is_used) {
_get_root_path_capacity(info.path, &info.data_used_capacity, &info.available);
} else {
stat.disk_total_capacity = 0;
stat.data_used_capacity = 0;
stat.disk_available_capacity = 0;
info.capacity = 1;
info.data_used_capacity = 0;
info.available = 0;
}
}

Expand Down Expand Up @@ -340,6 +312,7 @@ OLAPStatus OLAPRootPath::reload_root_paths(const char* root_paths) {
RootPathMap::iterator iter_root_path = _root_paths.find(root_path_vec[i]);
if (iter_root_path == _root_paths.end()) {
RootPathInfo root_path_info;
root_path_info.path = root_path_vec[i];
root_path_info.is_used = true;
root_path_info.capacity = capacity_vec[i];
root_path_to_be_loaded.push_back(root_path_vec[i]);
Expand Down Expand Up @@ -716,7 +689,8 @@ OLAPStatus OLAPRootPath::parse_root_paths_from_string(

OLAPStatus OLAPRootPath::_get_root_path_capacity(
const string& root_path,
int64_t* data_used) {
int64_t* data_used,
int64_t* disk_available) {
OLAPStatus res = OLAP_SUCCESS;
int64_t used = 0;

Expand All @@ -729,28 +703,10 @@ OLAPStatus OLAPRootPath::_get_root_path_capacity(
}
}
*data_used = used;
} catch (boost::filesystem::filesystem_error& e) {
OLAP_LOG_WARNING("get space info failed. [path: %s, erro:%s]", root_path.c_str(), e.what());
return OLAP_ERR_STL_ERROR;
}

return res;
}

OLAPStatus OLAPRootPath::_get_disk_capacity(
const string& root_path,
int64_t* capacity,
int64_t* available) {
OLAPStatus res = OLAP_SUCCESS;

*capacity = 0;
*available = 0;

try {
boost::filesystem::path path_name(root_path);
boost::filesystem::space_info path_info = boost::filesystem::space(path_name);
*capacity = path_info.capacity;
*available = path_info.available;
*disk_available = path_info.available;
} catch (boost::filesystem::filesystem_error& e) {
OLAP_LOG_WARNING("get space info failed. [path: %s, erro:%s]", root_path.c_str(), e.what());
return OLAP_ERR_STL_ERROR;
Expand Down Expand Up @@ -1137,16 +1093,16 @@ OLAPStatus OLAPRootPath::_get_cluster_id_path_vec(
vector<string>* cluster_id_path_vec) {
OLAPStatus res = OLAP_SUCCESS;

vector<OLAPRootPathStat> root_path_stat_vec;
res = get_all_root_path_stat(&root_path_stat_vec);
vector<RootPathInfo> root_path_info_vec;
res = get_all_root_path_info(&root_path_info_vec);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to get root path stat info. [res=%d]", res);
OLAP_LOG_WARNING("fail to get root path info. [res=%d]", res);
return res;
}

for (const auto& stat : root_path_stat_vec) {
if (stat.is_used) {
cluster_id_path_vec->push_back(stat.root_path + CLUSTER_ID_PREFIX);
for (const auto& info: root_path_info_vec) {
if (info.is_used) {
cluster_id_path_vec->push_back(info.path + CLUSTER_ID_PREFIX);
}
}

Expand Down
Loading

0 comments on commit 9f7b1ea

Please sign in to comment.