Skip to content

Commit

Permalink
optimize get own sequence
Browse files Browse the repository at this point in the history
  • Loading branch information
gaodunqiao committed Jan 6, 2017
1 parent ee59135 commit 8f38d90
Show file tree
Hide file tree
Showing 13 changed files with 124 additions and 149 deletions.
15 changes: 7 additions & 8 deletions monitor/include/monitor_const.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,13 @@ const std::string PIDFILE = "monitor_pid";
const std::string CMDFILE = "tmp/cmd";

//return status
enum ERR_CODE {
MONITOR_OK = 0,
MONITOR_ERR_OTHER,
MONITOR_NODE_NOT_EXIST,
MONITOR_ERR_FAILED_OPEN_FILE,
MONITOR_ERR_MEM,
MONITOR_ERR_PARAM,
MONITOR_ERR_ZOO_FAILED
enum RetCode {
kSuccess = 0,
kOtherError,
kNotExist,
kOpenFileFailed,
kParamError,
kZkFailed
};

const int MONITOR_MAX_VALUE_SIZE = 1048577;
Expand Down
1 change: 0 additions & 1 deletion monitor/include/monitor_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,5 @@ class ServiceListener {
void GetAllIp();
int AddChildren(const std::string &service_father, struct String_vector &children);
int LoadService(std::string path, std::string service_father, std::string ip_port, vector<int>& );
int GetIpNum(const std::string& service_father);
};
#endif
3 changes: 1 addition & 2 deletions monitor/include/monitor_load_balance.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,9 @@ class LoadBalance {
// Key is md5 and value is service_father
std::map<std::string, std::string> md5_to_service_father_;
slash::Mutex md5_to_service_father_lock_;
std::set<std::string> monitors_;

int RegisterMonitor(const std::string &path);
int GetMd5ToServiceFather();
int GetMonitors();
int GetMonitors(size_t &monitors_size, size_t &rank);
};
#endif
4 changes: 1 addition & 3 deletions monitor/include/monitor_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,10 @@ struct MonitorOptions {

// Core data. key is the full path of ip_port and value is ServiceItem of this ip_port
std::map<std::string, ServiceItem> service_map;
slash::Mutex service_map_lock;
// Core data
// key is service_father and value is a set of ipPort
std::vector<std::string> service_fathers;
std::vector<std::string> my_service_fathers;
std::map<std::string, std::set<std::string>> service_father_to_ip;
slash::Mutex service_father_to_ip_lock;

// Marked weather there is a thread checking this service father
std::vector<bool> has_thread;
Expand Down
4 changes: 2 additions & 2 deletions monitor/include/monitor_work_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ class WorkThread {
MonitorZk::ZkCallBackHandle *cb_handle) {
if (monitor_zk == NULL) {
monitor_zk = new MonitorZk(options, cb_handle);
int ret = MONITOR_ERR_OTHER;
if ((ret = monitor_zk->InitEnv()) != MONITOR_OK) {
int ret = kOtherError;
if ((ret = monitor_zk->InitEnv()) != kSuccess) {
LOG(LOG_ERROR, "Init zookeeper client failed");
}
}
Expand Down
6 changes: 3 additions & 3 deletions monitor/src/monitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
#include "monitor_work_thread.h"

int main(int argc, char** argv) {
int ret = MONITOR_OK;
int ret = kSuccess;
MonitorOptions *options = new MonitorOptions(CONF_PATH);
if ((ret = options->Load()) != 0)
return ret;
Expand All @@ -24,7 +24,7 @@ int main(int argc, char** argv) {
process::need_restart = false;
if (process::IsProcessRunning(MONITOR_PROCESS_NAME)) {
LOG(LOG_ERROR, "Monitor is already running.");
return MONITOR_ERR_OTHER;
return kOtherError;
}
if (options->daemon_mode)
process::Daemonize();
Expand All @@ -36,7 +36,7 @@ int main(int argc, char** argv) {
if (ret > 0)
return child_exit_status;
else if (ret < 0)
return MONITOR_ERR_OTHER;
return kOtherError;
else {
std::ofstream pidstream(PIDFILE);
pidstream << getpid() << endl;
Expand Down
4 changes: 2 additions & 2 deletions monitor/src/monitor_check_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ void *CheckThread::ThreadMain() {
}

void CheckThread::CronHandle() {
int service_father_num = options_->service_fathers.size();
std::string &cur_service_father = options_->service_fathers[service_pos_];
int service_father_num = options_->my_service_fathers.size();
std::string &cur_service_father = options_->my_service_fathers[service_pos_];
LOG(LOG_INFO, "|checkService| pthread id %x, pthread pos %d, current service father %s", \
(unsigned int)this->thread_id(), (int)service_pos_, cur_service_father.c_str());

Expand Down
25 changes: 7 additions & 18 deletions monitor/src/monitor_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ void ServiceListener::LoadAllService() {
options_->service_map.clear();
GetAllIp();

// Here we need locks. Maybe we can remove it
slash::MutexLock l(&options_->service_father_to_ip_lock);
for (auto it1 = options_->service_father_to_ip.begin();
it1 != options_->service_father_to_ip.end();
++it1) {
Expand All @@ -58,7 +56,7 @@ void ServiceListener::GetAllIp() {
std::string service_father = it->first;
struct String_vector children = {0};
// Get all ip_port belong to this service_father
if (monitor_zk_->zk_get_chdnodes(service_father, children) != MONITOR_OK) {
if (monitor_zk_->zk_get_chdnodes(service_father, children) != kSuccess) {
LOG(LOG_ERROR, "get IP:Port failed. service_father:%s", service_father.c_str());
options_->service_father_to_ip[service_father].insert("");
} else {
Expand All @@ -80,8 +78,8 @@ int ServiceListener::AddChildren(const std::string &service_father,

int ServiceListener::LoadService(std::string path, std::string service_father, std::string ip_port, std::vector<int>& st) {
char status = STATUS_UNKNOWN;
int ret = MONITOR_OK;
if ((ret = monitor_zk_->zk_get_service_status(path, status)) != MONITOR_OK) {
int ret = kSuccess;
if ((ret = monitor_zk_->zk_get_service_status(path, status)) != kSuccess) {
LOG(LOG_ERROR, "get service status failed. service:%s", path.c_str());
return ret;
}
Expand All @@ -100,16 +98,7 @@ int ServiceListener::LoadService(std::string path, std::string service_father, s
ServiceItem item(ip, addr, port, service_father, status);
options_->service_map[path] = item;
LOG(LOG_INFO, "load service succeed, service:%s, status:%d", path.c_str(), status);
return MONITOR_OK;
}

int ServiceListener::GetIpNum(const std::string& service_father) {
int ret = 0;
slash::MutexLock l(&options_->service_father_to_ip_lock);
if (options_->service_father_to_ip.find(service_father)
!= options_->service_father_to_ip.end())
ret = options_->service_father_to_ip[service_father].size();
return ret;
return kSuccess;
}

void ServiceListener::BalanceZkHandle::ModifyServiceFatherToIp(const int &op,
Expand All @@ -127,7 +116,7 @@ void ServiceListener::BalanceZkHandle::ModifyServiceFatherToIp(const int &op,
return;

char status = STATUS_UNKNOWN;
if (monitor_zk_->zk_get_service_status(ip_path, status) != MONITOR_OK) return;
if (monitor_zk_->zk_get_service_status(ip_path, status) != kSuccess) return;

struct hostent *ht;
ht = gethostbyname(ip.c_str());
Expand All @@ -152,7 +141,7 @@ void ServiceListener::BalanceZkHandle::ProcessDeleteEvent(const std::string& pat
void ServiceListener::BalanceZkHandle::ProcessChildEvent(const std::string& path) {
// It must be a service father node. Because I do zoo_get_children only in service father node
struct String_vector children = {0};
if (monitor_zk_->zk_get_chdnodes(path, children) == MONITOR_OK) {
if (monitor_zk_->zk_get_chdnodes(path, children) == kSuccess) {
if ((options_->service_father_to_ip.find(path) ==
options_->service_father_to_ip.end()) ||
children.count <= (options_->service_father_to_ip)[path].size()) {
Expand All @@ -171,7 +160,7 @@ void ServiceListener::BalanceZkHandle::ProcessChildEvent(const std::string& path
void ServiceListener::BalanceZkHandle::ProcessChangedEvent(const std::string& path) {
int new_status = STATUS_UNKNOWN;
std::string data;
if (monitor_zk_->zk_get_node(path, data, 1) == MONITOR_OK) {
if (monitor_zk_->zk_get_node(path, data, 1) == kSuccess) {
new_status = atoi(data.c_str());
options_->service_map[path].status = new_status;
}
Expand Down
105 changes: 47 additions & 58 deletions monitor/src/monitor_load_balance.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,106 +27,83 @@ LoadBalance::~LoadBalance() {
}

int LoadBalance::Init() {
int ret = MONITOR_OK;
int ret = kSuccess;

// Init zookeeper handler
if ((ret = monitor_zk_->InitEnv()) != MONITOR_OK) return ret;
if ((ret = monitor_zk_->InitEnv()) != kSuccess) return ret;
cb_handle.options_ = options_;
cb_handle.monitor_zk_ = monitor_zk_;
cb_handle.md5_to_service_father_ = &md5_to_service_father_;

// Create md5 list node add the watcher
std::string md5_path = options_->GetNodeList();
if ((ret = monitor_zk_->zk_create_node(md5_path, "", 0)) != MONITOR_OK) {
if ((ret = monitor_zk_->zk_create_node(md5_path, "", 0)) != kSuccess) {
LOG(LOG_ERROR, "create znode %s failed", md5_path.c_str());
return ret;
}

// Create monitor list node add the watcher
std::string monitor_path = options_->GetMonitorList();
if ((ret = monitor_zk_->zk_create_node(monitor_path, "", 0)) != MONITOR_OK) {
if ((ret = monitor_zk_->zk_create_node(monitor_path, "", 0)) != kSuccess) {
LOG(LOG_ERROR, "create znode %s failed", monitor_path.c_str());
return ret;
}

// Monitor register, this function should in LoadBalance
if ((ret = RegisterMonitor(monitor_path + "/monitor_")) != MONITOR_OK) {
if ((ret = RegisterMonitor(monitor_path + "/monitor_")) != kSuccess) {
LOG(LOG_ERROR, "Monitor register failed");
return ret;
}
return MONITOR_OK;
return kSuccess;
}

// Get my_service_fathers
// Get my_service_fathers after registed
int LoadBalance::DoBalance() {
options_->service_father_to_ip.clear();
options_->service_fathers.clear();
int ret = MONITOR_ERR_OTHER;
if ((ret = GetMd5ToServiceFather()) != MONITOR_OK) {
options_->my_service_fathers.clear();
int ret = kOtherError;

// Fill std::map<std::string, std::string> md5_to_service_father_;
if ((ret = GetMd5ToServiceFather()) != kSuccess) {
LOG(LOG_ERROR, "get md5 to service father failed");
/* TODO (gaodq)
* how to deal with this in a better way?
* if the reason of failure is node not exist, we should restart main loop
*/
return ret;
}

if ((ret = GetMonitors()) != MONITOR_OK) {
// Get registed monitors
size_t monitors_size = 0;
size_t rank = 0; // this monitor rank
if ((ret = GetMonitors(monitors_size, rank)) != kSuccess) {
LOG(LOG_ERROR, "get monitors failed");
return ret;
}

// TODO gdq 不需拷贝直接遍历md5_to_servi...
// Save for random seek
std::vector<std::string> md5_node;
{ // MutexLock
slash::MutexLock l(&md5_to_service_father_lock_);
for (auto it = md5_to_service_father_.begin();
it != md5_to_service_father_.end();
++it) {
md5_node.push_back(it->first);
}
}

std::vector<unsigned int> sequence;
for (auto it = monitors_.begin(); it != monitors_.end(); ++it) {
unsigned int tmp = stoi((*it).substr((*it).size() - 10));
sequence.push_back(tmp);
for (auto &it : md5_to_service_father_) {
md5_node.push_back(it.first);
}

sort(sequence.begin(), sequence.end());

std::string monitor = std::string(zk_lock_buf_);
unsigned int my_seq = stoi(monitor.substr(monitor.size() - 10));
size_t rank = 0;
for (; rank < sequence.size() && sequence[rank] != my_seq; ++rank) ;
if (rank == sequence.size()) {
LOG(LOG_INFO, "I'm connect to zk. But the monitor registed is removed. Restart main loop");
options_->need_rebalance = true;
return MONITOR_ERR_OTHER;
}

slash::MutexLock l(&md5_to_service_father_lock_);
std::set<string> dummy_set;
for (size_t i = rank; i < md5_node.size(); i += monitors_.size()) {
for (size_t i = rank; i < md5_node.size(); i += monitors_size) {
std::string my_service_father = md5_to_service_father_[md5_node[i]];
options_->service_father_to_ip[my_service_father] = dummy_set;
options_->service_fathers.push_back(my_service_father);
options_->my_service_fathers.push_back(my_service_father);
LOG(LOG_INFO, "my service father:%s", my_service_father.c_str());
}

options_->need_rebalance = false;
return MONITOR_OK;
return kSuccess;
}

int LoadBalance::RegisterMonitor(const std::string &path) {
int ret = MONITOR_ERR_OTHER;
int ret = kOtherError;
std::string &host_name = options_->monitor_host_name;
for (int i = 0; i < MONITOR_GET_RETRIES; ++i) {
memset(zk_lock_buf_, 0, sizeof(zk_lock_buf_));
//register the monitor.
ret = monitor_zk_->zk_create_node(path, host_name, ZOO_EPHEMERAL | ZOO_SEQUENCE,
zk_lock_buf_, sizeof(zk_lock_buf_));
if (ret == MONITOR_OK) {
if (ret == kSuccess) {
LOG(LOG_INFO, "registerMonitor: %s", zk_lock_buf_);
return ret;
}
Expand All @@ -141,10 +118,10 @@ int LoadBalance::GetMd5ToServiceFather() {
std::string node_list_path = options_->GetNodeList();
struct String_vector md5_node = {0};
std::string service_father;
int ret = MONITOR_OK;
int ret = kSuccess;

// Get all md5 node
if ((ret = monitor_zk_->zk_get_chdnodes(node_list_path, md5_node)) != MONITOR_OK) {
if ((ret = monitor_zk_->zk_get_chdnodes(node_list_path, md5_node)) != kSuccess) {
LOG(LOG_ERROR, "get md5 list failes. path:%s", node_list_path.c_str());
return ret;
}
Expand All @@ -153,11 +130,10 @@ int LoadBalance::GetMd5ToServiceFather() {
std::string md5_node_str = std::string(md5_node.data[i]);
std::string md5_path = node_list_path + "/" + md5_node_str;
//get the value of md5Node which is serviceFather
if (monitor_zk_->zk_get_node(md5_path, service_father, 1) != MONITOR_OK) {
if (monitor_zk_->zk_get_node(md5_path, service_father, 1) != kSuccess) {
LOG(LOG_ERROR, "get value of node:%s failed", md5_path.c_str());
continue;
}
// TODO gdq Need lock ?
md5_to_service_father_[md5_node_str] = service_father;
LOG(LOG_INFO, "md5: %s, service_father: %s", md5_path.c_str(), service_father.c_str());
}
Expand All @@ -166,21 +142,34 @@ int LoadBalance::GetMd5ToServiceFather() {
return ret;
}

int LoadBalance::GetMonitors() {
monitors_.clear();
int LoadBalance::GetMonitors(size_t &monitors_size, size_t &rank) {
std::string monitor_list_path = options_->GetMonitorList();
struct String_vector monitor_nodes = {0};
int ret = MONITOR_OK;
int ret = kSuccess;

if ((ret = monitor_zk_->zk_get_chdnodes(monitor_list_path, monitor_nodes)) != MONITOR_OK) {
if ((ret = monitor_zk_->zk_get_chdnodes(monitor_list_path, monitor_nodes)) != kSuccess) {
LOG(LOG_ERROR, "get monitors failes. path:%s", monitor_list_path.c_str());
return ret;
}

for (int i = 0; i < monitor_nodes.count; ++i) {
monitors_.insert(std::string(monitor_nodes.data[i]));
monitors_size = monitor_nodes.count;

std::string my_monitor = std::string(zk_lock_buf_);
unsigned int my_seq = stoi(my_monitor.substr(my_monitor.size() - 10));

for (size_t i = 0; i < monitors_size; ++i) {
std::string monitor(monitor_nodes.data[i]);
unsigned int tmp = stoi(monitor.substr(monitor.size() - 10));
if (tmp == my_seq) break;
rank++;
}

if (rank == monitors_size) {
LOG(LOG_INFO, "Connect to zk. But the monitor registed is removed. Restart main loop");
options_->need_rebalance = true;
return kOtherError;
}
LOG(LOG_INFO, "There are %d monitors, I am %s", monitors_.size(), zk_lock_buf_);
LOG(LOG_INFO, "There are %d monitors, I am %s, my rank is %llu", monitors_size, zk_lock_buf_, rank);

deallocate_String_vector(&monitor_nodes);
return ret;
Expand Down
Loading

0 comments on commit 8f38d90

Please sign in to comment.