Skip to content

Commit

Permalink
nsqd / nsqlookupd protocol IOLoop updates
Browse files Browse the repository at this point in the history
- nsqd: update IOLoop so FatalClientErr's are returned. current
  implementation shadows err, returned err was result of
  client.Reader.ReadSlice.
- nsqlookupd: update IOLoop err shadowing in similar manner, rename
  SendResponse err to sendErr to be closer to nsqd implementation. in this
  case FatalClientErr's were effectively ignored since they would fail the
  type assertion.
- add nequal to nsqlookupd_test
- http.go / nsqd.go: resolve some err shadowing which had no impact.
  • Loading branch information
judwhite committed Oct 16, 2015
1 parent c02e25f commit 8212745
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 10 deletions.
45 changes: 45 additions & 0 deletions internal/test/fakes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package test

import (
"net"
"time"
)

type FakeNetConn struct {
ReadFunc func([]byte) (int, error)
WriteFunc func([]byte) (int, error)
CloseFunc func() error
LocalAddrFunc func() net.Addr
RemoteAddrFunc func() net.Addr
SetDeadlineFunc func(time.Time) error
SetReadDeadlineFunc func(time.Time) error
SetWriteDeadlineFunc func(time.Time) error
}

func (f FakeNetConn) Read(b []byte) (int, error) { return f.ReadFunc(b) }
func (f FakeNetConn) Write(b []byte) (int, error) { return f.WriteFunc(b) }
func (f FakeNetConn) Close() error { return f.CloseFunc() }
func (f FakeNetConn) LocalAddr() net.Addr { return f.LocalAddrFunc() }
func (f FakeNetConn) RemoteAddr() net.Addr { return f.RemoteAddrFunc() }
func (f FakeNetConn) SetDeadline(t time.Time) error { return f.SetDeadlineFunc(t) }
func (f FakeNetConn) SetReadDeadline(t time.Time) error { return f.SetReadDeadlineFunc(t) }
func (f FakeNetConn) SetWriteDeadline(t time.Time) error { return f.SetWriteDeadlineFunc(t) }

type fakeNetAddr struct{}

func (fakeNetAddr) Network() string { return "" }
func (fakeNetAddr) String() string { return "" }

func NewFakeNetConn() FakeNetConn {
netAddr := fakeNetAddr{}
return FakeNetConn{
ReadFunc: func(b []byte) (int, error) { return 0, nil },
WriteFunc: func(b []byte) (int, error) { return len(b), nil },
CloseFunc: func() error { return nil },
LocalAddrFunc: func() net.Addr { return netAddr },
RemoteAddrFunc: func() net.Addr { return netAddr },
SetDeadlineFunc: func(time.Time) error { return nil },
SetWriteDeadlineFunc: func(time.Time) error { return nil },
SetReadDeadlineFunc: func(time.Time) error { return nil },
}
}
6 changes: 4 additions & 2 deletions nsqd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprout

