Skip to content

Commit

Permalink
新增ZKClient
Browse files Browse the repository at this point in the history
Signed-off-by: sylar-yin <564628276@qq.com>
  • Loading branch information
sylar-yin committed Aug 14, 2019
1 parent 31f37f0 commit 7d4020d
Show file tree
Hide file tree
Showing 4 changed files with 325 additions and 0 deletions.
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ set(LIB_SRC
sylar/util/hash_util.cc
sylar/worker.cc
sylar/application.cc
sylar/zk_client.cc
)

ragelmaker(sylar/http/http11_parser.rl LIB_SRC ${CMAKE_CURRENT_SOURCE_DIR}/sylar/http)
Expand Down Expand Up @@ -138,6 +139,7 @@ set(LIBS
${OPENSSL_LIBRARIES}
${PROTOBUF_LIBRARIES}
mysqlclient_r
zookeeper_mt
sqlite3
tinyxml2
)
Expand Down Expand Up @@ -184,6 +186,7 @@ sylar_add_executable(test_email "tests/test_email.cc" sylar "${LIBS}")
sylar_add_executable(test_mysql "tests/test_mysql.cc" sylar "${LIBS}")
sylar_add_executable(test_nameserver "tests/test_nameserver.cc" sylar "${LIBS}")
sylar_add_executable(test_bitmap "tests/test_bitmap.cc" sylar "${LIBS}")
sylar_add_executable(test_zkclient "tests/test_zookeeper.cc" sylar "${LIBS}")

