diff --git a/CMakeLists.txt b/CMakeLists.txt index 6126809..5599ec5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -103,6 +103,7 @@ set(LIB_SRC sylar/util/hash_util.cc sylar/worker.cc sylar/application.cc + sylar/zk_client.cc ) ragelmaker(sylar/http/http11_parser.rl LIB_SRC ${CMAKE_CURRENT_SOURCE_DIR}/sylar/http) @@ -138,6 +139,7 @@ set(LIBS ${OPENSSL_LIBRARIES} ${PROTOBUF_LIBRARIES} mysqlclient_r + zookeeper_mt sqlite3 tinyxml2 ) @@ -184,6 +186,7 @@ sylar_add_executable(test_email "tests/test_email.cc" sylar "${LIBS}") sylar_add_executable(test_mysql "tests/test_mysql.cc" sylar "${LIBS}") sylar_add_executable(test_nameserver "tests/test_nameserver.cc" sylar "${LIBS}") sylar_add_executable(test_bitmap "tests/test_bitmap.cc" sylar "${LIBS}") +sylar_add_executable(test_zkclient "tests/test_zookeeper.cc" sylar "${LIBS}") set(ORM_SRCS sylar/orm/table.cc diff --git a/sylar/zk_client.cc b/sylar/zk_client.cc new file mode 100644 index 0000000..b8adca2 --- /dev/null +++ b/sylar/zk_client.cc @@ -0,0 +1,145 @@ +#include "zk_client.h" + +namespace sylar { + +//static const int CREATED = ZOO_CREATED_EVENT; +//static const int DELETED = ZOO_DELETED_EVENT; +//static const int CHANGED = ZOO_CHANGED_EVENT; +//static const int CHILD = ZOO_CHILD_EVENT; +//static const int SESSION = ZOO_SESSION_EVENT; +//static const int NOWATCHING = ZOO_NOTWATCHING_EVENT; + +const int ZKClient::EventType::CREATED = ZOO_CREATED_EVENT; +const int ZKClient::EventType::DELETED = ZOO_DELETED_EVENT; +const int ZKClient::EventType::CHANGED = ZOO_CHANGED_EVENT; +const int ZKClient::EventType::CHILD = ZOO_CHILD_EVENT; +const int ZKClient::EventType::SESSION = ZOO_SESSION_EVENT; +const int ZKClient::EventType::NOWATCHING = ZOO_NOTWATCHING_EVENT; + +const int ZKClient::FlagsType::EPHEMERAL = ZOO_EPHEMERAL; +const int ZKClient::FlagsType::SEQUENCE = ZOO_SEQUENCE; +const int ZKClient::FlagsType::CONTAINER = ZOO_CONTAINER; + +const int ZKClient::StateType::EXPIRED_SESSION = ZOO_EXPIRED_SESSION_STATE; +const int ZKClient::StateType::AUTH_FAILED = ZOO_AUTH_FAILED_STATE; +const int ZKClient::StateType::CONNECTING = ZOO_CONNECTING_STATE; +const int ZKClient::StateType::ASSOCIATING = ZOO_ASSOCIATING_STATE; +const int ZKClient::StateType::CONNECTED = ZOO_CONNECTED_STATE; +const int ZKClient::StateType::READONLY = ZOO_READONLY_STATE; +const int ZKClient::StateType::NOTCONNECTED = ZOO_NOTCONNECTED_STATE; + + +ZKClient::ZKClient() + :m_handle(nullptr) + ,m_recvTimeout(0) { +} + +ZKClient::~ZKClient() { + if(m_handle) { + close(); + } +} + +void ZKClient::OnWatcher(zhandle_t *zh, int type, int stat, const char *path,void *watcherCtx) { + ZKClient* client = (ZKClient*)watcherCtx; + client->m_watcherCb(type, stat, path); +} + +bool ZKClient::reconnect() { + if(m_handle) { + zookeeper_close(m_handle); + } + m_handle = zookeeper_init2(m_hosts.c_str(), &ZKClient::OnWatcher, m_recvTimeout, nullptr, this, 0, m_logCb); + return m_handle != nullptr; +} + +bool ZKClient::init(const std::string& hosts, int recv_timeout, watcher_callback cb, log_callback lcb) { + if(m_handle) { + return true; + } + m_hosts = hosts; + m_recvTimeout = recv_timeout; + m_watcherCb = std::bind(cb, std::placeholders::_1, + std::placeholders::_2, + std::placeholders::_3, + shared_from_this()); + m_logCb = lcb; + m_handle = zookeeper_init2(hosts.c_str(), &ZKClient::OnWatcher, m_recvTimeout, nullptr, this, 0, lcb); + return m_handle != nullptr; +} + +int32_t ZKClient::setServers(const std::string& hosts) { + auto rt = zoo_set_servers(m_handle, hosts.c_str()); + if(rt == 0) { + m_hosts = hosts; + } + return rt; +} + +int32_t ZKClient::create(const std::string& path, const std::string& val, std::string& new_path + ,const struct ACL_vector* acl + ,int flags) { + return zoo_create(m_handle, path.c_str(), val.c_str(), val.size(), acl, flags, &new_path[0], new_path.size()); +} + +int32_t ZKClient::exists(const std::string& path, bool watch, Stat* stat) { + return zoo_exists(m_handle, path.c_str(), watch, stat); +} + +int32_t ZKClient::del(const std::string& path, int version) { + return zoo_delete(m_handle, path.c_str(), version); +} + +int32_t ZKClient::get(const std::string& path, std::string& val, bool watch, Stat* stat) { + int len = val.size(); + int32_t rt = zoo_get(m_handle, path.c_str(), watch, &val[0], &len, stat); + if(rt == ZOK) { + val.resize(len); + } + return rt; +} + +int32_t ZKClient::getConfig(std::string& val, bool watch, Stat* stat) { + return get(ZOO_CONFIG_NODE, val, watch, stat); +} + +int32_t ZKClient::set(const std::string& path, const std::string& val, int version, Stat* stat) { + return zoo_set2(m_handle, path.c_str(), val.c_str(), val.size(), version, stat); +} + +int32_t ZKClient::getChildren(const std::string& path, std::vector& val, bool watch, Stat* stat) { + String_vector strings; + Stat tmp; + if(stat == nullptr) { + stat = &tmp; + } + int32_t rt = zoo_get_children2(m_handle, path.c_str(), watch, &strings, stat); + if(rt == ZOK) { + for(int32_t i = 0; i < strings.count; ++i) { + val.push_back(strings.data[i]); + } + deallocate_String_vector(&strings); + } + return rt; +} + +int32_t ZKClient::close() { + m_watcherCb = nullptr; + int32_t rt = ZOK; + if(m_handle) { + rt = zookeeper_close(m_handle); + m_handle = nullptr; + } + return rt; +} + +std::string ZKClient::getCurrentServer() { + auto rt = zoo_get_current_server(m_handle); + return rt == nullptr ? "" : rt; +} + +int32_t ZKClient::getState() { + return zoo_state(m_handle); +} + +} diff --git a/sylar/zk_client.h b/sylar/zk_client.h new file mode 100644 index 0000000..9da52ed --- /dev/null +++ b/sylar/zk_client.h @@ -0,0 +1,83 @@ +#ifndef __SYLAR_ZK_CLIENT_H__ +#define __SYLAR_ZK_CLIENT_H__ + +#include +#include +#include +#include +#include + +#ifndef THREADED +#define THREADED +#endif + +#include + +namespace sylar { + +class ZKClient : public std::enable_shared_from_this { +public: + class EventType { + public: + static const int CREATED; // = ZOO_CREATED_EVENT; + static const int DELETED; // = ZOO_DELETED_EVENT; + static const int CHANGED; // = ZOO_CHANGED_EVENT; + static const int CHILD ; // = ZOO_CHILD_EVENT; + static const int SESSION; // = ZOO_SESSION_EVENT; + static const int NOWATCHING; // = ZOO_NOTWATCHING_EVENT; + }; + class FlagsType { + public: + static const int EPHEMERAL; // = ZOO_EPHEMERAL; + static const int SEQUENCE; // = ZOO_SEQUENCE; + static const int CONTAINER; // = ZOO_CONTAINER; + }; + class StateType { + public: + static const int EXPIRED_SESSION; // = ZOO_EXPIRED_SESSION_STATE; + static const int AUTH_FAILED; // = ZOO_AUTH_FAILED_STATE; + static const int CONNECTING; // = ZOO_CONNECTING_STATE; + static const int ASSOCIATING; // = ZOO_ASSOCIATING_STATE; + static const int CONNECTED; // = ZOO_CONNECTED_STATE; + static const int READONLY; // = ZOO_READONLY_STATE; + static const int NOTCONNECTED; // = ZOO_NOTCONNECTED_STATE; + }; + + typedef std::shared_ptr ptr; + typedef std::function watcher_callback; + typedef void(*log_callback)(const char *message); + + ZKClient(); + ~ZKClient(); + + bool init(const std::string& hosts, int recv_timeout, watcher_callback cb, log_callback lcb = nullptr); + int32_t setServers(const std::string& hosts); + + int32_t create(const std::string& path, const std::string& val, std::string& new_path + , const struct ACL_vector* acl = &ZOO_OPEN_ACL_UNSAFE + , int flags = 0); + int32_t exists(const std::string& path, bool watch, Stat* stat = nullptr); + int32_t del(const std::string& path, int version = -1); + int32_t get(const std::string& path, std::string& val, bool watch, Stat* stat = nullptr); + int32_t getConfig(std::string& val, bool watch, Stat* stat = nullptr); + int32_t set(const std::string& path, const std::string& val, int version = -1, Stat* stat = nullptr); + int32_t getChildren(const std::string& path, std::vector& val, bool watch, Stat* stat = nullptr); + int32_t close(); + int32_t getState(); + std::string getCurrentServer(); + + bool reconnect(); +private: + static void OnWatcher(zhandle_t *zh, int type, int stat, const char *path,void *watcherCtx); + typedef std::function watcher_callback2; +private: + zhandle_t* m_handle; + std::string m_hosts; + watcher_callback2 m_watcherCb; + log_callback m_logCb; + int32_t m_recvTimeout; +}; + +} + +#endif diff --git a/tests/test_zookeeper.cc b/tests/test_zookeeper.cc new file mode 100644 index 0000000..d1d8e17 --- /dev/null +++ b/tests/test_zookeeper.cc @@ -0,0 +1,94 @@ +#include "sylar/zk_client.h" +#include "sylar/log.h" +#include "sylar/iomanager.h" + +static sylar::Logger::ptr g_logger = SYLAR_LOG_ROOT(); + +int g_argc; + +void on_watcher(int type, int stat, const std::string& path, sylar::ZKClient::ptr client) { + SYLAR_LOG_INFO(g_logger) << " type=" << type + << " stat=" << stat + << " path=" << path + << " client=" << client + << " fiber=" << sylar::Fiber::GetThis() + << " iomanager=" << sylar::IOManager::GetThis(); + + if(stat == ZOO_CONNECTED_STATE) { + if(g_argc == 1) { + std::vector vals; + Stat stat; + int rt = client->getChildren("/", vals, true, &stat); + if(rt == ZOK) { + SYLAR_LOG_INFO(g_logger) << "[" << sylar::Join(vals.begin(), vals.end(), ",") << "]"; + } else { + SYLAR_LOG_INFO(g_logger) << "getChildren error " << rt; + } + } else { + std::string new_val; + new_val.resize(255); + int rt = client->create("/zkxxx", "", new_val, &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL); + if(rt == ZOK) { + SYLAR_LOG_INFO(g_logger) << "[" << new_val.c_str() << "]"; + } else { + SYLAR_LOG_INFO(g_logger) << "getChildren error " << rt; + } + +//extern ZOOAPI const int ZOO_SEQUENCE; +//extern ZOOAPI const int ZOO_CONTAINER; + rt = client->create("/zkxxx", "", new_val, &ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE | ZOO_EPHEMERAL); + if(rt == ZOK) { + SYLAR_LOG_INFO(g_logger) << "create [" << new_val.c_str() << "]"; + } else { + SYLAR_LOG_INFO(g_logger) << "create error " << rt; + } + + rt = client->get("/hello", new_val, true); + if(rt == ZOK) { + SYLAR_LOG_INFO(g_logger) << "get [" << new_val.c_str() << "]"; + } else { + SYLAR_LOG_INFO(g_logger) << "get error " << rt; + } + + rt = client->create("/hello", "", new_val, &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL); + if(rt == ZOK) { + SYLAR_LOG_INFO(g_logger) << "get [" << new_val.c_str() << "]"; + } else { + SYLAR_LOG_INFO(g_logger) << "get error " << rt; + } + + rt = client->set("/hello", "xxx"); + if(rt == ZOK) { + SYLAR_LOG_INFO(g_logger) << "set [" << new_val.c_str() << "]"; + } else { + SYLAR_LOG_INFO(g_logger) << "set error " << rt; + } + + rt = client->del("/hello"); + if(rt == ZOK) { + SYLAR_LOG_INFO(g_logger) << "del [" << new_val.c_str() << "]"; + } else { + SYLAR_LOG_INFO(g_logger) << "del error " << rt; + } + + } + } else if(stat == ZOO_EXPIRED_SESSION_STATE) { + client->reconnect(); + } +} + +int main(int argc, char** argv) { + g_argc = argc; + sylar::IOManager iom(1); + sylar::ZKClient::ptr client(new sylar::ZKClient); + if(g_argc > 1) { + SYLAR_LOG_INFO(g_logger) << client->init("127.0.0.1:21811", 3000, on_watcher); + //SYLAR_LOG_INFO(g_logger) << client->init("127.0.0.1:21811,127.0.0.1:21812,127.0.0.1:21811", 3000, on_watcher); + iom.addTimer(1115000, [client](){client->close();}); + } else { + SYLAR_LOG_INFO(g_logger) << client->init("127.0.0.1:21811,127.0.0.1:21812,127.0.0.1:21811", 3000, on_watcher); + iom.addTimer(5000, [](){}, true); + } + iom.stop(); + return 0; +}