var deferred time.Duration
if ds, ok := reqParams["defer"]; ok {
di, err := strconv.ParseInt(ds[0], 10, 64)
var di int64
di, err = strconv.ParseInt(ds[0], 10, 64)
if err != nil {
return nil, http_api.Err{400, "INVALID_DEFER"}
}
Expand Down Expand Up @@ -264,7 +265,8 @@ func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprou
rdr := bufio.NewReader(io.LimitReader(req.Body, readMax))
total := 0
for !exit {
block, err := rdr.ReadBytes('\n')
var block []byte
block, err = rdr.ReadBytes('\n')
if err != nil {
if err != io.EOF {
return nil, http_api.Err{500, "INTERNAL_ERROR"}
Expand Down
3 changes: 2 additions & 1 deletion nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ func New(opts *Options) *NSQD {
}

if opts.StatsdPrefix != "" {
_, port, err := net.SplitHostPort(opts.HTTPAddress)
var port string
_, port, err = net.SplitHostPort(opts.HTTPAddress)
if err != nil {
n.logf("ERROR: failed to parse HTTP address (%s) - %s", opts.HTTPAddress, err)
os.Exit(1)
Expand Down
4 changes: 3 additions & 1 deletion nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ func (p *protocolV2) IOLoop(conn net.Conn) error {
p.ctx.nsqd.logf("PROTOCOL(V2): [%s] %s", client, params)
}

response, err := p.Exec(client, params)
var response []byte
response, err = p.Exec(client, params)
if err != nil {
ctx := ""
if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
Expand All @@ -92,6 +93,7 @@ func (p *protocolV2) IOLoop(conn net.Conn) error {

sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
if sendErr != nil {
p.ctx.nsqd.logf("ERROR: [%s] - %s%s", client, sendErr, ctx)
break
}

Expand Down
43 changes: 41 additions & 2 deletions nsqd/protocol_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"compress/flate"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/mreiferson/go-snappystream"
"github.com/nsqio/go-nsq"
"github.com/nsqio/nsq/internal/protocol"
"github.com/nsqio/nsq/internal/test"
)

func mustStartNSQD(opts *Options) (*net.TCPAddr, *net.TCPAddr, *NSQD) {
Expand Down Expand Up @@ -182,8 +184,8 @@ func TestMultipleConsumerV2(t *testing.T) {
go func(c net.Conn) {
resp, _ := nsq.ReadResponse(c)
_, data, _ := nsq.UnpackResponse(resp)
msg, _ := decodeMessage(data)
msgChan <- msg
recvdMsg, _ := decodeMessage(data)
msgChan <- recvdMsg
}(conn)
}

Expand Down Expand Up @@ -1452,6 +1454,43 @@ func runAuthTest(t *testing.T, authResponse, authSecret, authError, authSuccess

}

func TestIOLoopReturnsClientErrWhenSendFails(t *testing.T) {
fakeConn := test.NewFakeNetConn()
fakeConn.WriteFunc = func(b []byte) (int, error) {
return 0, errors.New("write error")
}

testIOLoopReturnsClientErr(t, fakeConn)
}

func TestIOLoopReturnsClientErrWhenSendSucceeds(t *testing.T) {
fakeConn := test.NewFakeNetConn()
fakeConn.WriteFunc = func(b []byte) (int, error) {
return len(b), nil
}

testIOLoopReturnsClientErr(t, fakeConn)
}

func testIOLoopReturnsClientErr(t *testing.T, fakeConn test.FakeNetConn) {
fakeConn.ReadFunc = func(b []byte) (int, error) {
return copy(b, []byte("INVALID_COMMAND\n")), nil
}

opts := NewOptions()
opts.Logger = newTestLogger(t)
opts.Verbose = true

prot := &protocolV2{ctx: &context{nsqd: New(opts)}}
defer prot.ctx.nsqd.Exit()

err := prot.IOLoop(fakeConn)

nequal(t, err, nil)
equal(t, err.Error(), "E_INVALID invalid command INVALID_COMMAND")
nequal(t, err.(*protocol.FatalClientErr), nil)
}

func BenchmarkProtocolV2Exec(b *testing.B) {
b.StopTimer()
opts := NewOptions()
Expand Down
9 changes: 5 additions & 4 deletions nsqlookupd/lookup_protocol_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ func (p *LookupProtocolV1) IOLoop(conn net.Conn) error {
var line string

client := NewClientV1(conn)
err = nil
reader := bufio.NewReader(client)
for {
line, err = reader.ReadString('\n')
Expand All @@ -37,16 +36,18 @@ func (p *LookupProtocolV1) IOLoop(conn net.Conn) error {
line = strings.TrimSpace(line)
params := strings.Split(line, " ")

response, err := p.Exec(client, reader, params)
var response []byte
response, err = p.Exec(client, reader, params)
if err != nil {
ctx := ""
if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
ctx = " - " + parentErr.Error()
}
p.ctx.nsqlookupd.logf("ERROR: [%s] - %s%s", client, err, ctx)

_, err = protocol.SendResponse(client, []byte(err.Error()))
if err != nil {
_, sendErr := protocol.SendResponse(client, []byte(err.Error()))
if sendErr != nil {
p.ctx.nsqlookupd.logf("ERROR: [%s] - %s%s", client, sendErr, ctx)
break
}

Expand Down
62 changes: 62 additions & 0 deletions nsqlookupd/lookup_protocol_v1_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package nsqlookupd

import (
"errors"
"testing"
"time"

"github.com/nsqio/nsq/internal/protocol"
"github.com/nsqio/nsq/internal/test"
)

func TestIOLoopReturnsClientErrWhenSendFails(t *testing.T) {
fakeConn := test.NewFakeNetConn()
fakeConn.WriteFunc = func(b []byte) (int, error) {
return 0, errors.New("write error")
}

testIOLoopReturnsClientErr(t, fakeConn)
}

func TestIOLoopReturnsClientErrWhenSendSucceeds(t *testing.T) {
fakeConn := test.NewFakeNetConn()
fakeConn.WriteFunc = func(b []byte) (int, error) {
return len(b), nil
}

testIOLoopReturnsClientErr(t, fakeConn)
}

func testIOLoopReturnsClientErr(t *testing.T, fakeConn test.FakeNetConn) {
fakeConn.ReadFunc = func(b []byte) (int, error) {
return copy(b, []byte("INVALID_COMMAND\n")), nil
}

opts := NewOptions()
opts.Logger = newTestLogger(t)
opts.Verbose = true

prot := &LookupProtocolV1{ctx: &Context{nsqlookupd: New(opts)}}

errChan := make(chan error)
test := func() {
errChan <- prot.IOLoop(fakeConn)
defer prot.ctx.nsqlookupd.Exit()
}
go test()

var err error
var timeout bool

select {
case err = <-errChan:
case <-time.After(2 * time.Second):
timeout = true
}

equal(t, timeout, false)

nequal(t, err, nil)
equal(t, err.Error(), "E_INVALID invalid command INVALID_COMMAND")
nequal(t, err.(*protocol.FatalClientErr), nil)
}
9 changes: 9 additions & 0 deletions nsqlookupd/nsqlookupd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ func equal(t *testing.T, act, exp interface{}) {
}
}

func nequal(t *testing.T, act, exp interface{}) {
if reflect.DeepEqual(exp, act) {
_, file, line, _ := runtime.Caller(1)
t.Logf("\033[31m%s:%d:\n\n\tnexp: %#v\n\n\tgot: %#v\033[39m\n\n",
filepath.Base(file), line, exp, act)
t.FailNow()
}
}

type tbLog interface {
Log(...interface{})
}
Expand Down

0 comments on commit 8212745

Please sign in to comment.