From 5b9aacb4c3086af9a3ec5bec7ed3ab71bfc78729 Mon Sep 17 00:00:00 2001 From: Alain Jobart Date: Thu, 7 Jul 2016 08:52:20 -0700 Subject: [PATCH] Removing old zk library. --- .gitignore | 8 - vendor/launchpad.net/gozk/zookeeper/.lbox | 1 - vendor/launchpad.net/gozk/zookeeper/LICENSE | 165 --- vendor/launchpad.net/gozk/zookeeper/Makefile | 67 - vendor/launchpad.net/gozk/zookeeper/README | 1 - .../gozk/zookeeper/close_test.go | 220 ---- .../gozk/zookeeper/example/example.go | 28 - vendor/launchpad.net/gozk/zookeeper/helpers.c | 108 -- vendor/launchpad.net/gozk/zookeeper/helpers.h | 49 - .../gozk/zookeeper/retry_test.go | 266 ---- .../launchpad.net/gozk/zookeeper/runserver.go | 171 --- vendor/launchpad.net/gozk/zookeeper/server.go | 250 ---- .../gozk/zookeeper/server_test.go | 246 ---- .../gozk/zookeeper/suite_test.go | 129 -- vendor/launchpad.net/gozk/zookeeper/zk.go | 1126 ----------------- .../launchpad.net/gozk/zookeeper/zk_test.go | 710 ----------- 16 files changed, 3545 deletions(-) delete mode 100644 vendor/launchpad.net/gozk/zookeeper/.lbox delete mode 100644 vendor/launchpad.net/gozk/zookeeper/LICENSE delete mode 100644 vendor/launchpad.net/gozk/zookeeper/Makefile delete mode 100644 vendor/launchpad.net/gozk/zookeeper/README delete mode 100644 vendor/launchpad.net/gozk/zookeeper/close_test.go delete mode 100644 vendor/launchpad.net/gozk/zookeeper/example/example.go delete mode 100644 vendor/launchpad.net/gozk/zookeeper/helpers.c delete mode 100644 vendor/launchpad.net/gozk/zookeeper/helpers.h delete mode 100644 vendor/launchpad.net/gozk/zookeeper/retry_test.go delete mode 100644 vendor/launchpad.net/gozk/zookeeper/runserver.go delete mode 100644 vendor/launchpad.net/gozk/zookeeper/server.go delete mode 100644 vendor/launchpad.net/gozk/zookeeper/server_test.go delete mode 100644 vendor/launchpad.net/gozk/zookeeper/suite_test.go delete mode 100644 vendor/launchpad.net/gozk/zookeeper/zk.go delete mode 100644 vendor/launchpad.net/gozk/zookeeper/zk_test.go diff --git a/.gitignore b/.gitignore index 8f9527d9d9c..d0cedd9cbae 100644 --- a/.gitignore +++ b/.gitignore @@ -22,15 +22,7 @@ tags /Godeps/_workspace/bin # Go vendored libs -# Ideally we would match two levels deep so we can un-ignore specific repos -# like launchpad.net/gozk, but doing that triggers a bug in "git stash". -# As a workaround, we ignore at the domain level. /vendor/*/ -# We really only want to un-ignore launchpad.net/gozk, -# because it has local, tracked changes. -# But we can't do that because of the bug mentioned above, -# so we un-ignore all of launchpad.net. -!/vendor/launchpad.net/ # C build dirs **/build diff --git a/vendor/launchpad.net/gozk/zookeeper/.lbox b/vendor/launchpad.net/gozk/zookeeper/.lbox deleted file mode 100644 index 1563e822983..00000000000 --- a/vendor/launchpad.net/gozk/zookeeper/.lbox +++ /dev/null @@ -1 +0,0 @@ -propose -cr -for lp:gozk/zookeeper diff --git a/vendor/launchpad.net/gozk/zookeeper/LICENSE b/vendor/launchpad.net/gozk/zookeeper/LICENSE deleted file mode 100644 index 65c5ca88a67..00000000000 --- a/vendor/launchpad.net/gozk/zookeeper/LICENSE +++ /dev/null @@ -1,165 +0,0 @@ - GNU LESSER GENERAL PUBLIC LICENSE - Version 3, 29 June 2007 - - Copyright (C) 2007 Free Software Foundation, Inc. - Everyone is permitted to copy and distribute verbatim copies - of this license document, but changing it is not allowed. - - - This version of the GNU Lesser General Public License incorporates -the terms and conditions of version 3 of the GNU General Public -License, supplemented by the additional permissions listed below. - - 0. Additional Definitions. - - As used herein, "this License" refers to version 3 of the GNU Lesser -General Public License, and the "GNU GPL" refers to version 3 of the GNU -General Public License. - - "The Library" refers to a covered work governed by this License, -other than an Application or a Combined Work as defined below. - - An "Application" is any work that makes use of an interface provided -by the Library, but which is not otherwise based on the Library. -Defining a subclass of a class defined by the Library is deemed a mode -of using an interface provided by the Library. - - A "Combined Work" is a work produced by combining or linking an -Application with the Library. The particular version of the Library -with which the Combined Work was made is also called the "Linked -Version". - - The "Minimal Corresponding Source" for a Combined Work means the -Corresponding Source for the Combined Work, excluding any source code -for portions of the Combined Work that, considered in isolation, are -based on the Application, and not on the Linked Version. - - The "Corresponding Application Code" for a Combined Work means the -object code and/or source code for the Application, including any data -and utility programs needed for reproducing the Combined Work from the -Application, but excluding the System Libraries of the Combined Work. - - 1. Exception to Section 3 of the GNU GPL. - - You may convey a covered work under sections 3 and 4 of this License -without being bound by section 3 of the GNU GPL. - - 2. Conveying Modified Versions. - - If you modify a copy of the Library, and, in your modifications, a -facility refers to a function or data to be supplied by an Application -that uses the facility (other than as an argument passed when the -facility is invoked), then you may convey a copy of the modified -version: - - a) under this License, provided that you make a good faith effort to - ensure that, in the event an Application does not supply the - function or data, the facility still operates, and performs - whatever part of its purpose remains meaningful, or - - b) under the GNU GPL, with none of the additional permissions of - this License applicable to that copy. - - 3. Object Code Incorporating Material from Library Header Files. - - The object code form of an Application may incorporate material from -a header file that is part of the Library. You may convey such object -code under terms of your choice, provided that, if the incorporated -material is not limited to numerical parameters, data structure -layouts and accessors, or small macros, inline functions and templates -(ten or fewer lines in length), you do both of the following: - - a) Give prominent notice with each copy of the object code that the - Library is used in it and that the Library and its use are - covered by this License. - - b) Accompany the object code with a copy of the GNU GPL and this license - document. - - 4. Combined Works. - - You may convey a Combined Work under terms of your choice that, -taken together, effectively do not restrict modification of the -portions of the Library contained in the Combined Work and reverse -engineering for debugging such modifications, if you also do each of -the following: - - a) Give prominent notice with each copy of the Combined Work that - the Library is used in it and that the Library and its use are - covered by this License. - - b) Accompany the Combined Work with a copy of the GNU GPL and this license - document. - - c) For a Combined Work that displays copyright notices during - execution, include the copyright notice for the Library among - these notices, as well as a reference directing the user to the - copies of the GNU GPL and this license document. - - d) Do one of the following: - - 0) Convey the Minimal Corresponding Source under the terms of this - License, and the Corresponding Application Code in a form - suitable for, and under terms that permit, the user to - recombine or relink the Application with a modified version of - the Linked Version to produce a modified Combined Work, in the - manner specified by section 6 of the GNU GPL for conveying - Corresponding Source. - - 1) Use a suitable shared library mechanism for linking with the - Library. A suitable mechanism is one that (a) uses at run time - a copy of the Library already present on the user's computer - system, and (b) will operate properly with a modified version - of the Library that is interface-compatible with the Linked - Version. - - e) Provide Installation Information, but only if you would otherwise - be required to provide such information under section 6 of the - GNU GPL, and only to the extent that such information is - necessary to install and execute a modified version of the - Combined Work produced by recombining or relinking the - Application with a modified version of the Linked Version. (If - you use option 4d0, the Installation Information must accompany - the Minimal Corresponding Source and Corresponding Application - Code. If you use option 4d1, you must provide the Installation - Information in the manner specified by section 6 of the GNU GPL - for conveying Corresponding Source.) - - 5. Combined Libraries. - - You may place library facilities that are a work based on the -Library side by side in a single library together with other library -facilities that are not Applications and are not covered by this -License, and convey such a combined library under terms of your -choice, if you do both of the following: - - a) Accompany the combined library with a copy of the same work based - on the Library, uncombined with any other library facilities, - conveyed under the terms of this License. - - b) Give prominent notice with the combined library that part of it - is a work based on the Library, and explaining where to find the - accompanying uncombined form of the same work. - - 6. Revised Versions of the GNU Lesser General Public License. - - The Free Software Foundation may publish revised and/or new versions -of the GNU Lesser General Public License from time to time. Such new -versions will be similar in spirit to the present version, but may -differ in detail to address new problems or concerns. - - Each version is given a distinguishing version number. If the -Library as you received it specifies that a certain numbered version -of the GNU Lesser General Public License "or any later version" -applies to it, you have the option of following the terms and -conditions either of that published version or of any later version -published by the Free Software Foundation. If the Library as you -received it does not specify a version number of the GNU Lesser -General Public License, you may choose any version of the GNU Lesser -General Public License ever published by the Free Software Foundation. - - If the Library as you received it specifies that a proxy can decide -whether future versions of the GNU Lesser General Public License shall -apply, that proxy's public statement of acceptance of any version is -permanent authorization for you to choose that version for the -Library. diff --git a/vendor/launchpad.net/gozk/zookeeper/Makefile b/vendor/launchpad.net/gozk/zookeeper/Makefile deleted file mode 100644 index ce36e32a5fa..00000000000 --- a/vendor/launchpad.net/gozk/zookeeper/Makefile +++ /dev/null @@ -1,67 +0,0 @@ -include $(GOROOT)/src/Make.inc - -all: package - -TARG=launchpad.net/gozk/zookeeper - -GOFILES=\ - server.go\ - runserver.go\ - -CGOFILES=\ - zk.go\ - -CGO_OFILES=\ - helpers.o\ - -ifdef ZKROOT -LIBDIR=$(ZKROOT)/src/c/.libs -LD_LIBRARY_PATH:=$(LIBDIR):$(LD_LIBRARY_PATH) -CGO_CFLAGS+=-I$(ZKROOT)/src/c/include -I$(ZKROOT)/src/c/generated -CGO_LDFLAGS+=-L$(LIBDIR) -else -LIBDIR=/usr/lib -endif - -# For static compilation, will have to take LDFLAGS out of gozk.go too. -ifdef STATIC -CGO_LDFLAGS+=-lm -lpthread -# XXX This has ordering issues with current Make.pkg: -#CGO_OFILES+=$(wildcard _lib/*.o) -# -#_lib: -# @mkdir -p _lib -# cd _lib && ar x $(LIBDIR)/libzookeeper_mt.a -# -#_cgo_defun.c: _lib -CGO_OFILES+=\ - _lib/hashtable_itr.o\ - _lib/libzkmt_la-zk_hashtable.o\ - _lib/hashtable.o\ - _lib/libzkmt_la-zk_log.o\ - _lib/libzkmt_la-mt_adaptor.o\ - _lib/libzkmt_la-zookeeper.jute.o\ - _lib/libzkmt_la-recordio.o\ - _lib/libzkmt_la-zookeeper.o\ - -_lib/%.o: - @mkdir -p _lib - cd _lib && ar x $(LIBDIR)/libzookeeper_mt.a - -endif - -CLEANFILES+=_lib - -GOFMT=gofmt -BADFMT:=$(shell $(GOFMT) -l $(GOFILES) $(CGOFILES) $(wildcard *_test.go)) - -gofmt: $(BADFMT) - @for F in $(BADFMT); do $(GOFMT) -w $$F && echo $$F; done - -ifneq ($(BADFMT),) -ifneq ($(MAKECMDGOALS),gofmt) -$(warning WARNING: make gofmt: $(BADFMT)) -endif -endif - -include $(GOROOT)/src/Make.pkg diff --git a/vendor/launchpad.net/gozk/zookeeper/README b/vendor/launchpad.net/gozk/zookeeper/README deleted file mode 100644 index 77d65941612..00000000000 --- a/vendor/launchpad.net/gozk/zookeeper/README +++ /dev/null @@ -1 +0,0 @@ -Check out https://wiki.ubuntu.com/gozk diff --git a/vendor/launchpad.net/gozk/zookeeper/close_test.go b/vendor/launchpad.net/gozk/zookeeper/close_test.go deleted file mode 100644 index 615ef79fc59..00000000000 --- a/vendor/launchpad.net/gozk/zookeeper/close_test.go +++ /dev/null @@ -1,220 +0,0 @@ -package zookeeper_test - -import ( - "io" - . "launchpad.net/gocheck" - zk "launchpad.net/gozk/zookeeper" - "log" - "net" - "time" -) - -// requestFuncs holds all the requests that take a read lock -// on the zk connection except those that don't actually -// make a round trip to the server. -var requestFuncs = []func(conn *zk.Conn, path string) error{ - func(conn *zk.Conn, path string) error { - _, err := conn.Create(path, "", 0, nil) - return err - }, - func(conn *zk.Conn, path string) error { - _, err := conn.Exists(path) - return err - }, - func(conn *zk.Conn, path string) error { - _, _, err := conn.ExistsW(path) - return err - }, - func(conn *zk.Conn, path string) error { - _, _, err := conn.Get(path) - return err - }, - func(conn *zk.Conn, path string) error { - _, _, _, err := conn.GetW(path) - return err - }, - func(conn *zk.Conn, path string) error { - _, _, err := conn.Children(path) - return err - }, - func(conn *zk.Conn, path string) error { - _, _, _, err := conn.ChildrenW(path) - return err - }, - func(conn *zk.Conn, path string) error { - _, err := conn.Set(path, "", 0) - return err - }, - func(conn *zk.Conn, path string) error { - _, _, err := conn.ACL(path) - return err - }, - func(conn *zk.Conn, path string) error { - return conn.SetACL(path, []zk.ACL{{ - Perms: zk.PERM_ALL, - Scheme: "digest", - Id: "foo", - }}, 0) - }, - func(conn *zk.Conn, path string) error { - return conn.Delete(path, 0) - }, -} - -func (s *S) TestConcurrentClose(c *C) { - // make sure the server is ready to receive connections. - s.init(c) - - // Close should wait until all outstanding requests have - // completed before returning. The idea of this test is that - // any request that requests or changes a zookeeper node must - // make at least one round trip to the server, so we interpose a - // proxy between the client and the server which can stop all - // incoming traffic on demand, thus blocking the request until - // we want it to unblock. - // - // We assume that all requests take less than 0.1s to complete, - // thus when we wait below, neither of the above goroutines - // should complete within the allotted time (the request because - // it's waiting for a reply from the server and the close - // because it's waiting for the request to complete). If the - // locking doesn't work, the Close will return early. If the - // proxy blocking doesn't work, the request will return early. - // - // When we reenable incoming messages from the server, both - // goroutines should complete. We can't tell which completes - // first, but the fact that the close blocked is sufficient to - // tell that the locking is working correctly. - for i, f := range requestFuncs { - c.Logf("iter %d", i) - p := newProxy(c, s.zkAddr) - conn, watch, err := zk.Dial(p.addr(), 5e9) - c.Assert(err, IsNil) - c.Assert((<-watch).Ok(), Equals, true) - - // sanity check that the connection is actually - // up and running. - _, err = conn.Exists("/nothing") - c.Assert(err, IsNil) - - p.stopIncoming() - reqDone := make(chan bool) - closeDone := make(chan bool) - go func() { - f(conn, "/closetest") - reqDone <- true - }() - go func() { - // sleep for long enough for the request to be initiated and the read lock taken. - time.Sleep(0.05e9) - conn.Close() - closeDone <- true - }() - select { - case <-reqDone: - c.Fatalf("request %d finished early", i) - case <-closeDone: - c.Fatalf("request %d close finished early", i) - case <-time.After(0.1e9): - } - p.startIncoming() - for reqDone != nil || closeDone != nil { - select { - case <-reqDone: - reqDone = nil - case <-closeDone: - closeDone = nil - case <-time.After(0.4e9): - c.Fatalf("request %d timed out waiting for req (%p) and close(%p)", i, reqDone, closeDone) - } - } - p.close() - err = f(conn, "/closetest") - c.Check(zk.IsError(err, zk.ZCLOSING), Equals, true, Commentf("%v", err)) - } -} - -type proxy struct { - stop, start chan bool - listener net.Listener -} - -// newProxy will listen on proxyAddr and connect its client to dstAddr, and return -// a proxy instance that can be used to control the connection. -func newProxy(c *C, dstAddr string) *proxy { - listener, err := net.Listen("tcp", "127.0.0.1:0") - c.Assert(err, IsNil) - p := &proxy{ - stop: make(chan bool, 1), - start: make(chan bool, 1), - listener: listener, - } - - go func() { - for { - client, err := p.listener.Accept() - if err != nil { - // Ignore the error, because the connection will fail anyway. - return - } - go func() { - defer client.Close() - server, err := net.Dial("tcp", dstAddr) - if err != nil { - log.Printf("cannot dial %q: %v", dstAddr, err) - return - } - defer server.Close() - go io.Copy(&haltableWriter{ - w: client, - stop: p.stop, - start: p.start}, - server) - // When the client is closed, the deferred closes will - // take down the other io.Copy too. - io.Copy(server, client) - }() - } - }() - return p -} - -func (p *proxy) close() error { - return p.listener.Close() -} - -func (p *proxy) addr() string { - return p.listener.Addr().String() -} - -func (p *proxy) stopIncoming() { - if p.stop == nil { - panic("cannot stop twice") - } - p.stop <- true - p.stop = nil -} - -func (p *proxy) startIncoming() { - if p.start == nil { - panic("cannot start twice") - } - p.start <- true - p.start = nil -} - -type haltableWriter struct { - w io.Writer - stop, start chan bool -} - -func (w *haltableWriter) Write(buf []byte) (int, error) { - select { - case <-w.stop: - w.stop <- true - <-w.start - w.start <- true - default: - } - return w.w.Write(buf) -} diff --git a/vendor/launchpad.net/gozk/zookeeper/example/example.go b/vendor/launchpad.net/gozk/zookeeper/example/example.go deleted file mode 100644 index 80738086656..00000000000 --- a/vendor/launchpad.net/gozk/zookeeper/example/example.go +++ /dev/null @@ -1,28 +0,0 @@ -package main - -import ( - "fmt" - "launchpad.net/gozk/zookeeper" - "log" -) - -func main() { - zk, session, err := zookeeper.Dial("localhost:2181", 5e9) - if err != nil { - log.Fatalf("Can't connect: %v", err) - } - defer zk.Close() - - // Wait for connection. - event := <-session - if event.State != zookeeper.STATE_CONNECTED { - log.Fatalf("Can't connect: %v", event) - } - - _, err = zk.Create("/counter", "0", 0, zookeeper.WorldACL(zookeeper.PERM_ALL)) - if err != nil { - log.Fatalf("Can't create counter: %v", err) - } else { - fmt.Println("Counter created!") - } -} diff --git a/vendor/launchpad.net/gozk/zookeeper/helpers.c b/vendor/launchpad.net/gozk/zookeeper/helpers.c deleted file mode 100644 index 8d036cad750..00000000000 --- a/vendor/launchpad.net/gozk/zookeeper/helpers.c +++ /dev/null @@ -1,108 +0,0 @@ - -#include -#include -#include -#include "helpers.h" - - -static pthread_mutex_t watch_mutex = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t watch_available = PTHREAD_COND_INITIALIZER; - -static watch_data *first_watch = NULL; - -completion_data* create_completion_data() { - completion_data *data = malloc(sizeof(completion_data)); - pthread_mutex_init(&data->mutex, NULL); - pthread_mutex_lock(&data->mutex); - return data; -} - -void destroy_completion_data(completion_data *data) { - pthread_mutex_destroy(&data->mutex); - free(data); -} - -void wait_for_completion(completion_data *data) { - pthread_mutex_lock(&data->mutex); -} - -void _handle_void_completion(int rc, const void *data_) { - completion_data *data = (completion_data*)data_; - data->data = (void*)(size_t)rc; - pthread_mutex_unlock(&data->mutex); -} - -void _watch_handler(zhandle_t *zh, int event_type, int connection_state, - const char *event_path, void *watch_context) -{ - pthread_mutex_lock(&watch_mutex); - { - watch_data *data = malloc(sizeof(watch_data)); // XXX Check data. - data->connection_state = connection_state; - data->event_type = event_type; - data->event_path = strdup(event_path); // XXX Check event_path. - data->watch_context = watch_context; - data->next = NULL; - - if (first_watch == NULL) { - first_watch = data; - } else { - watch_data *last_watch = first_watch; - while (last_watch->next != NULL) { - last_watch = last_watch->next; - } - last_watch->next = data; - } - - pthread_cond_signal(&watch_available); - } - pthread_mutex_unlock(&watch_mutex); -} - -watch_data *wait_for_watch() { - watch_data *data = NULL; - - pthread_mutex_lock(&watch_mutex); - { - while (first_watch == NULL) { - pthread_cond_wait(&watch_available, &watch_mutex); - } - data = first_watch; - first_watch = first_watch->next; - data->next = NULL; // Just in case. - } - pthread_mutex_unlock(&watch_mutex); - - return data; -} - -void destroy_watch_data(watch_data *data) { - free(data->event_path); - free(data); -} - - -// Cgo doesn't like to use function addresses as variables. -watcher_fn watch_handler = _watch_handler; -void_completion_t handle_void_completion = _handle_void_completion; - -zhandle_t *zookeeper_init_int(const char *host, watcher_fn fn, - int recv_timeout, const clientid_t *clientid, unsigned long context, int flags) { - return zookeeper_init(host, fn, recv_timeout, clientid, (void*)context, flags); -} -int zoo_wget_int(zhandle_t *zh, const char *path, - watcher_fn watcher, unsigned long watcherCtx, - char *buffer, int* buffer_len, struct Stat *stat) { - return zoo_wget(zh, path, watcher, (void*)watcherCtx, buffer, buffer_len, stat); -} -int zoo_wget_children2_int(zhandle_t *zh, const char *path, - watcher_fn watcher, unsigned long watcherCtx, - struct String_vector *strings, struct Stat *stat) { - return zoo_wget_children2(zh, path, watcher, (void*)watcherCtx, strings, stat); -} -int zoo_wexists_int(zhandle_t *zh, const char *path, - watcher_fn watcher, unsigned long watcherCtx, struct Stat *stat) { - return zoo_wexists(zh, path, watcher, (void*)watcherCtx, stat); -} - -// vim:ts=4:sw=4:et diff --git a/vendor/launchpad.net/gozk/zookeeper/helpers.h b/vendor/launchpad.net/gozk/zookeeper/helpers.h deleted file mode 100644 index 72cfb8f3677..00000000000 --- a/vendor/launchpad.net/gozk/zookeeper/helpers.h +++ /dev/null @@ -1,49 +0,0 @@ -#ifndef helpers_h -#define helpers_h 1 - -#include -#include - -typedef struct _watch_data { - int connection_state; - int event_type; - char *event_path; - void *watch_context; - struct _watch_data *next; -} watch_data; - -typedef struct _completion_data { - pthread_mutex_t mutex; - void *data; -} completion_data; - -completion_data* create_completion_data(); -void destroy_completion_data(completion_data *data); -void wait_for_completion(completion_data *data); - -watch_data *wait_for_watch(); -void destroy_watch_data(watch_data *data); - -// Cgo doesn't like to use function addresses as variables. -extern watcher_fn watch_handler; -extern void_completion_t handle_void_completion; - -// The precise GC in Go 1.4+ doesn't like it when we cast arbitrary -// integers to unsafe.Pointer to pass to the void* context parameter. -// Below are helper functions that perform the cast in C so the Go GC -// doesn't try to interpret it as a pointer. - -zhandle_t *zookeeper_init_int(const char *host, watcher_fn fn, - int recv_timeout, const clientid_t *clientid, unsigned long context, int flags); -int zoo_wget_int(zhandle_t *zh, const char *path, - watcher_fn watcher, unsigned long watcherCtx, - char *buffer, int* buffer_len, struct Stat *stat); -int zoo_wget_children2_int(zhandle_t *zh, const char *path, - watcher_fn watcher, unsigned long watcherCtx, - struct String_vector *strings, struct Stat *stat); -int zoo_wexists_int(zhandle_t *zh, const char *path, - watcher_fn watcher, unsigned long watcherCtx, struct Stat *stat); - -#endif - -// vim:ts=4:sw=4:et diff --git a/vendor/launchpad.net/gozk/zookeeper/retry_test.go b/vendor/launchpad.net/gozk/zookeeper/retry_test.go deleted file mode 100644 index 96317925a3f..00000000000 --- a/vendor/launchpad.net/gozk/zookeeper/retry_test.go +++ /dev/null @@ -1,266 +0,0 @@ -package zookeeper_test - -import ( - "errors" - . "launchpad.net/gocheck" - zk "launchpad.net/gozk/zookeeper" -) - -func (s *S) TestRetryChangeCreating(c *C) { - conn, _ := s.init(c) - - err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), - func(data string, stat *zk.Stat) (string, error) { - c.Assert(data, Equals, "") - c.Assert(stat, IsNil) - return "new", nil - }) - c.Assert(err, IsNil) - - data, stat, err := conn.Get("/test") - c.Assert(err, IsNil) - c.Assert(stat, NotNil) - c.Assert(stat.Version(), Equals, 0) - c.Assert(data, Equals, "new") - - acl, _, err := conn.ACL("/test") - c.Assert(err, IsNil) - c.Assert(acl, DeepEquals, zk.WorldACL(zk.PERM_ALL)) -} - -func (s *S) TestRetryChangeSetting(c *C) { - conn, _ := s.init(c) - - _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) - c.Assert(err, IsNil) - - err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{}, - func(data string, stat *zk.Stat) (string, error) { - c.Assert(data, Equals, "old") - c.Assert(stat, NotNil) - c.Assert(stat.Version(), Equals, 0) - return "brand new", nil - }) - c.Assert(err, IsNil) - - data, stat, err := conn.Get("/test") - c.Assert(err, IsNil) - c.Assert(stat, NotNil) - c.Assert(stat.Version(), Equals, 1) - c.Assert(data, Equals, "brand new") - - // ACL was unchanged by RetryChange(). - acl, _, err := conn.ACL("/test") - c.Assert(err, IsNil) - c.Assert(acl, DeepEquals, zk.WorldACL(zk.PERM_ALL)) -} - -func (s *S) TestRetryChangeUnchangedValueDoesNothing(c *C) { - conn, _ := s.init(c) - - _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) - c.Assert(err, IsNil) - - err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{}, - func(data string, stat *zk.Stat) (string, error) { - c.Assert(data, Equals, "old") - c.Assert(stat, NotNil) - c.Assert(stat.Version(), Equals, 0) - return "old", nil - }) - c.Assert(err, IsNil) - - data, stat, err := conn.Get("/test") - c.Assert(err, IsNil) - c.Assert(stat, NotNil) - c.Assert(stat.Version(), Equals, 0) // Unchanged! - c.Assert(data, Equals, "old") -} - -func (s *S) TestRetryChangeConflictOnCreate(c *C) { - conn, _ := s.init(c) - - changeFunc := func(data string, stat *zk.Stat) (string, error) { - switch data { - case "": - c.Assert(stat, IsNil) - _, err := conn.Create("/test", "conflict", zk.EPHEMERAL, - zk.WorldACL(zk.PERM_ALL)) - c.Assert(err, IsNil) - return " => conflict", nil - case "conflict": - c.Assert(stat, NotNil) - c.Assert(stat.Version(), Equals, 0) - return "conflict => new", nil - default: - c.Fatal("Unexpected node data: " + data) - } - return "can't happen", nil - } - - err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), changeFunc) - c.Assert(err, IsNil) - - data, stat, err := conn.Get("/test") - c.Assert(err, IsNil) - c.Assert(data, Equals, "conflict => new") - c.Assert(stat, NotNil) - c.Assert(stat.Version(), Equals, 1) -} - -func (s *S) TestRetryChangeConflictOnSetDueToChange(c *C) { - conn, _ := s.init(c) - - _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) - c.Assert(err, IsNil) - - changeFunc := func(data string, stat *zk.Stat) (string, error) { - switch data { - case "old": - c.Assert(stat, NotNil) - c.Assert(stat.Version(), Equals, 0) - _, err := conn.Set("/test", "conflict", 0) - c.Assert(err, IsNil) - return "old => new", nil - case "conflict": - c.Assert(stat, NotNil) - c.Assert(stat.Version(), Equals, 1) - return "conflict => new", nil - default: - c.Fatal("Unexpected node data: " + data) - } - return "can't happen", nil - } - - err = conn.RetryChange("/test", zk.EPHEMERAL, []zk.ACL{}, changeFunc) - c.Assert(err, IsNil) - - data, stat, err := conn.Get("/test") - c.Assert(err, IsNil) - c.Assert(data, Equals, "conflict => new") - c.Assert(stat, NotNil) - c.Assert(stat.Version(), Equals, 2) -} - -func (s *S) TestRetryChangeConflictOnSetDueToDelete(c *C) { - conn, _ := s.init(c) - - _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) - c.Assert(err, IsNil) - - changeFunc := func(data string, stat *zk.Stat) (string, error) { - switch data { - case "old": - c.Assert(stat, NotNil) - c.Assert(stat.Version(), Equals, 0) - err := conn.Delete("/test", 0) - c.Assert(err, IsNil) - return "old => ", nil - case "": - c.Assert(stat, IsNil) - return " => new", nil - default: - c.Fatal("Unexpected node data: " + data) - } - return "can't happen", nil - } - - err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ), changeFunc) - c.Assert(err, IsNil) - - data, stat, err := conn.Get("/test") - c.Assert(err, IsNil) - c.Assert(data, Equals, " => new") - c.Assert(stat, NotNil) - c.Assert(stat.Version(), Equals, 0) - - // Should be the new ACL. - acl, _, err := conn.ACL("/test") - c.Assert(err, IsNil) - c.Assert(acl, DeepEquals, zk.WorldACL(zk.PERM_READ)) -} - -func (s *S) TestRetryChangeErrorInCallback(c *C) { - conn, _ := s.init(c) - - err := conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), - func(data string, stat *zk.Stat) (string, error) { - return "don't use this", errors.New("BOOM!") - }) - c.Assert(err, NotNil) - c.Assert(err.Error(), Equals, "BOOM!") - - stat, err := conn.Exists("/test") - c.Assert(err, IsNil) - c.Assert(stat, IsNil) -} - -func (s *S) TestRetryChangeFailsReading(c *C) { - conn, _ := s.init(c) - - // Write only! - _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_WRITE)) - c.Assert(err, IsNil) - - var called bool - err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), - func(data string, stat *zk.Stat) (string, error) { - called = true - return "", nil - }) - c.Assert(err, NotNil) - c.Check(zk.IsError(err, zk.ZNOAUTH), Equals, true, Commentf("%v", err)) - - stat, err := conn.Exists("/test") - c.Assert(err, IsNil) - c.Assert(stat, NotNil) - c.Assert(stat.Version(), Equals, 0) - - c.Assert(called, Equals, false) -} - -func (s *S) TestRetryChangeFailsSetting(c *C) { - conn, _ := s.init(c) - - // Read only! - _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ)) - c.Assert(err, IsNil) - - var called bool - err = conn.RetryChange("/test", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), - func(data string, stat *zk.Stat) (string, error) { - called = true - return "", nil - }) - c.Check(zk.IsError(err, zk.ZNOAUTH), Equals, true, Commentf("%v", err)) - - stat, err := conn.Exists("/test") - c.Assert(err, IsNil) - c.Assert(stat, NotNil) - c.Assert(stat.Version(), Equals, 0) - - c.Assert(called, Equals, true) -} - -func (s *S) TestRetryChangeFailsCreating(c *C) { - conn, _ := s.init(c) - - // Read only! - _, err := conn.Create("/test", "old", zk.EPHEMERAL, zk.WorldACL(zk.PERM_READ)) - c.Assert(err, IsNil) - - var called bool - err = conn.RetryChange("/test/sub", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL), - func(data string, stat *zk.Stat) (string, error) { - called = true - return "", nil - }) - c.Assert(err, NotNil) - c.Check(zk.IsError(err, zk.ZNOAUTH), Equals, true, Commentf("%v", err)) - - stat, err := conn.Exists("/test/sub") - c.Assert(err, IsNil) - c.Assert(stat, IsNil) - - c.Assert(called, Equals, true) -} diff --git a/vendor/launchpad.net/gozk/zookeeper/runserver.go b/vendor/launchpad.net/gozk/zookeeper/runserver.go deleted file mode 100644 index f86ceab6dac..00000000000 --- a/vendor/launchpad.net/gozk/zookeeper/runserver.go +++ /dev/null @@ -1,171 +0,0 @@ -package zookeeper - -// This file defines methods on Server that deal with starting -// and stopping the ZooKeeper service. They are independent of ZooKeeper -// itself, and may be factored out at a later date. - -import ( - "errors" - "fmt" - "io/ioutil" - "os" - "os/exec" - "strconv" - "syscall" - "time" -) - -// ErrNotRunning is the error returned when Process cannot -// find the currently running zookeeper process. -var ErrNotRunning = errors.New("process not running") - -// Process returns a Process referring to the running server from -// where it's been stored in pid.txt. If the file does not -// exist, or it cannot find the process, it returns the error -// ErrNotRunning. -func (srv *Server) Process() (*os.Process, error) { - data, err := ioutil.ReadFile(srv.path("pid.txt")) - if err != nil { - if os.IsNotExist(err) { - err = ErrNotRunning - } - return nil, err - } - pid, err := strconv.Atoi(string(data)) - if err != nil { - return nil, errors.New("bad process id found in pid.txt") - } - return getProcess(pid) -} - -// getProcess gets a Process from a pid and checks that the -// process is actually running. If the process -// is not running, then getProcess returns a nil -// Process and the error ErrNotRunning. -func getProcess(pid int) (*os.Process, error) { - p, err := os.FindProcess(pid) - if err != nil { - return nil, err - } - - // try to check if the process is actually running by sending - // it signal 0. - err = p.Signal(syscall.Signal(0)) - if err == nil { - return p, nil - } - if err == syscall.ESRCH { - return nil, ErrNotRunning - } - return nil, errors.New("server running but inaccessible") -} - -// Start starts the ZooKeeper server. -// It returns an error if the server is already running. -func (srv *Server) Start() error { - if err := srv.checkAvailability(); err != nil { - return err - } - p, err := srv.Process() - if err == nil || err != ErrNotRunning { - if p != nil { - p.Release() - } - return errors.New("server is already running") - } - - if _, err := os.Stat(srv.path("pid.txt")); err == nil { - // Thre pid.txt file still exists although server is not running. - // Remove it so it can be re-created. - // This leads to a race: if two processes are both - // calling Start, one might remove the file the other - // has just created, leading to a situation where - // pid.txt describes the wrong server process. - // We ignore that possibility for now. - // TODO use syscall.Flock? - if err := os.Remove(srv.path("pid.txt")); err != nil { - return fmt.Errorf("cannot remove pid.txt: %v", err) - } - } - - // Open the pid file before starting the process so that if we get two - // programs trying to concurrently start a server on the same directory - // at the same time, only one should succeed. - pidf, err := os.OpenFile(srv.path("pid.txt"), os.O_EXCL|os.O_CREATE|os.O_WRONLY, 0666) - if err != nil { - return fmt.Errorf("cannot create pid.txt: %v", err) - } - defer pidf.Close() - args, err := srv.command() - if err != nil { - return fmt.Errorf("cannot determine command: %v", err) - } - cmd := exec.Command(args[0], args[1:]...) - - logf, err := os.OpenFile(srv.path("log.txt"), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666) - if err != nil { - return fmt.Errorf("cannot create log file: %v", err) - } - defer logf.Close() - cmd.Stdout = logf - cmd.Stderr = logf - if err := cmd.Start(); err != nil { - return fmt.Errorf("cannot start server: %v", err) - } - if _, err := fmt.Fprint(pidf, cmd.Process.Pid); err != nil { - return fmt.Errorf("cannot write pid file: %v", err) - } - return nil -} - -// Stop kills the ZooKeeper server. It does nothing if it is not running. -// Note that Stop does not remove any data from the run directory, -// so Start may be called later on the same directory. -func (srv *Server) Stop() error { - p, err := srv.Process() - if p == nil { - if err != nil && err != ErrNotRunning { - return fmt.Errorf("cannot read process ID of server: %v", err) - } - return nil - } - defer p.Release() - if err := p.Kill(); err != nil { - return fmt.Errorf("cannot kill server process: %v", err) - } - // Ignore the error returned from Wait because there's little - // we can do about it - it either means that the process has just exited - // anyway or that we can't wait for it for some other reason, - // for example because it was originally started by some other process. - if _, err := p.Wait(); err != nil { - // If we can't wait for the server, it's possible that it was running - // but not as a child of this process, so the only thing we can do - // is to poll until it exits. If the process has taken longer than - // a second to exit, then it's probably not going to. - for i := 0; i < 5*4; i++ { - time.Sleep(1e9 / 4) - if np, err := getProcess(p.Pid); err != nil { - break - } else { - np.Release() - } - } - } - - if err := os.Remove(srv.path("pid.txt")); err != nil { - return fmt.Errorf("cannot remove server process ID file: %v", err) - } - return nil -} - -// Destroy stops the ZooKeeper server, and then removes its run -// directory. Warning: this will destroy all data associated with the server. -func (srv *Server) Destroy() error { - if err := srv.Stop(); err != nil { - return err - } - if err := os.RemoveAll(srv.runDir); err != nil { - return err - } - return nil -} diff --git a/vendor/launchpad.net/gozk/zookeeper/server.go b/vendor/launchpad.net/gozk/zookeeper/server.go deleted file mode 100644 index 26b7461952f..00000000000 --- a/vendor/launchpad.net/gozk/zookeeper/server.go +++ /dev/null @@ -1,250 +0,0 @@ -package zookeeper - -import ( - "bufio" - "bytes" - "errors" - "fmt" - "io/ioutil" - "net" - "os" - "path/filepath" - "strings" -) - -// Server represents a ZooKeeper server, its data and configuration files. -type Server struct { - runDir string - zkDir string -} - -// CreateServer creates the directory runDir and sets up a ZooKeeper -// server environment inside it. It is an error if runDir already -// exists and is not empty. The server will listen on the specified TCP -// port. -// -// The ZooKeeper installation directory is specified by zkDir. -// If this is empty, a system default will be used. -// -// CreateServer does not start the server. -func CreateServer(port int, runDir, zkDir string) (*Server, error) { - if err := os.Mkdir(runDir, 0777); err != nil { - if !os.IsExist(err) { - return nil, err - } - info, err := ioutil.ReadDir(runDir) - if err != nil { - return nil, err - } - if len(info) != 0 { - return nil, fmt.Errorf("server directory %q is not empty") - } - } - srv := &Server{runDir: runDir, zkDir: zkDir} - if err := srv.writeLog4JConfig(); err != nil { - return nil, err - } - if err := srv.writeZooKeeperConfig(port); err != nil { - return nil, err - } - if err := srv.writeZkDir(); err != nil { - return nil, err - } - return srv, nil -} - -// AttachServer creates a new ZooKeeper Server instance -// to operate inside an existing run directory, runDir. -// The directory must have been created with CreateServer. -func AttachServer(runDir string) (*Server, error) { - srv := &Server{runDir: runDir} - if err := srv.readZkDir(); err != nil { - return nil, fmt.Errorf("cannot read server install directory: %v", err) - } - return srv, nil -} - -func (srv *Server) checkAvailability() error { - port, err := srv.networkPort() - if err != nil { - return fmt.Errorf("cannot get network port: %v", err) - } - l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port)) - if err != nil { - return fmt.Errorf("cannot listen on port %v: %v", port, err) - } - l.Close() - return nil -} - -// networkPort returns the TCP port number that -// the server is configured for. -func (srv *Server) networkPort() (int, error) { - f, err := os.Open(srv.path("zoo.cfg")) - if err != nil { - return 0, err - } - r := bufio.NewReader(f) - for { - line, err := r.ReadSlice('\n') - if err != nil { - return 0, fmt.Errorf("cannot get port from %q", srv.path("zoo.cfg")) - } - var port int - if n, _ := fmt.Sscanf(string(line), "clientPort=%d\n", &port); n == 1 { - return port, nil - } - } - panic("not reached") -} - -// Addr returns a local host address that can be used -// to contact the server when it is running. -func (srv *Server) Addr() (string, error) { - port, err := srv.networkPort() - if err != nil { - return "", err - } - return fmt.Sprintf("127.0.0.1:%d", port), nil -} - -// command returns the command used to start the -// ZooKeeper server. -func (srv *Server) command() ([]string, error) { - cp, err := srv.classPath() - if err != nil { - return nil, fmt.Errorf("cannot get class path: %v", err) - } - return []string{ - "java", - "-cp", strings.Join(cp, ":"), - "-Dzookeeper.root.logger=INFO,CONSOLE", - "-Dlog4j.configuration=file:" + srv.path("log4j.properties"), - "org.apache.zookeeper.server.quorum.QuorumPeerMain", - srv.path("zoo.cfg"), - }, nil -} - -var log4jProperties = ` -log4j.rootLogger=INFO, CONSOLE -log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender -log4j.appender.CONSOLE.Threshold=INFO -log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout -log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n -` - -func (srv *Server) writeLog4JConfig() (err error) { - return ioutil.WriteFile(srv.path("log4j.properties"), []byte(log4jProperties), 0666) -} - -func (srv *Server) writeZooKeeperConfig(port int) (err error) { - return ioutil.WriteFile(srv.path("zoo.cfg"), []byte(fmt.Sprintf( - "tickTime=2000\n"+ - "dataDir=%s\n"+ - "clientPort=%d\n"+ - "maxClientCnxns=500\n", - srv.runDir, port)), 0666) -} - -func (srv *Server) writeZkDir() error { - return ioutil.WriteFile(srv.path("zkdir.txt"), []byte(srv.zkDir), 0666) -} - -func (srv *Server) readZkDir() error { - data, err := ioutil.ReadFile(srv.path("zkdir.txt")) - if err != nil { - return err - } - srv.zkDir = string(data) - return nil -} - -func (srv *Server) classPath() ([]string, error) { - dir := srv.zkDir - if dir == "" { - return systemClassPath() - } - if err := checkDirectory(dir); err != nil { - return nil, err - } - // Two possibilities, as seen in zkEnv.sh: - // 1) locally built binaries (jars are in build directory) - // 2) release binaries - if build := filepath.Join(dir, "build"); checkDirectory(build) == nil { - dir = build - } - classPath, err := filepath.Glob(filepath.Join(dir, "zookeeper-*.jar")) - if err != nil { - panic(fmt.Errorf("glob for jar files: %v", err)) - } - more, err := filepath.Glob(filepath.Join(dir, "lib/*.jar")) - if err != nil { - panic(fmt.Errorf("glob for lib jar files: %v", err)) - } - - classPath = append(classPath, more...) - if len(classPath) == 0 { - return nil, fmt.Errorf("zookeeper libraries not found in %q", dir) - } - return classPath, nil -} - -const zookeeperEnviron = "/etc/zookeeper/conf/environment" - -func systemClassPath() ([]string, error) { - f, err := os.Open(zookeeperEnviron) - if f == nil { - return nil, err - } - r := bufio.NewReader(f) - for { - line, err := r.ReadSlice('\n') - if err != nil { - break - } - if !bytes.HasPrefix(line, []byte("CLASSPATH=")) { - continue - } - - // remove variable and newline - path := string(line[len("CLASSPATH=") : len(line)-1]) - - // trim white space - path = strings.Trim(path, " \t\r") - - // strip quotes - if path[0] == '"' { - path = path[1 : len(path)-1] - } - - // split on : - classPath := strings.Split(path, ":") - - // split off $ZOOCFGDIR - if len(classPath) > 0 && classPath[0] == "$ZOOCFGDIR" { - classPath = classPath[1:] - } - - if len(classPath) == 0 { - return nil, fmt.Errorf("empty class path in %q", zookeeperEnviron) - } - return classPath, nil - } - return nil, fmt.Errorf("no class path found in %q", zookeeperEnviron) -} - -// checkDirectory returns an error if the given path -// does not exist or is not a directory. -func checkDirectory(path string) error { - if info, err := os.Stat(path); err != nil || !info.IsDir() { - if err != nil { - return err - } - return &os.PathError{Op: "stat", Path: path, Err: errors.New("is not a directory")} - } - return nil -} - -func (srv *Server) path(name string) string { - return filepath.Join(srv.runDir, name) -} diff --git a/vendor/launchpad.net/gozk/zookeeper/server_test.go b/vendor/launchpad.net/gozk/zookeeper/server_test.go deleted file mode 100644 index 6ccf6b62b43..00000000000 --- a/vendor/launchpad.net/gozk/zookeeper/server_test.go +++ /dev/null @@ -1,246 +0,0 @@ -package zookeeper_test - -import ( - "bufio" - "flag" - "fmt" - . "launchpad.net/gocheck" - zk "launchpad.net/gozk/zookeeper" - "os" - "os/exec" - "strings" - "testing" - "time" -) - -var reattach = flag.Bool("zktest.reattach", false, "internal flag used for testing") -var reattachRunDir = flag.String("zktest.rundir", "", "internal flag used for testing") -var reattachAbnormalStop = flag.Bool("zktest.stop", false, "internal flag used for testing") - -// This is the reentrancy point for testing ZooKeeper servers -// started by processes that are not direct children of the -// testing process. This test always succeeds - the status -// will be written to stdout and read by indirectServer. -func TestStartNonChildServer(t *testing.T) { - if !*reattach { - // not re-entrant, so ignore this test. - return - } - err := startServer(*reattachRunDir, *reattachAbnormalStop) - if err != nil { - fmt.Printf("zktest:error:%v\n", err) - return - } - fmt.Printf("zktest:done\n") -} - -func (s *S) startServer(c *C, abort bool) { - err := startServer(s.zkTestRoot, abort) - c.Assert(err, IsNil) -} - -// startServerIndirect starts a ZooKeeper server that is not -// a direct child of the current process. If abort is true, -// the server will be started and then terminated abnormally. -func (s *S) startServerIndirect(c *C, abort bool) { - if len(os.Args) == 0 { - c.Fatal("Cannot find self executable name") - } - cmd := exec.Command( - os.Args[0], - "-zktest.reattach", - "-zktest.rundir", s.zkTestRoot, - "-zktest.stop="+fmt.Sprint(abort), - "-test.run", "StartNonChildServer", - ) - r, err := cmd.StdoutPipe() - c.Assert(err, IsNil) - defer r.Close() - cmd.Stderr = cmd.Stdout - if err := cmd.Start(); err != nil { - c.Fatalf("cannot start re-entrant gotest process: %v", err) - } - defer cmd.Wait() - bio := bufio.NewReader(r) - done := false - for { - line, err := bio.ReadSlice('\n') - if err != nil { - if !done { - c.Fatalf("indirect server status line not found: %v", err) - } - return - } - if line[len(line)-1] == '\n' { - line = line[0 : len(line)-1] - } - s := string(line) - switch { - case strings.HasPrefix(s, "zktest:error:"): - c.Fatalf("indirect server error: %s", s[len("error:"):]) - case s == "zktest:done": - done = true - default: - // Log output that doesn't match what we're expecting - it - // can be informative. - c.Logf("subcommand: %s", s) - } - } - panic("not reached") -} - -// startServer starts a ZooKeeper server, and terminates it abnormally -// if abort is true. -func startServer(runDir string, abort bool) error { - srv, err := zk.AttachServer(runDir) - if err != nil { - return fmt.Errorf("cannot attach to server at %q: %v", runDir, err) - } - if err := srv.Start(); err != nil { - return fmt.Errorf("cannot start server: %v", err) - } - if abort { - // Give it time to start up, then kill the server process abnormally, - // leaving the pid.txt file behind. - time.Sleep(0.5e9) - p, err := srv.Process() - if err != nil { - return fmt.Errorf("cannot get server process: %v", err) - } - defer p.Release() - if err := p.Kill(); err != nil { - return fmt.Errorf("cannot kill server process: %v", err) - } - } - return nil -} - -func (s *S) checkCookie(c *C) { - conn, _ := s.init(c) - cookie, _, err := conn.Get("/testAttachCookie") - c.Assert(err, IsNil) - c.Assert(cookie, Equals, "testAttachCookie") - conn.Close() -} - -// cases to test: -// child server, stopped normally; reattach, start -// non-direct child server, killed abnormally; reattach, start (->error), remove pid.txt; start -// non-direct child server, still running; reattach, start (->error), stop, start -// child server, still running; reattach, start (-> error) -// child server, still running; reattach, stop, start. -// non-direct child server, still running; reattach, stop, start. -func (s *S) TestAttachServer(c *C) { - // Create a cookie so that we know we are reattaching to the same instance. - conn, _ := s.init(c) - _, err := conn.Create("/testAttachCookie", "testAttachCookie", 0, zk.WorldACL(zk.PERM_ALL)) - c.Assert(err, IsNil) - s.checkCookie(c) - s.zkServer.Stop() - s.zkServer = nil - - s.testAttachServer(c, (*S).startServer) - s.testAttachServer(c, (*S).startServerIndirect) - s.testAttachServerAbnormalTerminate(c, (*S).startServer) - s.testAttachServerAbnormalTerminate(c, (*S).startServerIndirect) - - srv, err := zk.AttachServer(s.zkTestRoot) - c.Assert(err, IsNil) - - s.zkServer = srv - err = s.zkServer.Start() - c.Assert(err, IsNil) - - conn, _ = s.init(c) - err = conn.Delete("/testAttachCookie", -1) - c.Assert(err, IsNil) -} - -func (s *S) testAttachServer(c *C, start func(*S, *C, bool)) { - start(s, c, false) - - s.checkCookie(c) - - // try attaching to it while it is still running - it should fail. - srv, err := zk.AttachServer(s.zkTestRoot) - c.Assert(err, IsNil) - - err = srv.Start() - c.Assert(err, NotNil) - - // stop it and then start it again - it should succeed. - err = srv.Stop() - c.Assert(err, IsNil) - - err = srv.Start() - c.Assert(err, IsNil) - - s.checkCookie(c) - - err = srv.Stop() - c.Assert(err, IsNil) -} - -func (s *S) testAttachServerAbnormalTerminate(c *C, start func(*S, *C, bool)) { - start(s, c, true) - - // try attaching to it and starting - it should fail, because pid.txt - // won't have been removed. - srv, err := zk.AttachServer(s.zkTestRoot) - c.Assert(err, IsNil) - err = srv.Start() - c.Assert(err, NotNil) - - // stopping it should bring things back to normal. - err = srv.Stop() - c.Assert(err, IsNil) - err = srv.Start() - c.Assert(err, IsNil) - - s.checkCookie(c) - err = srv.Stop() - c.Assert(err, IsNil) -} - -func (s *S) TestCreateServer(c *C) { - dir := c.MkDir() - - zkdir := dir + "/zk" - // Check that it creates the new directory. - srv, err := zk.CreateServer(9999, zkdir, "") - c.Assert(err, IsNil) - c.Assert(srv, NotNil) - - info, err := os.Stat(zkdir) - c.Assert(err, IsNil) - c.Assert(info.IsDir(), Equals, true) - - addr, err := srv.Addr() - c.Assert(err, IsNil) - c.Assert(addr, Equals, "127.0.0.1:9999") - - // Check that it fails when called again on the non-empty directory. - _, err = zk.CreateServer(9999, zkdir, "") - c.Assert(err, ErrorMatches, `server directory .* is not empty`) - - // Check that Destroy removes the directory. - err = srv.Destroy() - c.Assert(err, IsNil) - - _, err = os.Stat(zkdir) - if !os.IsNotExist(err) { - c.Errorf("expected not-exists error, got %v", err) - } - - // Check that we can call CreateServer on the empty directory - srv, err = zk.CreateServer(8888, dir, "") - c.Assert(err, IsNil) - c.Assert(srv, NotNil) - - addr, err = srv.Addr() - c.Assert(err, IsNil) - c.Assert(addr, Equals, "127.0.0.1:8888") - - err = srv.Destroy() - c.Assert(err, IsNil) -} diff --git a/vendor/launchpad.net/gozk/zookeeper/suite_test.go b/vendor/launchpad.net/gozk/zookeeper/suite_test.go deleted file mode 100644 index 0236c397fc7..00000000000 --- a/vendor/launchpad.net/gozk/zookeeper/suite_test.go +++ /dev/null @@ -1,129 +0,0 @@ -package zookeeper_test - -import ( - "fmt" - . "launchpad.net/gocheck" - zk "launchpad.net/gozk/zookeeper" - "os" - "testing" - "time" -) - -func TestAll(t *testing.T) { - TestingT(t) -} - -var _ = Suite(&S{}) - -type S struct { - zkServer *zk.Server - zkTestRoot string - zkTestPort int - zkProcess *os.Process // The running ZooKeeper process - zkAddr string - - handles []*zk.Conn - events []*zk.Event - liveWatches int - deadWatches chan bool -} - -var logLevel = 0 //zk.LOG_ERROR - -func (s *S) init(c *C) (*zk.Conn, chan zk.Event) { - c.Logf("init dialling %q", s.zkAddr) - conn, watch, err := zk.Dial(s.zkAddr, 5e9) - c.Assert(err, IsNil) - s.handles = append(s.handles, conn) - bufferedWatch := make(chan zk.Event, 256) - - select { - case e, ok := <-watch: - c.Assert(ok, Equals, true) - c.Assert(e.Type, Equals, zk.EVENT_SESSION) - c.Assert(e.State, Equals, zk.STATE_CONNECTED) - bufferedWatch <- e - case <-time.After(5e9): - c.Fatalf("timeout dialling zookeeper addr %v", s.zkAddr) - } - - s.liveWatches += 1 - go func() { - loop: - for { - select { - case event, ok := <-watch: - if !ok { - close(bufferedWatch) - break loop - } - select { - case bufferedWatch <- event: - default: - panic("Too many events in buffered watch!") - } - } - } - s.deadWatches <- true - }() - - return conn, bufferedWatch -} - -func (s *S) SetUpTest(c *C) { - c.Assert(zk.CountPendingWatches(), Equals, 0, - Commentf("Test got a dirty watch state before running!")) - zk.SetLogLevel(logLevel) -} - -func (s *S) TearDownTest(c *C) { - // Close all handles opened in s.init(). - for _, handle := range s.handles { - handle.Close() - } - - // Wait for all the goroutines created in s.init() to terminate. - for s.liveWatches > 0 { - select { - case <-s.deadWatches: - s.liveWatches -= 1 - case <-time.After(5e9): - panic("There's a locked watch goroutine :-(") - } - } - - // Reset the list of handles. - s.handles = make([]*zk.Conn, 0) - - c.Assert(zk.CountPendingWatches(), Equals, 0, - Commentf("Test left live watches behind!")) -} - -// We use the suite set up and tear down to manage a custom ZooKeeper -// -func (s *S) SetUpSuite(c *C) { - var err error - s.deadWatches = make(chan bool) - - // N.B. We need to create a subdirectory because zk.CreateServer - // insists on creating its own directory. - - s.zkTestRoot = c.MkDir() + "/zk" - port := 21812 - s.zkAddr = fmt.Sprint("localhost:", port) - - s.zkServer, err = zk.CreateServer(port, s.zkTestRoot, "") - if err != nil { - c.Fatal("Cannot set up server environment: ", err) - } - err = s.zkServer.Start() - if err != nil { - c.Fatal("Cannot start ZooKeeper server: ", err) - } -} - -func (s *S) TearDownSuite(c *C) { - if s.zkServer != nil { - s.zkServer.Destroy() - } -} diff --git a/vendor/launchpad.net/gozk/zookeeper/zk.go b/vendor/launchpad.net/gozk/zookeeper/zk.go deleted file mode 100644 index 8f5fcfee428..00000000000 --- a/vendor/launchpad.net/gozk/zookeeper/zk.go +++ /dev/null @@ -1,1126 +0,0 @@ -// gozk - ZooKeeper support for the Go language -// -// https://wiki.ubuntu.com/gozk -// -// Copyright (c) 2010-2011 Canonical Ltd. -// -// Written by Gustavo Niemeyer -// -package zookeeper - -/* -#cgo CFLAGS: -I/usr/include/c-client-src -I/usr/include/zookeeper -#cgo LDFLAGS: -lzookeeper_mt - -#include -#include "helpers.h" -*/ -import "C" - -import ( - "fmt" - "sync" - "time" - "unsafe" -) - -// ----------------------------------------------------------------------- -// Main constants and data types. - -// Conn represents a connection to a set of ZooKeeper nodes. -type Conn struct { - watchChannels map[uintptr]chan Event - sessionWatchId uintptr - handle *C.zhandle_t - mutex sync.RWMutex -} - -// ClientId represents an established ZooKeeper session. It can be -// passed into Redial to reestablish a connection to an existing session. -type ClientId struct { - cId C.clientid_t -} - -// ACL represents one access control list element, providing the permissions -// (one of PERM_*), the scheme ("digest", etc), and the id (scheme-dependent) -// for the access control mechanism in ZooKeeper. -type ACL struct { - Perms uint32 - Scheme string - Id string -} - -// Event channels are used to provide notifications of changes in the -// ZooKeeper connection state and in specific node aspects. -// -// There are two sources of events: the session channel obtained during -// initialization with Init, and any watch channels obtained -// through one of the W-suffixed functions (GetW, ExistsW, etc). -// -// The session channel will only receive session-level events notifying -// about critical and transient changes in the ZooKeeper connection -// state (STATE_CONNECTED, STATE_EXPIRED_SESSION, etc). On long -// running applications the session channel must *necessarily* be -// observed since certain events like session expirations require an -// explicit reconnection and reestablishment of state (or bailing out). -// Because of that, the buffer used on the session channel has a limited -// size, and a panic will occur if too many events are not collected. -// -// Watch channels enable monitoring state for nodes, and the -// moment they're fired depends on which function was called to -// create them. Note that, unlike in other ZooKeeper interfaces, -// gozk will NOT dispatch unimportant session events such as -// STATE_ASSOCIATING, STATE_CONNECTING and STATE_CONNECTED to -// watch Event channels, since they are transient and disruptive -// to the workflow. Critical state changes such as expirations -// are still delivered to all event channels, though, and the -// transient events may be obsererved in the session channel. -// -// Since every watch channel may receive critical session events, events -// received must not be handled blindly as if the watch requested has -// been fired. To facilitate such tests, Events offer the Ok method, -// and they also have a good String method so they may be used as an -// os.Error value if wanted. E.g.: -// -// event := <-watch -// if !event.Ok() { -// err = event -// return -// } -// -// Note that closed channels will deliver zeroed Event, which means -// event.Type is set to EVENT_CLOSED and event.State is set to STATE_CLOSED, -// to facilitate handling. -type Event struct { - Type int // One of the EVENT_* constants. - Path string // For non-session events, the path of the watched node. - State int // One of the STATE_* constants. -} - -// Error represents a ZooKeeper error. -type Error struct { - Op string - Code ErrorCode - // SystemError holds an error if Code is ZSYSTEMERROR. - SystemError error - Path string -} - -func (e *Error) Error() string { - s := e.Code.String() - if e.Code == ZSYSTEMERROR && e.SystemError != nil { - s = e.SystemError.Error() - } - if e.Path == "" { - return fmt.Sprintf("zookeeper: %s: %v", e.Op, s) - } - return fmt.Sprintf("zookeeper: %s %q: %v", e.Op, e.Path, s) -} - -// IsError returns whether the error is a *Error -// with the given error code. -func IsError(err error, code ErrorCode) bool { - if err, _ := err.(*Error); err != nil { - return err.Code == code - } - return false -} - -// ErrorCode represents a kind of ZooKeeper error. -type ErrorCode int - -const ( - ZOK ErrorCode = C.ZOK - ZSYSTEMERROR ErrorCode = C.ZSYSTEMERROR - ZRUNTIMEINCONSISTENCY ErrorCode = C.ZRUNTIMEINCONSISTENCY - ZDATAINCONSISTENCY ErrorCode = C.ZDATAINCONSISTENCY - ZCONNECTIONLOSS ErrorCode = C.ZCONNECTIONLOSS - ZMARSHALLINGERROR ErrorCode = C.ZMARSHALLINGERROR - ZUNIMPLEMENTED ErrorCode = C.ZUNIMPLEMENTED - ZOPERATIONTIMEOUT ErrorCode = C.ZOPERATIONTIMEOUT - ZBADARGUMENTS ErrorCode = C.ZBADARGUMENTS - ZINVALIDSTATE ErrorCode = C.ZINVALIDSTATE - ZAPIERROR ErrorCode = C.ZAPIERROR - ZNONODE ErrorCode = C.ZNONODE - ZNOAUTH ErrorCode = C.ZNOAUTH - ZBADVERSION ErrorCode = C.ZBADVERSION - ZNOCHILDRENFOREPHEMERALS ErrorCode = C.ZNOCHILDRENFOREPHEMERALS - ZNODEEXISTS ErrorCode = C.ZNODEEXISTS - ZNOTEMPTY ErrorCode = C.ZNOTEMPTY - ZSESSIONEXPIRED ErrorCode = C.ZSESSIONEXPIRED - ZINVALIDCALLBACK ErrorCode = C.ZINVALIDCALLBACK - ZINVALIDACL ErrorCode = C.ZINVALIDACL - ZAUTHFAILED ErrorCode = C.ZAUTHFAILED - ZCLOSING ErrorCode = C.ZCLOSING - ZNOTHING ErrorCode = C.ZNOTHING - ZSESSIONMOVED ErrorCode = C.ZSESSIONMOVED -) - -func (code ErrorCode) String() string { - return C.GoString(C.zerror(C.int(code))) // Static, no need to free it. -} - -// zkError creates an appropriate error return from -// a ZooKeeper status and the errno return from a C API -// call. -func zkError(rc C.int, cerr error, op, path string) error { - code := ErrorCode(rc) - if code == ZOK { - return nil - } - err := &Error{ - Op: op, - Code: code, - Path: path, - } - if code == ZSYSTEMERROR { - err.SystemError = cerr - } - return err -} - -func closingError(op, path string) error { - return zkError(C.int(ZCLOSING), nil, op, path) -} - -// Constants for SetLogLevel. -const ( - LOG_ERROR = C.ZOO_LOG_LEVEL_ERROR - LOG_WARN = C.ZOO_LOG_LEVEL_WARN - LOG_INFO = C.ZOO_LOG_LEVEL_INFO - LOG_DEBUG = C.ZOO_LOG_LEVEL_DEBUG -) - -// These are defined as extern. To avoid having to declare them as -// variables here they are inlined, and correctness is ensured on -// init(). - -// Constants for Create's flags parameter. -const ( - EPHEMERAL = 1 << iota - SEQUENCE -) - -// Constants for ACL Perms. -const ( - PERM_READ = 1 << iota - PERM_WRITE - PERM_CREATE - PERM_DELETE - PERM_ADMIN - PERM_ALL = 0x1f -) - -// Constants for Event Type. -const ( - EVENT_CREATED = iota + 1 - EVENT_DELETED - EVENT_CHANGED - EVENT_CHILD - EVENT_SESSION = -1 - EVENT_NOTWATCHING = -2 - - // Doesn't really exist in zk, but handy for use in zeroed Event - // values (e.g. closed channels). - EVENT_CLOSED = 0 -) - -// Constants for Event State. -const ( - STATE_EXPIRED_SESSION = -112 - STATE_AUTH_FAILED = -113 - STATE_CONNECTING = 1 - STATE_ASSOCIATING = 2 - STATE_CONNECTED = 3 - - // Doesn't really exist in zk, but handy for use in zeroed Event - // values (e.g. closed channels). - STATE_CLOSED = 0 -) - -func init() { - if EPHEMERAL != C.ZOO_EPHEMERAL || - SEQUENCE != C.ZOO_SEQUENCE || - PERM_READ != C.ZOO_PERM_READ || - PERM_WRITE != C.ZOO_PERM_WRITE || - PERM_CREATE != C.ZOO_PERM_CREATE || - PERM_DELETE != C.ZOO_PERM_DELETE || - PERM_ADMIN != C.ZOO_PERM_ADMIN || - PERM_ALL != C.ZOO_PERM_ALL || - EVENT_CREATED != C.ZOO_CREATED_EVENT || - EVENT_DELETED != C.ZOO_DELETED_EVENT || - EVENT_CHANGED != C.ZOO_CHANGED_EVENT || - EVENT_CHILD != C.ZOO_CHILD_EVENT || - EVENT_SESSION != C.ZOO_SESSION_EVENT || - EVENT_NOTWATCHING != C.ZOO_NOTWATCHING_EVENT || - STATE_EXPIRED_SESSION != C.ZOO_EXPIRED_SESSION_STATE || - STATE_AUTH_FAILED != C.ZOO_AUTH_FAILED_STATE || - STATE_CONNECTING != C.ZOO_CONNECTING_STATE || - STATE_ASSOCIATING != C.ZOO_ASSOCIATING_STATE || - STATE_CONNECTED != C.ZOO_CONNECTED_STATE { - - panic("OOPS: Constants don't match C counterparts") - } - SetLogLevel(0) -} - -// AuthACL produces an ACL list containing a single ACL which uses -// the provided permissions, with the scheme "auth", and ID "", which -// is used by ZooKeeper to represent any authenticated user. -func AuthACL(perms uint32) []ACL { - return []ACL{{perms, "auth", ""}} -} - -// WorldACL produces an ACL list containing a single ACL which uses -// the provided permissions, with the scheme "world", and ID "anyone", -// which is used by ZooKeeper to represent any user at all. -func WorldACL(perms uint32) []ACL { - return []ACL{{perms, "world", "anyone"}} -} - -// ----------------------------------------------------------------------- -// Event methods. - -// Ok returns true in case the event reports zk as being in a usable state. -func (e Event) Ok() bool { - // That's really it for now. Anything else seems to mean zk - // can't be used at the moment. - return e.State == STATE_CONNECTED -} - -func (e Event) String() (s string) { - switch e.State { - case STATE_EXPIRED_SESSION: - s = "ZooKeeper session expired" - case STATE_AUTH_FAILED: - s = "ZooKeeper authentication failed" - case STATE_CONNECTING: - s = "ZooKeeper connecting" - case STATE_ASSOCIATING: - s = "ZooKeeper still associating" - case STATE_CONNECTED: - s = "ZooKeeper connected" - case STATE_CLOSED: - s = "ZooKeeper connection closed" - default: - s = fmt.Sprintf("unknown ZooKeeper state %d", e.State) - } - if e.Type == -1 || e.Type == EVENT_SESSION { - return - } - if s != "" { - s += "; " - } - switch e.Type { - case EVENT_CREATED: - s += "path created: " - case EVENT_DELETED: - s += "path deleted: " - case EVENT_CHANGED: - s += "path changed: " - case EVENT_CHILD: - s += "path children changed: " - case EVENT_NOTWATCHING: - s += "not watching: " // !? - case EVENT_SESSION: - // nothing - } - s += e.Path - return -} - -// ----------------------------------------------------------------------- - -// Stat contains detailed information about a node. -type Stat struct { - c C.struct_Stat -} - -// Czxid returns the zxid of the change that caused the node to be created. -func (stat *Stat) Czxid() int64 { - return int64(stat.c.czxid) -} - -// Mzxid returns the zxid of the change that last modified the node. -func (stat *Stat) Mzxid() int64 { - return int64(stat.c.mzxid) -} - -func millisec2time(ms int64) time.Time { - return time.Unix(ms/1e3, ms%1e3*1e6) -} - -// CTime returns the time (at millisecond resolution) when the node was -// created. -func (stat *Stat) CTime() time.Time { - return millisec2time(int64(stat.c.ctime)) -} - -// MTime returns the time (at millisecond resolution) when the node was -// last modified. -func (stat *Stat) MTime() time.Time { - return millisec2time(int64(stat.c.mtime)) -} - -// Version returns the number of changes to the data of the node. -func (stat *Stat) Version() int { - return int(stat.c.version) -} - -// CVersion returns the number of changes to the children of the node. -// This only changes when children are created or removed. -func (stat *Stat) CVersion() int { - return int(stat.c.cversion) -} - -// AVersion returns the number of changes to the ACL of the node. -func (stat *Stat) AVersion() int { - return int(stat.c.aversion) -} - -// If the node is an ephemeral node, EphemeralOwner returns the session id -// of the owner of the node; otherwise it will return zero. -func (stat *Stat) EphemeralOwner() int64 { - return int64(stat.c.ephemeralOwner) -} - -// DataLength returns the length of the data in the node in bytes. -func (stat *Stat) DataLength() int { - return int(stat.c.dataLength) -} - -// NumChildren returns the number of children of the node. -func (stat *Stat) NumChildren() int { - return int(stat.c.numChildren) -} - -// Pzxid returns the Pzxid of the node, whatever that is. -func (stat *Stat) Pzxid() int64 { - return int64(stat.c.pzxid) -} - -// ----------------------------------------------------------------------- -// Functions and methods related to ZooKeeper itself. - -const bufferSize = 1024 * 1024 - -// SetLogLevel changes the minimum level of logging output generated -// to adjust the amount of information provided. -func SetLogLevel(level int) { - C.zoo_set_debug_level(C.ZooLogLevel(level)) -} - -// Dial initializes the communication with a ZooKeeper cluster. The provided -// servers parameter may include multiple server addresses, separated -// by commas, so that the client will automatically attempt to connect -// to another server if one of them stops working for whatever reason. -// -// The recvTimeout parameter, given in nanoseconds, allows controlling -// the amount of time the connection can stay unresponsive before the -// server will be considered problematic. -// -// Session establishment is asynchronous, meaning that this function -// will return before the communication with ZooKeeper is fully established. -// The watch channel receives events of type SESSION_EVENT when any change -// to the state of the established connection happens. See the documentation -// for the Event type for more details. -func Dial(servers string, recvTimeout time.Duration) (*Conn, <-chan Event, error) { - return dial(servers, recvTimeout, nil) -} - -// Redial is equivalent to Dial, but attempts to reestablish an existing session -// identified via the clientId parameter. -func Redial(servers string, recvTimeout time.Duration, clientId *ClientId) (*Conn, <-chan Event, error) { - return dial(servers, recvTimeout, clientId) -} - -func dial(servers string, recvTimeout time.Duration, clientId *ClientId) (*Conn, <-chan Event, error) { - conn := &Conn{} - conn.watchChannels = make(map[uintptr]chan Event) - - var cId *C.clientid_t - if clientId != nil { - cId = &clientId.cId - } - - watchId, watchChannel := conn.createWatch(true) - conn.sessionWatchId = watchId - - cservers := C.CString(servers) - handle, cerr := C.zookeeper_init_int(cservers, C.watch_handler, C.int(recvTimeout/1e6), cId, C.ulong(watchId), 0) - C.free(unsafe.Pointer(cservers)) - if handle == nil { - conn.closeAllWatches() - return nil, nil, zkError(C.int(ZSYSTEMERROR), cerr, "dial", "") - } - conn.handle = handle - runWatchLoop() - return conn, watchChannel, nil -} - -// ClientId returns the client ID for the existing session with ZooKeeper. -// This is useful to reestablish an existing session via ReInit. -func (conn *Conn) ClientId() *ClientId { - conn.mutex.RLock() - defer conn.mutex.RUnlock() - return &ClientId{*C.zoo_client_id(conn.handle)} -} - -// Close terminates the ZooKeeper interaction. -func (conn *Conn) Close() error { - - // Protect from concurrency around conn.handle change. - conn.mutex.Lock() - defer conn.mutex.Unlock() - - if conn.handle == nil { - // ZooKeeper may hang indefinitely if a handler is closed twice, - // so we get in the way and prevent it from happening. - return closingError("close", "") - } - rc, cerr := C.zookeeper_close(conn.handle) - - conn.closeAllWatches() - stopWatchLoop() - - // At this point, nothing else should need conn.handle. - conn.handle = nil - - return zkError(rc, cerr, "close", "") -} - -// Get returns the data and status from an existing node. err will be nil, -// unless an error is found. Attempting to retrieve data from a non-existing -// node is an error. -func (conn *Conn) Get(path string) (data string, stat *Stat, err error) { - conn.mutex.RLock() - defer conn.mutex.RUnlock() - if conn.handle == nil { - return "", nil, closingError("get", path) - } - - cpath := C.CString(path) - cbuffer := (*C.char)(C.malloc(bufferSize)) - cbufferLen := C.int(bufferSize) - defer C.free(unsafe.Pointer(cpath)) - defer C.free(unsafe.Pointer(cbuffer)) - - var cstat Stat - rc, cerr := C.zoo_wget(conn.handle, cpath, nil, nil, cbuffer, &cbufferLen, &cstat.c) - if rc != C.ZOK { - return "", nil, zkError(rc, cerr, "get", path) - } - - result := C.GoStringN(cbuffer, cbufferLen) - return result, &cstat, nil -} - -// GetW works like Get but also returns a channel that will receive -// a single Event value when the data or existence of the given ZooKeeper -// node changes or when critical session events happen. See the -// documentation of the Event type for more details. -func (conn *Conn) GetW(path string) (data string, stat *Stat, watch <-chan Event, err error) { - conn.mutex.RLock() - defer conn.mutex.RUnlock() - if conn.handle == nil { - return "", nil, nil, closingError("getw", path) - } - - cpath := C.CString(path) - cbuffer := (*C.char)(C.malloc(bufferSize)) - cbufferLen := C.int(bufferSize) - defer C.free(unsafe.Pointer(cpath)) - defer C.free(unsafe.Pointer(cbuffer)) - - watchId, watchChannel := conn.createWatch(true) - - var cstat Stat - rc, cerr := C.zoo_wget_int(conn.handle, cpath, C.watch_handler, C.ulong(watchId), cbuffer, &cbufferLen, &cstat.c) - if rc != C.ZOK { - conn.forgetWatch(watchId) - return "", nil, nil, zkError(rc, cerr, "getw", path) - } - - result := C.GoStringN(cbuffer, cbufferLen) - return result, &cstat, watchChannel, nil -} - -// Children returns the children list and status from an existing node. -// Attempting to retrieve the children list from a non-existent node is an error. -func (conn *Conn) Children(path string) (children []string, stat *Stat, err error) { - conn.mutex.RLock() - defer conn.mutex.RUnlock() - if conn.handle == nil { - return nil, nil, closingError("children", path) - } - - cpath := C.CString(path) - defer C.free(unsafe.Pointer(cpath)) - - cvector := C.struct_String_vector{} - var cstat Stat - rc, cerr := C.zoo_wget_children2(conn.handle, cpath, nil, nil, &cvector, &cstat.c) - - // Can't happen if rc != 0, but avoid potential memory leaks in the future. - if cvector.count != 0 { - children = parseStringVector(&cvector) - } - if rc == C.ZOK { - stat = &cstat - } else { - err = zkError(rc, cerr, "children", path) - } - return -} - -// ChildrenW works like Children but also returns a channel that will -// receive a single Event value when a node is added or removed under the -// provided path or when critical session events happen. See the documentation -// of the Event type for more details. -func (conn *Conn) ChildrenW(path string) (children []string, stat *Stat, watch <-chan Event, err error) { - conn.mutex.RLock() - defer conn.mutex.RUnlock() - if conn.handle == nil { - return nil, nil, nil, closingError("childrenw", path) - } - - cpath := C.CString(path) - defer C.free(unsafe.Pointer(cpath)) - - watchId, watchChannel := conn.createWatch(true) - - cvector := C.struct_String_vector{} - var cstat Stat - rc, cerr := C.zoo_wget_children2_int(conn.handle, cpath, C.watch_handler, C.ulong(watchId), &cvector, &cstat.c) - - // Can't happen if rc != 0, but avoid potential memory leaks in the future. - if cvector.count != 0 { - children = parseStringVector(&cvector) - } - if rc == C.ZOK { - stat = &cstat - watch = watchChannel - } else { - conn.forgetWatch(watchId) - err = zkError(rc, cerr, "childrenw", path) - } - return -} - -func parseStringVector(cvector *C.struct_String_vector) []string { - vector := make([]string, cvector.count) - dataStart := uintptr(unsafe.Pointer(cvector.data)) - uintptrSize := unsafe.Sizeof(dataStart) - for i := 0; i != len(vector); i++ { - cpathPos := dataStart + uintptr(i)*uintptrSize - cpath := *(**C.char)(unsafe.Pointer(cpathPos)) - vector[i] = C.GoString(cpath) - } - C.deallocate_String_vector(cvector) - return vector -} - -// Exists checks if a node exists at the given path. If it does, -// stat will contain meta information on the existing node, otherwise -// it will be nil. -func (conn *Conn) Exists(path string) (stat *Stat, err error) { - conn.mutex.RLock() - defer conn.mutex.RUnlock() - if conn.handle == nil { - return nil, closingError("exists", path) - } - - cpath := C.CString(path) - defer C.free(unsafe.Pointer(cpath)) - - var cstat Stat - rc, cerr := C.zoo_wexists(conn.handle, cpath, nil, nil, &cstat.c) - - // We diverge a bit from the usual here: a ZNONODE is not an error - // for an exists call, otherwise every Exists call would have to check - // for err != nil and err.Code() != ZNONODE. - if rc == C.ZOK { - stat = &cstat - } else if rc != C.ZNONODE { - err = zkError(rc, cerr, "exists", path) - } - return -} - -// ExistsW works like Exists but also returns a channel that will -// receive an Event value when a node is created in case the returned -// stat is nil and the node didn't exist, or when the existing node -// is removed. It will also receive critical session events. See the -// documentation of the Event type for more details. -func (conn *Conn) ExistsW(path string) (stat *Stat, watch <-chan Event, err error) { - conn.mutex.RLock() - defer conn.mutex.RUnlock() - if conn.handle == nil { - return nil, nil, closingError("existsw", path) - } - - cpath := C.CString(path) - defer C.free(unsafe.Pointer(cpath)) - - watchId, watchChannel := conn.createWatch(true) - - var cstat Stat - rc, cerr := C.zoo_wexists_int(conn.handle, cpath, C.watch_handler, C.ulong(watchId), &cstat.c) - - // We diverge a bit from the usual here: a ZNONODE is not an error - // for an exists call, otherwise every Exists call would have to check - // for err != nil and err.Code() != ZNONODE. - switch ErrorCode(rc) { - case ZOK: - stat = &cstat - watch = watchChannel - case ZNONODE: - watch = watchChannel - default: - conn.forgetWatch(watchId) - err = zkError(rc, cerr, "existsw", path) - } - return -} - -// Create creates a node at the given path with the given data. The -// provided flags may determine features such as whether the node is -// ephemeral or not, or whether it should have a sequence number -// attached to it, and the provided ACLs will determine who can access -// the node and under which circumstances. -// -// The returned path is useful in cases where the created path may differ -// from the requested one, such as when a sequence number is appended -// to it due to the use of the gozk.SEQUENCE flag. -func (conn *Conn) Create(path, value string, flags int, aclv []ACL) (pathCreated string, err error) { - conn.mutex.RLock() - defer conn.mutex.RUnlock() - if conn.handle == nil { - return "", closingError("close", path) - } - - cpath := C.CString(path) - cvalue := C.CString(value) - defer C.free(unsafe.Pointer(cpath)) - defer C.free(unsafe.Pointer(cvalue)) - - caclv := buildACLVector(aclv) - defer C.deallocate_ACL_vector(caclv) - - // Allocate additional space for the sequence (10 bytes should be enough). - cpathLen := C.size_t(len(path) + 32) - cpathCreated := (*C.char)(C.malloc(cpathLen)) - defer C.free(unsafe.Pointer(cpathCreated)) - - rc, cerr := C.zoo_create(conn.handle, cpath, cvalue, C.int(len(value)), caclv, C.int(flags), cpathCreated, C.int(cpathLen)) - if rc == C.ZOK { - pathCreated = C.GoString(cpathCreated) - } else { - err = zkError(rc, cerr, "create", path) - } - return -} - -// Set modifies the data for the existing node at the given path, replacing it -// by the provided value. If version is not -1, the operation will only -// succeed if the node is still at the given version when the replacement -// happens as an atomic operation. The returned Stat value will contain -// data for the resulting node, after the operation is performed. -// -// It is an error to attempt to set the data of a non-existing node with -// this function. In these cases, use Create instead. -func (conn *Conn) Set(path, value string, version int) (stat *Stat, err error) { - conn.mutex.RLock() - defer conn.mutex.RUnlock() - if conn.handle == nil { - return nil, closingError("set", path) - } - - cpath := C.CString(path) - cvalue := C.CString(value) - defer C.free(unsafe.Pointer(cpath)) - defer C.free(unsafe.Pointer(cvalue)) - - var cstat Stat - rc, cerr := C.zoo_set2(conn.handle, cpath, cvalue, C.int(len(value)), C.int(version), &cstat.c) - if rc == C.ZOK { - stat = &cstat - } else { - err = zkError(rc, cerr, "set", path) - } - return -} - -// Delete removes the node at path. If version is not -1, the operation -// will only succeed if the node is still at this version when the -// node is deleted as an atomic operation. -func (conn *Conn) Delete(path string, version int) (err error) { - conn.mutex.RLock() - defer conn.mutex.RUnlock() - if conn.handle == nil { - return closingError("delete", path) - } - - cpath := C.CString(path) - defer C.free(unsafe.Pointer(cpath)) - rc, cerr := C.zoo_delete(conn.handle, cpath, C.int(version)) - return zkError(rc, cerr, "delete", path) -} - -// AddAuth adds a new authentication certificate to the ZooKeeper -// interaction. The scheme parameter will specify how to handle the -// authentication information, while the cert parameter provides the -// identity data itself. For instance, the "digest" scheme requires -// a pair like "username:password" to be provided as the certificate. -func (conn *Conn) AddAuth(scheme, cert string) error { - conn.mutex.RLock() - defer conn.mutex.RUnlock() - if conn.handle == nil { - return closingError("addauth", "") - } - - cscheme := C.CString(scheme) - ccert := C.CString(cert) - defer C.free(unsafe.Pointer(cscheme)) - defer C.free(unsafe.Pointer(ccert)) - - data := C.create_completion_data() - if data == nil { - panic("Failed to create completion data") - } - defer C.destroy_completion_data(data) - - rc, cerr := C.zoo_add_auth(conn.handle, cscheme, ccert, C.int(len(cert)), C.handle_void_completion, unsafe.Pointer(data)) - if rc != C.ZOK { - return zkError(rc, cerr, "addauth", "") - } - - C.wait_for_completion(data) - - rc = C.int(uintptr(data.data)) - return zkError(rc, nil, "addauth", "") -} - -// ACL returns the access control list for path. -func (conn *Conn) ACL(path string) ([]ACL, *Stat, error) { - conn.mutex.RLock() - defer conn.mutex.RUnlock() - if conn.handle == nil { - return nil, nil, closingError("acl", path) - } - - cpath := C.CString(path) - defer C.free(unsafe.Pointer(cpath)) - - caclv := C.struct_ACL_vector{} - - var cstat Stat - rc, cerr := C.zoo_get_acl(conn.handle, cpath, &caclv, &cstat.c) - if rc != C.ZOK { - return nil, nil, zkError(rc, cerr, "acl", path) - } - - aclv := parseACLVector(&caclv) - - return aclv, &cstat, nil -} - -// SetACL changes the access control list for path. -func (conn *Conn) SetACL(path string, aclv []ACL, version int) error { - conn.mutex.RLock() - defer conn.mutex.RUnlock() - if conn.handle == nil { - return closingError("setacl", path) - } - - cpath := C.CString(path) - defer C.free(unsafe.Pointer(cpath)) - - caclv := buildACLVector(aclv) - defer C.deallocate_ACL_vector(caclv) - - rc, cerr := C.zoo_set_acl(conn.handle, cpath, C.int(version), caclv) - return zkError(rc, cerr, "setacl", path) -} - -func parseACLVector(caclv *C.struct_ACL_vector) []ACL { - structACLSize := unsafe.Sizeof(C.struct_ACL{}) - aclv := make([]ACL, caclv.count) - dataStart := uintptr(unsafe.Pointer(caclv.data)) - for i := 0; i != int(caclv.count); i++ { - caclPos := dataStart + uintptr(i)*structACLSize - cacl := (*C.struct_ACL)(unsafe.Pointer(caclPos)) - - acl := &aclv[i] - acl.Perms = uint32(cacl.perms) - acl.Scheme = C.GoString(cacl.id.scheme) - acl.Id = C.GoString(cacl.id.id) - } - C.deallocate_ACL_vector(caclv) - - return aclv -} - -func buildACLVector(aclv []ACL) *C.struct_ACL_vector { - structACLSize := unsafe.Sizeof(C.struct_ACL{}) - data := C.calloc(C.size_t(len(aclv)), C.size_t(structACLSize)) - if data == nil { - panic("ACL data allocation failed") - } - - caclv := &C.struct_ACL_vector{} - caclv.data = (*C.struct_ACL)(data) - caclv.count = C.int32_t(len(aclv)) - - dataStart := uintptr(unsafe.Pointer(caclv.data)) - for i, acl := range aclv { - caclPos := dataStart + uintptr(i)*structACLSize - cacl := (*C.struct_ACL)(unsafe.Pointer(caclPos)) - cacl.perms = C.int32_t(acl.Perms) - // C.deallocate_ACL_vector() will also handle deallocation of these. - cacl.id.scheme = C.CString(acl.Scheme) - cacl.id.id = C.CString(acl.Id) - } - - return caclv -} - -// ----------------------------------------------------------------------- -// RetryChange utility method. - -type ChangeFunc func(oldValue string, oldStat *Stat) (newValue string, err error) - -// RetryChange runs changeFunc to attempt to atomically change path -// in a lock free manner, and retries in case there was another -// concurrent change between reading and writing the node. -// -// changeFunc must work correctly if called multiple times in case -// the modification fails due to concurrent changes, and it may return -// an error that will cause the the RetryChange function to stop and -// return the same error. -// -// This mechanism is not suitable for a node that is frequently modified -// concurrently. For those cases, consider using a pessimistic locking -// mechanism. -// -// This is the detailed operation flow for RetryChange: -// -// 1. Attempt to read the node. In case the node exists, but reading it -// fails, stop and return the error found. -// -// 2. Call the changeFunc with the current node value and stat, -// or with an empty string and nil stat, if the node doesn't yet exist. -// If the changeFunc returns an error, stop and return the same error. -// -// 3. If the changeFunc returns no errors, use the string returned as -// the new candidate value for the node, and attempt to either create -// the node, if it didn't exist, or to change its contents at the specified -// version. If this procedure fails due to conflicts (concurrent changes -// in the same node), repeat from step 1. If this procedure fails with any -// other error, stop and return the error found. -// -func (conn *Conn) RetryChange(path string, flags int, acl []ACL, changeFunc ChangeFunc) error { - for { - oldValue, oldStat, err := conn.Get(path) - if err != nil && !IsError(err, ZNONODE) { - return err - } - newValue, err := changeFunc(oldValue, oldStat) - if err != nil { - return err - } - if oldStat == nil { - _, err := conn.Create(path, newValue, flags, acl) - if err == nil || !IsError(err, ZNODEEXISTS) { - return err - } - continue - } - if newValue == oldValue { - return nil // Nothing to do. - } - _, err = conn.Set(path, newValue, oldStat.Version()) - if err == nil || !IsError(err, ZBADVERSION) && !IsError(err, ZNONODE) { - return err - } - } -} - -// ----------------------------------------------------------------------- -// Watching mechanism. - -// The bridging of watches into Go is slightly tricky because Cgo doesn't -// yet provide a nice way to callback from C into a Go routine, so we do -// this by hand. That bridging works the following way: -// -// Whenever a *W method is called, it will return a channel which -// outputs Event values. Internally, a map is used to maintain references -// between an unique integer key (the watchId), and the event channel. The -// watchId is then handed to the C ZooKeeper library as the watch context, -// so that we get it back when events happen. Using an integer key as the -// watch context rather than a pointer is needed because there's no guarantee -// that in the future the GC will not move objects around, and also because -// a strong reference is needed on the Go side so that the channel is not -// garbage-collected. -// -// So, this is what's done to establish the watch. The interesting part -// lies in the other side of this logic, when events actually happen. -// -// Since Cgo doesn't allow calling back into Go, we actually fire a new -// goroutine the very first time Init is called, and allow it to block -// in a pthread condition variable within a C function. This condition -// will only be notified once a ZooKeeper watch callback appends new -// entries to the event list. When this happens, the C function returns -// and we get back into Go land with the pointer to the watch data, -// including the watchId and other event details such as type and path. - -var watchMutex sync.Mutex -var watchConns = make(map[uintptr]*Conn) -var watchCounter uintptr -var watchLoopCounter int - -// CountPendingWatches returns the number of pending watches which have -// not been fired yet, across all ZooKeeper instances. This is useful -// mostly as a debugging and testing aid. -func CountPendingWatches() int { - watchMutex.Lock() - count := len(watchConns) - watchMutex.Unlock() - return count -} - -// createWatch creates and registers a watch, returning the watch id -// and channel. -func (conn *Conn) createWatch(session bool) (watchId uintptr, watchChannel chan Event) { - buf := 1 // session/watch event - if session { - buf = 32 - } - watchChannel = make(chan Event, buf) - watchMutex.Lock() - defer watchMutex.Unlock() - watchId = watchCounter - watchCounter += 1 - conn.watchChannels[watchId] = watchChannel - watchConns[watchId] = conn - return -} - -// forgetWatch cleans resources used by watchId and prevents it -// from ever getting delivered. It shouldn't be used if there's any -// chance the watch channel is still visible and not closed, since -// it might mean a goroutine would be blocked forever. -func (conn *Conn) forgetWatch(watchId uintptr) { - watchMutex.Lock() - defer watchMutex.Unlock() - delete(conn.watchChannels, watchId) - delete(watchConns, watchId) -} - -// closeAllWatches closes all watch channels for conn. -func (conn *Conn) closeAllWatches() { - watchMutex.Lock() - defer watchMutex.Unlock() - for watchId, ch := range conn.watchChannels { - close(ch) - delete(conn.watchChannels, watchId) - delete(watchConns, watchId) - } -} - -// sendEvent delivers the event to the watchId event channel. If the -// event channel is a watch event channel, the event is delivered, -// the channel is closed, and resources are freed. -func sendEvent(watchId uintptr, event Event) { - if event.State == STATE_CLOSED { - panic("Attempted to send a CLOSED event") - } - watchMutex.Lock() - defer watchMutex.Unlock() - conn, ok := watchConns[watchId] - if !ok { - return - } - if event.Type == EVENT_SESSION && watchId != conn.sessionWatchId { - // All session events on non-session watches will be delivered - // and cause the watch to be closed early. We purposefully do - // that to enforce a simpler model that takes hiccups as - // important events that cause code to reestablish the state - // from a pristine and well known good start. - if event.State == STATE_CONNECTED { - // That means the watch was established while we were still - // connecting to zk, but we're somewhat strict about only - // dealing with watches when in a well known good state. - // Make the intent more clear by tweaking the code. - event.State = STATE_CONNECTING - } - } - ch := conn.watchChannels[watchId] - if ch == nil { - return - } - select { - case ch <- event: - default: - // Channel not available for sending, which means session - // events are necessarily involved (trivial events go - // straight to the buffer), and the application isn't paying - // attention for long enough to have the buffer filled up. - // Break down now rather than leaking forever. - if watchId == conn.sessionWatchId { - panic("Session event channel buffer is full") - } else { - panic("Watch event channel buffer is full") - } - } - if watchId != conn.sessionWatchId { - delete(conn.watchChannels, watchId) - delete(watchConns, watchId) - close(ch) - } -} - -// runWatchLoop start the event loop to collect events from the C -// library and dispatch them into Go land. Calling this function -// multiple times will only increase a counter, rather than -// getting multiple watch loops running. -func runWatchLoop() { - watchMutex.Lock() - if watchLoopCounter == 0 { - go _watchLoop() - } - watchLoopCounter += 1 - watchMutex.Unlock() -} - -// stopWatchLoop decrements the event loop counter. For the moment, -// the event loop doesn't actually stop, but some day we can easily -// implement termination of the loop if necessary. -func stopWatchLoop() { - watchMutex.Lock() - watchLoopCounter -= 1 - if watchLoopCounter == 0 { - // Not really stopping right now, so let's just - // avoid it from running again. - watchLoopCounter += 1 - } - watchMutex.Unlock() -} - -// Loop and block in a C call waiting for a watch to be fired. When -// it fires, handle the watch by dispatching it to the correct event -// channel, and go back onto waiting mode. -func _watchLoop() { - for { - // This will block until there's a watch event is available. - data := C.wait_for_watch() - event := Event{ - Type: int(data.event_type), - Path: C.GoString(data.event_path), - State: int(data.connection_state), - } - watchId := uintptr(data.watch_context) - C.destroy_watch_data(data) - sendEvent(watchId, event) - } -} diff --git a/vendor/launchpad.net/gozk/zookeeper/zk_test.go b/vendor/launchpad.net/gozk/zookeeper/zk_test.go deleted file mode 100644 index feb5850c0bd..00000000000 --- a/vendor/launchpad.net/gozk/zookeeper/zk_test.go +++ /dev/null @@ -1,710 +0,0 @@ -package zookeeper_test - -import ( - "errors" - . "launchpad.net/gocheck" - zk "launchpad.net/gozk/zookeeper" - "time" -) - -// This error will be delivered via C errno, since ZK unfortunately -// only provides the handler back from zookeeper_init(). -func (s *S) TestInitErrorThroughErrno(c *C) { - conn, watch, err := zk.Dial("bad-domain-without-port", 5e9) - if conn != nil { - conn.Close() - } - if watch != nil { - go func() { - for { - _, ok := <-watch - if !ok { - break - } - } - }() - } - c.Assert(conn, IsNil) - c.Assert(watch, IsNil) - c.Assert(err, ErrorMatches, "zookeeper: dial: invalid argument") -} - -func (s *S) TestErrorMessages(c *C) { - tests := []struct { - err zk.Error - msg string - }{{ - zk.Error{ - Op: "foo", - Code: zk.ZNONODE, - Path: "/blah", - }, - `zookeeper: foo "/blah": no node`, - }, { - zk.Error{ - Op: "foo", - Code: zk.ZNONODE, - }, - `zookeeper: foo: no node`, - }, { - zk.Error{ - Op: "foo", - Code: zk.ZSYSTEMERROR, - SystemError: errors.New("an error"), - Path: "/blah", - }, - `zookeeper: foo "/blah": an error`, - }, { - zk.Error{ - Op: "foo", - Code: zk.ZSYSTEMERROR, - Path: "/blah", - }, - `zookeeper: foo "/blah": system error`, - }} - for _, t := range tests { - c.Check(t.err.Error(), Equals, t.msg) - } -} - -func (s *S) TestRecvTimeoutInitParameter(c *C) { - conn, watch, err := zk.Dial(s.zkAddr, 0) - c.Assert(err, IsNil) - defer conn.Close() - - select { - case <-watch: - c.Fatal("Watch fired") - default: - } - - for i := 0; i != 1000; i++ { - _, _, err := conn.Get("/zookeeper") - if err != nil { - c.Check(zk.IsError(err, zk.ZOPERATIONTIMEOUT), Equals, true, Commentf("%v", err)) - c.SucceedNow() - } - } - - c.Fatal("Operation didn't timeout") -} - -func (s *S) TestSessionWatches(c *C) { - c.Assert(zk.CountPendingWatches(), Equals, 0) - - zk1, watch1 := s.init(c) - zk2, watch2 := s.init(c) - zk3, watch3 := s.init(c) - - c.Assert(zk.CountPendingWatches(), Equals, 3) - - event1 := <-watch1 - c.Assert(event1.Type, Equals, zk.EVENT_SESSION) - c.Assert(event1.State, Equals, zk.STATE_CONNECTED) - - c.Assert(zk.CountPendingWatches(), Equals, 3) - - event2 := <-watch2 - c.Assert(event2.Type, Equals, zk.EVENT_SESSION) - c.Assert(event2.State, Equals, zk.STATE_CONNECTED) - - c.Assert(zk.CountPendingWatches(), Equals, 3) - - event3 := <-watch3 - c.Assert(event3.Type, Equals, zk.EVENT_SESSION) - c.Assert(event3.State, Equals, zk.STATE_CONNECTED) - - c.Assert(zk.CountPendingWatches(), Equals, 3) - - zk1.Close() - c.Assert(zk.CountPendingWatches(), Equals, 2) - zk2.Close() - c.Assert(zk.CountPendingWatches(), Equals, 1) - zk3.Close() - c.Assert(zk.CountPendingWatches(), Equals, 0) -} - -// Gozk injects a STATE_CLOSED event when conn.Close() is called, right -// before the channel is closed. Closing the channel injects a nil -// pointer, as usual for Go, so the STATE_CLOSED gives a chance to -// know that a nil pointer is coming, and to stop the procedure. -// Hopefully this procedure will avoid some nil-pointer references by -// mistake. -func (s *S) TestClosingStateInSessionWatch(c *C) { - conn, watch := s.init(c) - - event := <-watch - c.Assert(event.Type, Equals, zk.EVENT_SESSION) - c.Assert(event.State, Equals, zk.STATE_CONNECTED) - - conn.Close() - event, ok := <-watch - c.Assert(ok, Equals, false) - c.Assert(event.Type, Equals, zk.EVENT_CLOSED) - c.Assert(event.State, Equals, zk.STATE_CLOSED) -} - -func (s *S) TestEventString(c *C) { - var event zk.Event - event = zk.Event{zk.EVENT_SESSION, "/path", zk.STATE_CONNECTED} - c.Assert(event, Matches, "ZooKeeper connected") - event = zk.Event{zk.EVENT_CREATED, "/path", zk.STATE_CONNECTED} - c.Assert(event, Matches, "ZooKeeper connected; path created: /path") - event = zk.Event{-1, "/path", zk.STATE_CLOSED} - c.Assert(event, Matches, "ZooKeeper connection closed") -} - -var okTests = []struct { - zk.Event - Ok bool -}{ - {zk.Event{zk.EVENT_SESSION, "", zk.STATE_CONNECTED}, true}, - {zk.Event{zk.EVENT_CREATED, "", zk.STATE_CONNECTED}, true}, - {zk.Event{0, "", zk.STATE_CLOSED}, false}, - {zk.Event{0, "", zk.STATE_EXPIRED_SESSION}, false}, - {zk.Event{0, "", zk.STATE_AUTH_FAILED}, false}, -} - -func (s *S) TestEventOk(c *C) { - for _, t := range okTests { - c.Assert(t.Event.Ok(), Equals, t.Ok) - } -} - -func (s *S) TestGetAndStat(c *C) { - conn, _ := s.init(c) - - data, stat, err := conn.Get("/zookeeper") - c.Assert(err, IsNil) - c.Assert(data, Equals, "") - c.Assert(stat.Czxid(), Equals, int64(0)) - c.Assert(stat.Mzxid(), Equals, int64(0)) - c.Assert(stat.CTime(), Equals, time.Unix(0, 0)) - c.Assert(stat.MTime(), Equals, time.Unix(0, 0)) - c.Assert(stat.Version(), Equals, 0) - c.Assert(stat.CVersion(), Equals, 0) - c.Assert(stat.AVersion(), Equals, 0) - c.Assert(stat.EphemeralOwner(), Equals, int64(0)) - c.Assert(stat.DataLength(), Equals, 0) - c.Assert(stat.NumChildren(), Equals, 1) - c.Assert(stat.Pzxid(), Equals, int64(0)) -} - -func (s *S) TestGetAndError(c *C) { - conn, _ := s.init(c) - - data, stat, err := conn.Get("/non-existent") - - c.Assert(data, Equals, "") - c.Assert(stat, IsNil) - c.Assert(err, ErrorMatches, `zookeeper: get "/non-existent": no node`) - c.Check(zk.IsError(err, zk.ZNONODE), Equals, true, Commentf("%v", err)) -} - -func (s *S) TestCreateAndGet(c *C) { - conn, _ := s.init(c) - - path, err := conn.Create("/test-", "bababum", zk.SEQUENCE|zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) - c.Assert(err, IsNil) - c.Assert(path, Matches, "/test-[0-9]+") - - // Check the error condition from Create(). - _, err = conn.Create(path, "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) - c.Check(zk.IsError(err, zk.ZNODEEXISTS), Equals, true, Commentf("%v", err)) - - data, _, err := conn.Get(path) - c.Assert(err, IsNil) - c.Assert(data, Equals, "bababum") -} - -func checkTimeBetween(c *C, what string, t, t0, t1 time.Time) { - // Truncate the start time to millisecond resolution, as - // time stamps get similarly truncated. - t0 = t0.Add(-time.Duration(t0.Nanosecond() % 1e6)) - if t.Before(t0) || t.After(t1) { - c.Errorf("%s out of range; expected between %v and %v, got %v", what, t0.Format(time.StampNano), t1.Format(time.StampNano), t.Format(time.StampNano)) - } -} - -func (s *S) TestCreateSetAndGet(c *C) { - conn, _ := s.init(c) - - start := time.Now() - _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) - c.Assert(err, IsNil) - - _, stat, err := conn.Get("/test") - c.Assert(err, IsNil) - checkTimeBetween(c, "ctime", stat.CTime(), start, time.Now()) - - start = time.Now() - stat, err = conn.Set("/test", "bababum", -1) // Any version. - c.Assert(err, IsNil) - c.Assert(stat.Version(), Equals, 1) - checkTimeBetween(c, "mtime", stat.MTime(), start, time.Now()) - - data, _, err := conn.Get("/test") - c.Assert(err, IsNil) - c.Assert(data, Equals, "bababum") -} - -func (s *S) TestGetAndWatch(c *C) { - c.Check(zk.CountPendingWatches(), Equals, 0) - - conn, _ := s.init(c) - - c.Check(zk.CountPendingWatches(), Equals, 1) - - _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) - c.Assert(err, IsNil) - - data, stat, watch, err := conn.GetW("/test") - c.Assert(err, IsNil) - c.Assert(data, Equals, "one") - c.Assert(stat.Version(), Equals, 0) - - select { - case <-watch: - c.Fatal("Watch fired") - default: - } - - c.Check(zk.CountPendingWatches(), Equals, 2) - - _, err = conn.Set("/test", "two", -1) - c.Assert(err, IsNil) - - event := <-watch - c.Assert(event.Type, Equals, zk.EVENT_CHANGED) - - c.Check(zk.CountPendingWatches(), Equals, 1) - - data, _, watch, err = conn.GetW("/test") - c.Assert(err, IsNil) - c.Assert(data, Equals, "two") - - select { - case <-watch: - c.Fatal("Watch fired") - default: - } - - c.Check(zk.CountPendingWatches(), Equals, 2) - - _, err = conn.Set("/test", "three", -1) - c.Assert(err, IsNil) - - event = <-watch - c.Assert(event.Type, Equals, zk.EVENT_CHANGED) - - c.Check(zk.CountPendingWatches(), Equals, 1) -} - -func (s *S) TestGetAndWatchWithError(c *C) { - c.Check(zk.CountPendingWatches(), Equals, 0) - - conn, _ := s.init(c) - - c.Check(zk.CountPendingWatches(), Equals, 1) - - _, _, watch, err := conn.GetW("/test") - c.Assert(err, NotNil) - c.Check(zk.IsError(err, zk.ZNONODE), Equals, true, Commentf("%v", err)) - c.Assert(watch, IsNil) - - c.Check(zk.CountPendingWatches(), Equals, 1) -} - -func (s *S) TestCloseReleasesWatches(c *C) { - c.Check(zk.CountPendingWatches(), Equals, 0) - - conn, _ := s.init(c) - - c.Check(zk.CountPendingWatches(), Equals, 1) - - _, err := conn.Create("/test", "one", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) - c.Assert(err, IsNil) - - _, _, watch, err := conn.GetW("/test") - c.Assert(err, IsNil) - - c.Assert(zk.CountPendingWatches(), Equals, 2) - - conn.Close() - - c.Assert(zk.CountPendingWatches(), Equals, 0) - - select { - case _, ok := <-watch: - c.Assert(ok, Equals, false) - case <-time.After(3e9): - c.Fatal("Watch didn't fire") - } -} - -// By default, the ZooKeeper C client will hang indefinitely if a -// handler is closed twice. We get in the way and prevent it. -func (s *S) TestClosingTwiceDoesntHang(c *C) { - conn, _ := s.init(c) - err := conn.Close() - c.Assert(err, IsNil) - err = conn.Close() - c.Assert(err, NotNil) - c.Check(zk.IsError(err, zk.ZCLOSING), Equals, true, Commentf("%v", err)) -} - -func (s *S) TestChildren(c *C) { - conn, _ := s.init(c) - - children, stat, err := conn.Children("/") - c.Assert(err, IsNil) - c.Assert(children, DeepEquals, []string{"zookeeper"}) - c.Assert(stat.NumChildren(), Equals, 1) - - children, stat, err = conn.Children("/non-existent") - c.Check(zk.IsError(err, zk.ZNONODE), Equals, true, Commentf("%v", err)) - c.Assert(children, IsNil) - c.Assert(stat, IsNil) -} - -func (s *S) TestChildrenAndWatch(c *C) { - c.Check(zk.CountPendingWatches(), Equals, 0) - - conn, _ := s.init(c) - - c.Check(zk.CountPendingWatches(), Equals, 1) - - children, stat, watch, err := conn.ChildrenW("/") - c.Assert(err, IsNil) - c.Assert(children, DeepEquals, []string{"zookeeper"}) - c.Assert(stat.NumChildren(), Equals, 1) - - select { - case <-watch: - c.Fatal("Watch fired") - default: - } - - c.Check(zk.CountPendingWatches(), Equals, 2) - - _, err = conn.Create("/test1", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) - c.Assert(err, IsNil) - - event := <-watch - c.Assert(event.Type, Equals, zk.EVENT_CHILD) - c.Assert(event.Path, Equals, "/") - - c.Check(zk.CountPendingWatches(), Equals, 1) - - children, stat, watch, err = conn.ChildrenW("/") - c.Assert(err, IsNil) - c.Assert(stat.NumChildren(), Equals, 2) - - // The ordering is most likely unstable, so this test must be fixed. - c.Assert(children, DeepEquals, []string{"test1", "zookeeper"}) - - select { - case <-watch: - c.Fatal("Watch fired") - default: - } - - c.Check(zk.CountPendingWatches(), Equals, 2) - - _, err = conn.Create("/test2", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) - c.Assert(err, IsNil) - - event = <-watch - c.Assert(event.Type, Equals, zk.EVENT_CHILD) - - c.Check(zk.CountPendingWatches(), Equals, 1) -} - -func (s *S) TestChildrenAndWatchWithError(c *C) { - c.Check(zk.CountPendingWatches(), Equals, 0) - - conn, _ := s.init(c) - - c.Check(zk.CountPendingWatches(), Equals, 1) - - _, stat, watch, err := conn.ChildrenW("/test") - c.Assert(err, NotNil) - c.Check(zk.IsError(err, zk.ZNONODE), Equals, true, Commentf("%v", err)) - c.Assert(watch, IsNil) - c.Assert(stat, IsNil) - - c.Check(zk.CountPendingWatches(), Equals, 1) -} - -func (s *S) TestExists(c *C) { - conn, _ := s.init(c) - - stat, err := conn.Exists("/non-existent") - c.Assert(err, IsNil) - c.Assert(stat, IsNil) - - stat, err = conn.Exists("/zookeeper") - c.Assert(err, IsNil) - c.Assert(stat.NumChildren(), Equals, 1) -} - -func (s *S) TestExistsAndWatch(c *C) { - c.Check(zk.CountPendingWatches(), Equals, 0) - - conn, _ := s.init(c) - - c.Check(zk.CountPendingWatches(), Equals, 1) - - stat, watch, err := conn.ExistsW("/test") - c.Assert(err, IsNil) - c.Assert(stat, IsNil) - - c.Check(zk.CountPendingWatches(), Equals, 2) - - select { - case <-watch: - c.Fatal("Watch fired") - default: - } - - _, err = conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) - c.Assert(err, IsNil) - - event := <-watch - c.Assert(event.Type, Equals, zk.EVENT_CREATED) - c.Assert(event.Path, Equals, "/test") - - c.Check(zk.CountPendingWatches(), Equals, 1) - - stat, watch, err = conn.ExistsW("/test") - c.Assert(err, IsNil) - c.Assert(stat, NotNil) - c.Assert(stat.NumChildren(), Equals, 0) - - c.Check(zk.CountPendingWatches(), Equals, 2) -} - -func (s *S) TestExistsAndWatchWithError(c *C) { - c.Check(zk.CountPendingWatches(), Equals, 0) - - conn, _ := s.init(c) - - c.Check(zk.CountPendingWatches(), Equals, 1) - - stat, watch, err := conn.ExistsW("///") - c.Assert(err, NotNil) - c.Check(zk.IsError(err, zk.ZBADARGUMENTS), Equals, true, Commentf("%v", err)) - c.Assert(stat, IsNil) - c.Assert(watch, IsNil) - - c.Check(zk.CountPendingWatches(), Equals, 1) -} - -func (s *S) TestDelete(c *C) { - conn, _ := s.init(c) - - _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) - c.Assert(err, IsNil) - - err = conn.Delete("/test", 5) - c.Assert(err, NotNil) - c.Check(zk.IsError(err, zk.ZBADVERSION), Equals, true, Commentf("%v", err)) - - err = conn.Delete("/test", -1) - c.Assert(err, IsNil) - - err = conn.Delete("/test", -1) - c.Assert(err, NotNil) - c.Check(zk.IsError(err, zk.ZNONODE), Equals, true, Commentf("%v", err)) -} - -func (s *S) TestClientIdAndReInit(c *C) { - zk1, _ := s.init(c) - clientId1 := zk1.ClientId() - - zk2, _, err := zk.Redial(s.zkAddr, 5e9, clientId1) - c.Assert(err, IsNil) - defer zk2.Close() - clientId2 := zk2.ClientId() - - c.Assert(clientId1, DeepEquals, clientId2) -} - -// Surprisingly for some (including myself, initially), the watch -// returned by the exists method actually fires on data changes too. -func (s *S) TestExistsWatchOnDataChange(c *C) { - conn, _ := s.init(c) - - _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) - c.Assert(err, IsNil) - - _, watch, err := conn.ExistsW("/test") - c.Assert(err, IsNil) - - _, err = conn.Set("/test", "new", -1) - c.Assert(err, IsNil) - - event := <-watch - - c.Assert(event.Path, Equals, "/test") - c.Assert(event.Type, Equals, zk.EVENT_CHANGED) -} - -func (s *S) TestACL(c *C) { - conn, _ := s.init(c) - - _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) - c.Assert(err, IsNil) - - acl, stat, err := conn.ACL("/test") - c.Assert(err, IsNil) - c.Assert(acl, DeepEquals, zk.WorldACL(zk.PERM_ALL)) - c.Assert(stat, NotNil) - c.Assert(stat.Version(), Equals, 0) - - acl, stat, err = conn.ACL("/non-existent") - c.Assert(err, NotNil) - c.Check(zk.IsError(err, zk.ZNONODE), Equals, true, Commentf("%v", err)) - c.Assert(acl, IsNil) - c.Assert(stat, IsNil) -} - -func (s *S) TestSetACL(c *C) { - conn, _ := s.init(c) - - _, err := conn.Create("/test", "", zk.EPHEMERAL, zk.WorldACL(zk.PERM_ALL)) - c.Assert(err, IsNil) - - err = conn.SetACL("/test", zk.WorldACL(zk.PERM_ALL), 5) - c.Assert(err, NotNil) - c.Check(zk.IsError(err, zk.ZBADVERSION), Equals, true, Commentf("%v", err)) - - err = conn.SetACL("/test", zk.WorldACL(zk.PERM_READ), -1) - c.Assert(err, IsNil) - - acl, _, err := conn.ACL("/test") - c.Assert(err, IsNil) - c.Assert(acl, DeepEquals, zk.WorldACL(zk.PERM_READ)) -} - -func (s *S) TestAddAuth(c *C) { - conn, _ := s.init(c) - - acl := []zk.ACL{{zk.PERM_READ, "digest", "joe:enQcM3mIEHQx7IrPNStYBc0qfs8="}} - - _, err := conn.Create("/test", "", zk.EPHEMERAL, acl) - c.Assert(err, IsNil) - - _, _, err = conn.Get("/test") - c.Assert(err, NotNil) - c.Check(zk.IsError(err, zk.ZNOAUTH), Equals, true, Commentf("%v", err)) - - err = conn.AddAuth("digest", "joe:passwd") - c.Assert(err, IsNil) - - _, _, err = conn.Get("/test") - c.Assert(err, IsNil) -} - -func (s *S) TestWatchOnReconnection(c *C) { - c.Check(zk.CountPendingWatches(), Equals, 0) - - conn, session := s.init(c) - - event := <-session - c.Assert(event.Type, Equals, zk.EVENT_SESSION) - c.Assert(event.State, Equals, zk.STATE_CONNECTED) - - c.Check(zk.CountPendingWatches(), Equals, 1) - - stat, watch, err := conn.ExistsW("/test") - c.Assert(err, IsNil) - c.Assert(stat, IsNil) - - c.Check(zk.CountPendingWatches(), Equals, 2) - - s.zkServer.Stop() - time.Sleep(2e9) - s.zkServer.Start() - - // The session channel should receive the reconnection notification. - select { - case event := <-session: - c.Assert(event.State, Equals, zk.STATE_CONNECTING) - case <-time.After(3e9): - c.Fatal("Session watch didn't fire") - } - select { - case event := <-session: - c.Assert(event.State, Equals, zk.STATE_CONNECTED) - case <-time.After(3e9): - c.Fatal("Session watch didn't fire") - } - - // The watch channel should receive just the connecting notification. - select { - case event := <-watch: - c.Assert(event.State, Equals, zk.STATE_CONNECTING) - case <-time.After(3e9): - c.Fatal("Watch didn't fire") - } - select { - case _, ok := <-watch: - c.Assert(ok, Equals, false) - case <-time.After(3e9): - c.Fatal("Watch wasn't closed") - } - - c.Check(zk.CountPendingWatches(), Equals, 1) -} - -func (s *S) TestWatchOnSessionExpiration(c *C) { - c.Check(zk.CountPendingWatches(), Equals, 0) - - conn, session := s.init(c) - - event := <-session - c.Assert(event.Type, Equals, zk.EVENT_SESSION) - c.Assert(event.State, Equals, zk.STATE_CONNECTED) - - c.Check(zk.CountPendingWatches(), Equals, 1) - - stat, watch, err := conn.ExistsW("/test") - c.Assert(err, IsNil) - c.Assert(stat, IsNil) - - c.Check(zk.CountPendingWatches(), Equals, 2) - - // Use expiration trick described in the FAQ. - clientId := conn.ClientId() - zk2, session2, err := zk.Redial(s.zkAddr, 5e9, clientId) - - for event := range session2 { - c.Log("Event from overlapping session: ", event) - if event.State == zk.STATE_CONNECTED { - // Wait for zk to process the connection. - // Not reliable without this. :-( - time.Sleep(1e9) - zk2.Close() - } - } - for event := range session { - c.Log("Event from primary session: ", event) - if event.State == zk.STATE_EXPIRED_SESSION { - break - } - } - - select { - case event := <-watch: - c.Assert(event.State, Equals, zk.STATE_CONNECTING) - case <-time.After(3e9): - c.Fatal("Watch event didn't fire") - } - - event = <-watch - c.Assert(event.Type, Equals, zk.EVENT_CLOSED) - c.Assert(event.State, Equals, zk.STATE_CLOSED) - - c.Check(zk.CountPendingWatches(), Equals, 1) -}