set(ORM_SRCS
sylar/orm/table.cc
Expand Down
145 changes: 145 additions & 0 deletions sylar/zk_client.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
#include "zk_client.h"

namespace sylar {

//static const int CREATED = ZOO_CREATED_EVENT;
//static const int DELETED = ZOO_DELETED_EVENT;
//static const int CHANGED = ZOO_CHANGED_EVENT;
//static const int CHILD = ZOO_CHILD_EVENT;
//static const int SESSION = ZOO_SESSION_EVENT;
//static const int NOWATCHING = ZOO_NOTWATCHING_EVENT;

const int ZKClient::EventType::CREATED = ZOO_CREATED_EVENT;
const int ZKClient::EventType::DELETED = ZOO_DELETED_EVENT;
const int ZKClient::EventType::CHANGED = ZOO_CHANGED_EVENT;
const int ZKClient::EventType::CHILD = ZOO_CHILD_EVENT;
const int ZKClient::EventType::SESSION = ZOO_SESSION_EVENT;
const int ZKClient::EventType::NOWATCHING = ZOO_NOTWATCHING_EVENT;

const int ZKClient::FlagsType::EPHEMERAL = ZOO_EPHEMERAL;
const int ZKClient::FlagsType::SEQUENCE = ZOO_SEQUENCE;
const int ZKClient::FlagsType::CONTAINER = ZOO_CONTAINER;

const int ZKClient::StateType::EXPIRED_SESSION = ZOO_EXPIRED_SESSION_STATE;
const int ZKClient::StateType::AUTH_FAILED = ZOO_AUTH_FAILED_STATE;
const int ZKClient::StateType::CONNECTING = ZOO_CONNECTING_STATE;
const int ZKClient::StateType::ASSOCIATING = ZOO_ASSOCIATING_STATE;
const int ZKClient::StateType::CONNECTED = ZOO_CONNECTED_STATE;
const int ZKClient::StateType::READONLY = ZOO_READONLY_STATE;
const int ZKClient::StateType::NOTCONNECTED = ZOO_NOTCONNECTED_STATE;


ZKClient::ZKClient()
:m_handle(nullptr)
,m_recvTimeout(0) {
}

ZKClient::~ZKClient() {
if(m_handle) {
close();
}
}

void ZKClient::OnWatcher(zhandle_t *zh, int type, int stat, const char *path,void *watcherCtx) {
ZKClient* client = (ZKClient*)watcherCtx;
client->m_watcherCb(type, stat, path);
}

bool ZKClient::reconnect() {
if(m_handle) {
zookeeper_close(m_handle);
}
m_handle = zookeeper_init2(m_hosts.c_str(), &ZKClient::OnWatcher, m_recvTimeout, nullptr, this, 0, m_logCb);
return m_handle != nullptr;
}

bool ZKClient::init(const std::string& hosts, int recv_timeout, watcher_callback cb, log_callback lcb) {
if(m_handle) {
return true;
}
m_hosts = hosts;
m_recvTimeout = recv_timeout;
m_watcherCb = std::bind(cb, std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3,
shared_from_this());
m_logCb = lcb;
m_handle = zookeeper_init2(hosts.c_str(), &ZKClient::OnWatcher, m_recvTimeout, nullptr, this, 0, lcb);
return m_handle != nullptr;
}

int32_t ZKClient::setServers(const std::string& hosts) {
auto rt = zoo_set_servers(m_handle, hosts.c_str());
if(rt == 0) {
m_hosts = hosts;
}
return rt;
}

int32_t ZKClient::create(const std::string& path, const std::string& val, std::string& new_path
,const struct ACL_vector* acl
,int flags) {
return zoo_create(m_handle, path.c_str(), val.c_str(), val.size(), acl, flags, &new_path[0], new_path.size());
}

int32_t ZKClient::exists(const std::string& path, bool watch, Stat* stat) {
return zoo_exists(m_handle, path.c_str(), watch, stat);
}

int32_t ZKClient::del(const std::string& path, int version) {
return zoo_delete(m_handle, path.c_str(), version);
}

int32_t ZKClient::get(const std::string& path, std::string& val, bool watch, Stat* stat) {
int len = val.size();
int32_t rt = zoo_get(m_handle, path.c_str(), watch, &val[0], &len, stat);
if(rt == ZOK) {
val.resize(len);
}
return rt;
}

int32_t ZKClient::getConfig(std::string& val, bool watch, Stat* stat) {
return get(ZOO_CONFIG_NODE, val, watch, stat);
}

int32_t ZKClient::set(const std::string& path, const std::string& val, int version, Stat* stat) {
return zoo_set2(m_handle, path.c_str(), val.c_str(), val.size(), version, stat);
}

int32_t ZKClient::getChildren(const std::string& path, std::vector<std::string>& val, bool watch, Stat* stat) {
String_vector strings;
Stat tmp;
if(stat == nullptr) {
stat = &tmp;
}
int32_t rt = zoo_get_children2(m_handle, path.c_str(), watch, &strings, stat);
if(rt == ZOK) {
for(int32_t i = 0; i < strings.count; ++i) {
val.push_back(strings.data[i]);
}
deallocate_String_vector(&strings);
}
return rt;
}

int32_t ZKClient::close() {
m_watcherCb = nullptr;
int32_t rt = ZOK;
if(m_handle) {
rt = zookeeper_close(m_handle);
m_handle = nullptr;
}
return rt;
}

std::string ZKClient::getCurrentServer() {
auto rt = zoo_get_current_server(m_handle);
return rt == nullptr ? "" : rt;
}

int32_t ZKClient::getState() {
return zoo_state(m_handle);
}

}
83 changes: 83 additions & 0 deletions sylar/zk_client.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#ifndef __SYLAR_ZK_CLIENT_H__
#define __SYLAR_ZK_CLIENT_H__

#include <memory>
#include <functional>
#include <string>
#include <stdint.h>
#include <vector>

#ifndef THREADED
#define THREADED
#endif

#include <zookeeper/zookeeper.h>

namespace sylar {

class ZKClient : public std::enable_shared_from_this<ZKClient> {
public:
class EventType {
public:
static const int CREATED; // = ZOO_CREATED_EVENT;
static const int DELETED; // = ZOO_DELETED_EVENT;
static const int CHANGED; // = ZOO_CHANGED_EVENT;
static const int CHILD ; // = ZOO_CHILD_EVENT;
static const int SESSION; // = ZOO_SESSION_EVENT;
static const int NOWATCHING; // = ZOO_NOTWATCHING_EVENT;
};
class FlagsType {
public:
static const int EPHEMERAL; // = ZOO_EPHEMERAL;
static const int SEQUENCE; // = ZOO_SEQUENCE;
static const int CONTAINER; // = ZOO_CONTAINER;
};
class StateType {
public:
static const int EXPIRED_SESSION; // = ZOO_EXPIRED_SESSION_STATE;
static const int AUTH_FAILED; // = ZOO_AUTH_FAILED_STATE;
static const int CONNECTING; // = ZOO_CONNECTING_STATE;
static const int ASSOCIATING; // = ZOO_ASSOCIATING_STATE;
static const int CONNECTED; // = ZOO_CONNECTED_STATE;
static const int READONLY; // = ZOO_READONLY_STATE;
static const int NOTCONNECTED; // = ZOO_NOTCONNECTED_STATE;
};

typedef std::shared_ptr<ZKClient> ptr;
typedef std::function<void(int type, int stat, const std::string& path, ZKClient::ptr)> watcher_callback;
typedef void(*log_callback)(const char *message);

ZKClient();
~ZKClient();

bool init(const std::string& hosts, int recv_timeout, watcher_callback cb, log_callback lcb = nullptr);
int32_t setServers(const std::string& hosts);

int32_t create(const std::string& path, const std::string& val, std::string& new_path
, const struct ACL_vector* acl = &ZOO_OPEN_ACL_UNSAFE
, int flags = 0);
int32_t exists(const std::string& path, bool watch, Stat* stat = nullptr);
int32_t del(const std::string& path, int version = -1);
int32_t get(const std::string& path, std::string& val, bool watch, Stat* stat = nullptr);
int32_t getConfig(std::string& val, bool watch, Stat* stat = nullptr);
int32_t set(const std::string& path, const std::string& val, int version = -1, Stat* stat = nullptr);
int32_t getChildren(const std::string& path, std::vector<std::string>& val, bool watch, Stat* stat = nullptr);
int32_t close();
int32_t getState();
std::string getCurrentServer();

bool reconnect();
private:
static void OnWatcher(zhandle_t *zh, int type, int stat, const char *path,void *watcherCtx);
typedef std::function<void(int type, int stat, const std::string& path)> watcher_callback2;
private:
zhandle_t* m_handle;
std::string m_hosts;
watcher_callback2 m_watcherCb;
log_callback m_logCb;
int32_t m_recvTimeout;
};

}

#endif
94 changes: 94 additions & 0 deletions tests/test_zookeeper.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#include "sylar/zk_client.h"
#include "sylar/log.h"
#include "sylar/iomanager.h"

static sylar::Logger::ptr g_logger = SYLAR_LOG_ROOT();

int g_argc;

void on_watcher(int type, int stat, const std::string& path, sylar::ZKClient::ptr client) {
SYLAR_LOG_INFO(g_logger) << " type=" << type
<< " stat=" << stat
<< " path=" << path
<< " client=" << client
<< " fiber=" << sylar::Fiber::GetThis()
<< " iomanager=" << sylar::IOManager::GetThis();

if(stat == ZOO_CONNECTED_STATE) {
if(g_argc == 1) {
std::vector<std::string> vals;
Stat stat;
int rt = client->getChildren("/", vals, true, &stat);
if(rt == ZOK) {
SYLAR_LOG_INFO(g_logger) << "[" << sylar::Join(vals.begin(), vals.end(), ",") << "]";
} else {
SYLAR_LOG_INFO(g_logger) << "getChildren error " << rt;
}
} else {
std::string new_val;
new_val.resize(255);
int rt = client->create("/zkxxx", "", new_val, &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL);
if(rt == ZOK) {
SYLAR_LOG_INFO(g_logger) << "[" << new_val.c_str() << "]";
} else {
SYLAR_LOG_INFO(g_logger) << "getChildren error " << rt;
}

//extern ZOOAPI const int ZOO_SEQUENCE;
//extern ZOOAPI const int ZOO_CONTAINER;
rt = client->create("/zkxxx", "", new_val, &ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE | ZOO_EPHEMERAL);
if(rt == ZOK) {
SYLAR_LOG_INFO(g_logger) << "create [" << new_val.c_str() << "]";
} else {
SYLAR_LOG_INFO(g_logger) << "create error " << rt;
}

rt = client->get("/hello", new_val, true);
if(rt == ZOK) {
SYLAR_LOG_INFO(g_logger) << "get [" << new_val.c_str() << "]";
} else {
SYLAR_LOG_INFO(g_logger) << "get error " << rt;
}

rt = client->create("/hello", "", new_val, &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL);
if(rt == ZOK) {
SYLAR_LOG_INFO(g_logger) << "get [" << new_val.c_str() << "]";
} else {
SYLAR_LOG_INFO(g_logger) << "get error " << rt;
}

rt = client->set("/hello", "xxx");
if(rt == ZOK) {
SYLAR_LOG_INFO(g_logger) << "set [" << new_val.c_str() << "]";
} else {
SYLAR_LOG_INFO(g_logger) << "set error " << rt;
}

rt = client->del("/hello");
if(rt == ZOK) {
SYLAR_LOG_INFO(g_logger) << "del [" << new_val.c_str() << "]";
} else {
SYLAR_LOG_INFO(g_logger) << "del error " << rt;
}

}
} else if(stat == ZOO_EXPIRED_SESSION_STATE) {
client->reconnect();
}
}

int main(int argc, char** argv) {
g_argc = argc;
sylar::IOManager iom(1);
sylar::ZKClient::ptr client(new sylar::ZKClient);
if(g_argc > 1) {
SYLAR_LOG_INFO(g_logger) << client->init("127.0.0.1:21811", 3000, on_watcher);
//SYLAR_LOG_INFO(g_logger) << client->init("127.0.0.1:21811,127.0.0.1:21812,127.0.0.1:21811", 3000, on_watcher);
iom.addTimer(1115000, [client](){client->close();});
} else {
SYLAR_LOG_INFO(g_logger) << client->init("127.0.0.1:21811,127.0.0.1:21812,127.0.0.1:21811", 3000, on_watcher);
iom.addTimer(5000, [](){}, true);
}
iom.stop();
return 0;
}

0 comments on commit 7d4020d

Please sign in to comment.