Skip to content

Commit

Permalink
Notification support (#18)
Browse files Browse the repository at this point in the history
* changed scanning to filter notify messages
* readded multiline support
* added support for notifications
* fixed issues and suggestions
* clarify DecodeResponse error assertions
* added notification buffer and removed debug print
* fixed data race
* added keepalive option
* only set connected to false if it's true (might fix data-race?)
* added mutex
* updated keepalive for the v3.3+ server releases
* improved timeout
* added option to set the notification buffer size
* made notification write non-blocking
* handle connection status threadsafe
* renamed register methods
* exported NotifyEvent type and fixed typo
* removed redundant if
* renamed NotifyEvent to NotifyCategory
* fixed NotifyCategory comment
* added mutex to Keepalive
* added option to set a custom Keepalive interval
* avoid recreating notify channel
* cleaned up NotificationBuffer option
* restructured keepalive and command handling
* use fmt.Stringer
* change work type to string
* removed mutex and improved timeout
* added tests
* reduced timeout in tests to improve speed
* clarified comment, inlined if
* reverted to previous timeout
  • Loading branch information
irgendwr authored and stevenh committed Feb 6, 2019
1 parent 2b2f783 commit 78271a6
Show file tree
Hide file tree
Showing 8 changed files with 334 additions and 46 deletions.
178 changes: 141 additions & 37 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,32 @@ var (
respTrailerRe = regexp.MustCompile(`^error id=(\d+) msg=([^ ]+)(.*)`)

// DefaultTimeout is the default read / write / dial timeout for Clients.
DefaultTimeout = time.Second * 10
DefaultTimeout = 10 * time.Second

// DefaultKeepAlive is the default interval in which keepalive data is sent.
DefaultKeepAlive = 200 * time.Second

// DefaultNotifyBufSize is the default notification buffer size.
DefaultNotifyBufSize = 5

// keepAliveData is the keepalive data.
keepAliveData = " \n"
)

// Client is a TeamSpeak 3 ServerQuery client.
type Client struct {
conn net.Conn
timeout time.Duration
scanner *bufio.Scanner
buf []byte
maxBufSize int
conn net.Conn
timeout time.Duration
keepAlive time.Duration
scanner *bufio.Scanner
buf []byte
maxBufSize int
notifyBufSize int
work chan string
err chan error
notify chan Notification
disconnect chan struct{}
res []string

Server *ServerMethods
}
Expand All @@ -53,6 +69,22 @@ func Timeout(timeout time.Duration) func(*Client) error {
}
}

// KeepAlive sets the keepAlive interval.
func KeepAlive(keepAlive time.Duration) func(*Client) error {
return func(c *Client) error {
c.keepAlive = keepAlive
return nil
}
}

// NotificationBuffer sets the notification buffer size.
func NotificationBuffer(size int) func(*Client) error {
return func(c *Client) error {
c.notifyBufSize = size
return nil
}
}

// Buffer sets the initial buffer used to parse responses from
// the server and the maximum size of buffer that may be allocated.
// The maximum parsable token size is the larger of max and cap(buf).
Expand All @@ -76,9 +108,14 @@ func NewClient(addr string, options ...func(c *Client) error) (*Client, error) {
}

