From 39be9370ab30ef576cb4693650ec81e0341b3ce8 Mon Sep 17 00:00:00 2001 From: Samuel Stauffer Date: Wed, 21 May 2014 17:52:33 -0700 Subject: [PATCH] Return an error for pending request on failed connect If the client fails to connect to all servers then any pending requests that have not yet been sent will return an error of ErrNoServer --- zk/conn.go | 15 +++++++++++++++ zk/zk_test.go | 25 +++++++++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/zk/conn.go b/zk/conn.go index 8f9c5605..a65c84cf 100644 --- a/zk/conn.go +++ b/zk/conn.go @@ -11,6 +11,7 @@ Possible watcher events: import ( "crypto/rand" "encoding/binary" + "errors" "fmt" "io" "log" @@ -22,6 +23,8 @@ import ( "time" ) +var ErrNoServer = errors.New("zk: could not connect to a server") + const ( bufferSize = 1536 * 1024 eventChanSize = 6 @@ -181,6 +184,7 @@ func (c *Conn) connect() { c.serverIndex = (c.serverIndex + 1) % len(c.servers) if c.serverIndex == startIndex { + c.flushUnsentRequests(ErrNoServer) time.Sleep(time.Second) } } @@ -248,6 +252,17 @@ func (c *Conn) loop() { } } +func (c *Conn) flushUnsentRequests(err error) { + for { + select { + default: + return + case req := <-c.sendChan: + req.recvChan <- response{-1, err} + } + } +} + // Send error to all pending requests and clear request map func (c *Conn) flushRequests(err error) { c.requestsLock.Lock() diff --git a/zk/zk_test.go b/zk/zk_test.go index 055001d0..718b4488 100644 --- a/zk/zk_test.go +++ b/zk/zk_test.go @@ -371,3 +371,28 @@ func TestExpiringWatch(t *testing.T) { t.Fatal("Child watcher timed out") } } + +func TestRequestFail(t *testing.T) { + // If connecting fails to all servers in the list then pending requests + // should be errored out so they don't hang forever. + + zk, _, err := Connect([]string{"127.0.0.1:32444"}, time.Second*15) + if err != nil { + t.Fatal(err) + } + defer zk.Close() + + ch := make(chan error) + go func() { + _, _, err := zk.Get("/blah") + ch <- err + }() + select { + case err := <-ch: + if err == nil { + t.Fatal("Expected non-nil error on failed request due to connection failure") + } + case <-time.After(time.Second * 2): + t.Fatal("Get hung when connection could not be made") + } +}