c := &Client{
timeout: DefaultTimeout,
buf: make([]byte, startBufSize),
maxBufSize: MaxParseTokenSize,
timeout: DefaultTimeout,
keepAlive: DefaultKeepAlive,
buf: make([]byte, startBufSize),
maxBufSize: MaxParseTokenSize,
notifyBufSize: DefaultNotifyBufSize,
work: make(chan string),
err: make(chan error),
disconnect: make(chan struct{}),
}
for _, f := range options {
if f == nil {
Expand All @@ -89,6 +126,8 @@ func NewClient(addr string, options ...func(c *Client) error) (*Client, error) {
}
}

c.notify = make(chan Notification, c.notifyBufSize)

// Wire up command groups
c.Server = &ServerMethods{Client: c}

Expand All @@ -101,11 +140,11 @@ func NewClient(addr string, options ...func(c *Client) error) (*Client, error) {
c.scanner.Buffer(c.buf, c.maxBufSize)
c.scanner.Split(ScanLines)

if err := c.setDeadline(); err != nil {
if err := c.conn.SetDeadline(time.Now().Add(c.timeout)); err != nil {
return nil, err
}

// Reader the connection header
// Read the connection header
if !c.scanner.Scan() {
return nil, c.scanErr()
}
Expand All @@ -119,12 +158,69 @@ func NewClient(addr string, options ...func(c *Client) error) (*Client, error) {
return nil, c.scanErr()
}

if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
return nil, err
}

// Start handlers
go c.messageHandler()
go c.workHandler()

return c, nil
}

// setDeadline updates the deadline on the connection based on the clients configured timeout.
func (c *Client) setDeadline() error {
return c.conn.SetDeadline(time.Now().Add(c.timeout))
// messageHandler scans incoming lines and handles them accordingly.
func (c *Client) messageHandler() {
for {
if c.scanner.Scan() {
line := c.scanner.Text()
if line == "error id=0 msg=ok" {
c.err <- nil
} else if matches := respTrailerRe.FindStringSubmatch(line); len(matches) == 4 {
c.err <- NewError(matches)
} else if strings.Index(line, "notify") == 0 {
if n, err := decodeNotification(line); err == nil {
// non-blocking write
select {
case c.notify <- n:
default:
}
}
} else {
c.res = append(c.res, line)
}
} else {
err := c.scanErr()
c.err <- err
if err == io.ErrUnexpectedEOF {
close(c.disconnect)
return
}
}
}
}

// workHandler handles commands and keepAlive messages.
func (c *Client) workHandler() {
for {
select {
case w := <-c.work:
c.process(w)
case <-time.After(c.keepAlive):
c.process(keepAliveData)
case <-c.disconnect:
return
}
}
}

func (c *Client) process(data string) {
if err := c.conn.SetWriteDeadline(time.Now().Add(c.timeout)); err != nil {
c.err <- err
}
if _, err := c.conn.Write([]byte(data)); err != nil {
c.err <- err
}
}

// Exec executes cmd on the server and returns the response.
Expand All @@ -134,42 +230,50 @@ func (c *Client) Exec(cmd string) ([]string, error) {

// ExecCmd executes cmd on the server and returns the response.
func (c *Client) ExecCmd(cmd *Cmd) ([]string, error) {
if err := c.setDeadline(); err != nil {
return nil, err
if !c.IsConnected() {
return nil, ErrNotConnected
}

if _, err := c.conn.Write([]byte(cmd.String())); err != nil {
return nil, err
}
c.work <- cmd.String()

if err := c.setDeadline(); err != nil {
return nil, err
select {
case err := <-c.err:
if err != nil {
return nil, err
}
case <-time.After(c.timeout):
return nil, ErrTimeout
}

lines := make([]string, 0, 10)
for c.scanner.Scan() {
l := c.scanner.Text()
if l == "error id=0 msg=ok" {
if cmd.response != nil {
if err := DecodeResponse(lines, cmd.response); err != nil {
return nil, err
}
}
return lines, nil
} else if matches := respTrailerRe.FindStringSubmatch(l); len(matches) == 4 {
return nil, NewError(matches)
} else {
lines = append(lines, l)
res := c.res
c.res = nil

if cmd.response != nil {
if err := DecodeResponse(res, cmd.response); err != nil {
return nil, err
}
}

return nil, c.scanErr()
return res, nil
}

// IsConnected returns whether the client is connected.
func (c *Client) IsConnected() bool {
select {
case <-c.disconnect:
return false
default:
return true
}
}

// Close closes the connection to the server.
func (c *Client) Close() error {
defer close(c.notify)

_, err := c.Exec("quit")
err2 := c.conn.Close()

if err != nil {
return err
}
Expand All @@ -178,7 +282,7 @@ func (c *Client) Close() error {
}

// scanError returns the error from the scanner if non-nil,
// io.ErrUnexpectedEOF otherwise.
// `io.ErrUnexpectedEOF` otherwise.
func (c *Client) scanErr() error {
if err := c.scanner.Err(); err != nil {
return err
Expand Down
53 changes: 48 additions & 5 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestClient(t *testing.T) {
assert.NoError(t, s.Close())
}()

c, err := NewClient(s.Addr, Timeout(time.Second*2))
c, err := NewClient(s.Addr, Timeout(time.Second))
if !assert.NoError(t, err) {
return
}
Expand Down Expand Up @@ -68,7 +68,7 @@ func TestClientDisconnect(t *testing.T) {
assert.NoError(t, s.Close())
}()

c, err := NewClient(s.Addr, Timeout(time.Second*2))
c, err := NewClient(s.Addr, Timeout(time.Second))
if !assert.NoError(t, err) {
return
}
Expand All @@ -88,7 +88,7 @@ func TestClientWriteFail(t *testing.T) {
assert.NoError(t, s.Close())
}()

c, err := NewClient(s.Addr, Timeout(time.Second*2))
c, err := NewClient(s.Addr, Timeout(time.Second))
if !assert.NoError(t, err) {
return
}
Expand All @@ -108,6 +108,49 @@ func TestClientDialFail(t *testing.T) {
assert.NoError(t, c.Close())
}

func TestClientTimeout(t *testing.T) {
s := newServer(t)
if s == nil {
return
}
defer func() {
assert.NoError(t, s.Close())
}()

c, err := NewClient(s.Addr, Timeout(time.Millisecond*100))
if !assert.NoError(t, err) {
return
}

// Not receiving a response must cause a timeout
_, err = c.Exec(" ")
assert.Error(t, err)
}

func TestClientDeadline(t *testing.T) {
s := newServer(t)
if s == nil {
return
}
defer func() {
assert.NoError(t, s.Close())
}()

c, err := NewClient(s.Addr, Timeout(time.Millisecond*100))
if !assert.NoError(t, err) {
return
}

_, err = c.Exec("version")
assert.NoError(t, err)

// Inactivity must not cause a timeout
time.Sleep(c.timeout * 2)

_, err = c.Exec("version")
assert.NoError(t, err)
}

func TestClientNoHeader(t *testing.T) {
s := newServerStopped(t)
if s == nil {
Expand All @@ -119,7 +162,7 @@ func TestClientNoHeader(t *testing.T) {
assert.NoError(t, s.Close())
}()

c, err := NewClient(s.Addr, Timeout(time.Second))
c, err := NewClient(s.Addr, Timeout(time.Millisecond*100))
if assert.Error(t, err) {
return
}
Expand All @@ -139,7 +182,7 @@ func TestClientNoBanner(t *testing.T) {
assert.NoError(t, s.Close())
}()

c, err := NewClient(s.Addr, Timeout(time.Second))
c, err := NewClient(s.Addr, Timeout(time.Millisecond*100))
if assert.Error(t, err) {
return
}
Expand Down
10 changes: 9 additions & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,19 @@ import (
)

var (
// ErrInvalidConnectHeader is returned by NewClient if the server doesn't respond with the required connection header.
// ErrInvalidConnectHeader is returned by NewClient if the server
// doesn't respond with the required connection header.
ErrInvalidConnectHeader = errors.New("invalid connect header")

// ErrNilOption is returned by NewClient if an option is nil.
ErrNilOption = errors.New("nil option")

// ErrNotConnected is returned by Exec and ExecCmd if the client is not connected.
ErrNotConnected = errors.New("not connected")

// ErrTimeout is returned by Exec and ExecCmd if no response is received
// within the specified timeout duration.
ErrTimeout = errors.New("timeout")
)

// Error represents a error returned from the TeamSpeak 3 server.
Expand Down
4 changes: 3 additions & 1 deletion helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ func Decode(str string) string {

// DecodeResponse decodes a response into a struct.
func DecodeResponse(lines []string, v interface{}) error {
if len(lines) != 1 {
if len(lines) > 1 {
return NewInvalidResponseError("too many lines", lines)
} else if len(lines) == 0 {
return NewInvalidResponseError("no lines", lines)
}

input := make(map[string]interface{})
Expand Down
Loading

0 comments on commit 78271a6

Please sign in to comment.