Skip to content

Notification support #18

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 32 commits into from
Feb 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
c627eab
changed scanning to filter notify messages
irgendwr Jul 11, 2018
42ea2ef
readded multiline support
irgendwr Jul 11, 2018
eb958eb
added support for notifications
irgendwr Jul 11, 2018
48b585f
fixed issues and suggestions
irgendwr Jul 13, 2018
a2b28f7
clarify DecodeResponse error assertions
irgendwr Jul 14, 2018
6c55d63
added notification buffer and removed debug print
irgendwr Jul 14, 2018
8a8634b
fixed data race
irgendwr Jul 14, 2018
adcd4eb
added keepalive option
irgendwr Aug 13, 2018
778110a
only set connected to false if it's true (might fix data-race?)
irgendwr Aug 26, 2018
3a92186
added mutex
irgendwr Aug 27, 2018
7032b17
updated keepalive for the v3.3+ server releases
irgendwr Aug 28, 2018
6c6af09
improved timeout
irgendwr Aug 29, 2018
54ee219
added option to set the notification buffer size
irgendwr Oct 4, 2018
91a7399
made notification write non-blocking
irgendwr Oct 4, 2018
79a6b09
handle connection status threadsafe
irgendwr Oct 4, 2018
f2b1722
renamed register methods
irgendwr Oct 4, 2018
4b93b4d
exported NotifyEvent type and fixed typo
irgendwr Oct 4, 2018
97c0f92
removed redundant if
irgendwr Oct 4, 2018
d5157f0
renamed NotifyEvent to NotifyCategory
irgendwr Oct 4, 2018
6ea7e0f
fixed NotifyCategory comment
irgendwr Oct 4, 2018
56617d2
added mutex to Keepalive
irgendwr Oct 5, 2018
39ffab3
added option to set a custom Keepalive interval
irgendwr Oct 5, 2018
e7bb356
avoid recreating notify channel
irgendwr Oct 5, 2018
f6e7cb5
cleaned up NotificationBuffer option
irgendwr Oct 5, 2018
af6af24
restructured keepalive and command handling
irgendwr Oct 6, 2018
b5074a9
use fmt.Stringer
irgendwr Oct 6, 2018
b217030
change work type to string
irgendwr Oct 6, 2018
4b22361
removed mutex and improved timeout
irgendwr Oct 7, 2018
665a438
added tests
irgendwr Oct 7, 2018
c56fcd0
reduced timeout in tests to improve speed
irgendwr Oct 7, 2018
bbd7801
clarified comment, inlined if
irgendwr Oct 8, 2018
22afe59
reverted to previous timeout
irgendwr Oct 8, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 141 additions & 37 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,32 @@ var (
respTrailerRe = regexp.MustCompile(`^error id=(\d+) msg=([^ ]+)(.*)`)

// DefaultTimeout is the default read / write / dial timeout for Clients.
DefaultTimeout = time.Second * 10
DefaultTimeout = 10 * time.Second

// DefaultKeepAlive is the default interval in which keepalive data is sent.
DefaultKeepAlive = 200 * time.Second

// DefaultNotifyBufSize is the default notification buffer size.
DefaultNotifyBufSize = 5

// keepAliveData is the keepalive data.
keepAliveData = " \n"
)

// Client is a TeamSpeak 3 ServerQuery client.
type Client struct {
conn net.Conn
timeout time.Duration
scanner *bufio.Scanner
buf []byte
maxBufSize int
conn net.Conn
timeout time.Duration
keepAlive time.Duration
scanner *bufio.Scanner
buf []byte
maxBufSize int
notifyBufSize int
work chan string
err chan error
notify chan Notification
disconnect chan struct{}
res []string

Server *ServerMethods
}
Expand All @@ -53,6 +69,22 @@ func Timeout(timeout time.Duration) func(*Client) error {
}
}

// KeepAlive sets the keepAlive interval.
func KeepAlive(keepAlive time.Duration) func(*Client) error {
return func(c *Client) error {
c.keepAlive = keepAlive
return nil
}
}

// NotificationBuffer sets the notification buffer size.
func NotificationBuffer(size int) func(*Client) error {
return func(c *Client) error {
c.notifyBufSize = size
return nil
}
}

// Buffer sets the initial buffer used to parse responses from
// the server and the maximum size of buffer that may be allocated.
// The maximum parsable token size is the larger of max and cap(buf).
Expand All @@ -76,9 +108,14 @@ func NewClient(addr string, options ...func(c *Client) error) (*Client, error) {
}

c := &Client{
timeout: DefaultTimeout,
buf: make([]byte, startBufSize),
maxBufSize: MaxParseTokenSize,
timeout: DefaultTimeout,
keepAlive: DefaultKeepAlive,
buf: make([]byte, startBufSize),
maxBufSize: MaxParseTokenSize,
notifyBufSize: DefaultNotifyBufSize,
work: make(chan string),
err: make(chan error),
disconnect: make(chan struct{}),
}
for _, f := range options {
if f == nil {
Expand All @@ -89,6 +126,8 @@ func NewClient(addr string, options ...func(c *Client) error) (*Client, error) {
}
}

c.notify = make(chan Notification, c.notifyBufSize)

// Wire up command groups
c.Server = &ServerMethods{Client: c}

Expand All @@ -101,11 +140,11 @@ func NewClient(addr string, options ...func(c *Client) error) (*Client, error) {
c.scanner.Buffer(c.buf, c.maxBufSize)
c.scanner.Split(ScanLines)

if err := c.setDeadline(); err != nil {
if err := c.conn.SetDeadline(time.Now().Add(c.timeout)); err != nil {
return nil, err
}

// Reader the connection header
// Read the connection header
if !c.scanner.Scan() {
return nil, c.scanErr()
}
Expand All @@ -119,12 +158,69 @@ func NewClient(addr string, options ...func(c *Client) error) (*Client, error) {
return nil, c.scanErr()
}

if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
return nil, err
}

// Start handlers
go c.messageHandler()
go c.workHandler()

return c, nil
}

// setDeadline updates the deadline on the connection based on the clients configured timeout.
func (c *Client) setDeadline() error {
return c.conn.SetDeadline(time.Now().Add(c.timeout))
// messageHandler scans incoming lines and handles them accordingly.
func (c *Client) messageHandler() {
for {
if c.scanner.Scan() {
line := c.scanner.Text()
if line == "error id=0 msg=ok" {
c.err <- nil
} else if matches := respTrailerRe.FindStringSubmatch(line); len(matches) == 4 {
c.err <- NewError(matches)
} else if strings.Index(line, "notify") == 0 {
if n, err := decodeNotification(line); err == nil {
// non-blocking write
select {
case c.notify <- n:
default:
}
}
} else {
c.res = append(c.res, line)
}
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about time errors?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what you mean. Timeout errors? Any errors are returned with c.err <- err

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, but as you're always reading, in order to facilitate notifications, and every read should have a timeout (not just the ones triggered by writes) then you need to deal with the expected timeouts otherwise the client would get a constant stream of errors even when they didn't send any commands.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But why should the read in the message handler have a timeout? This would just lead to constant timeout errors and serve no purpose imho.
Otherwise I could set a deadline and and just ignore all of the errors caused by timeouts.

err := c.scanErr()
c.err <- err
if err == io.ErrUnexpectedEOF {
close(c.disconnect)
return
}
}
}
}

// workHandler handles commands and keepAlive messages.
func (c *Client) workHandler() {
for {
select {
case w := <-c.work:
c.process(w)
case <-time.After(c.keepAlive):
c.process(keepAliveData)
case <-c.disconnect:
return
}
}
}

func (c *Client) process(data string) {
if err := c.conn.SetWriteDeadline(time.Now().Add(c.timeout)); err != nil {
c.err <- err
}
if _, err := c.conn.Write([]byte(data)); err != nil {
c.err <- err
}
}

// Exec executes cmd on the server and returns the response.
Expand All @@ -134,42 +230,50 @@ func (c *Client) Exec(cmd string) ([]string, error) {

// ExecCmd executes cmd on the server and returns the response.
func (c *Client) ExecCmd(cmd *Cmd) ([]string, error) {
if err := c.setDeadline(); err != nil {
return nil, err
if !c.IsConnected() {
return nil, ErrNotConnected
}

if _, err := c.conn.Write([]byte(cmd.String())); err != nil {
return nil, err
}
c.work <- cmd.String()

if err := c.setDeadline(); err != nil {
return nil, err
select {
case err := <-c.err:
if err != nil {
return nil, err
}
case <-time.After(c.timeout):
return nil, ErrTimeout
}

lines := make([]string, 0, 10)
for c.scanner.Scan() {
l := c.scanner.Text()
if l == "error id=0 msg=ok" {
if cmd.response != nil {
if err := DecodeResponse(lines, cmd.response); err != nil {
return nil, err
}
}
return lines, nil
} else if matches := respTrailerRe.FindStringSubmatch(l); len(matches) == 4 {
return nil, NewError(matches)
} else {
lines = append(lines, l)
res := c.res
c.res = nil

if cmd.response != nil {
if err := DecodeResponse(res, cmd.response); err != nil {
return nil, err
}
}

return nil, c.scanErr()
return res, nil
}

// IsConnected returns whether the client is connected.
func (c *Client) IsConnected() bool {
select {
case <-c.disconnect:
return false
default:
return true
}
}

// Close closes the connection to the server.
func (c *Client) Close() error {
defer close(c.notify)

_, err := c.Exec("quit")
err2 := c.conn.Close()

if err != nil {
return err
}
Expand All @@ -178,7 +282,7 @@ func (c *Client) Close() error {
}

// scanError returns the error from the scanner if non-nil,
// io.ErrUnexpectedEOF otherwise.
// `io.ErrUnexpectedEOF` otherwise.
func (c *Client) scanErr() error {
if err := c.scanner.Err(); err != nil {
return err
Expand Down
53 changes: 48 additions & 5 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestClient(t *testing.T) {
assert.NoError(t, s.Close())
}()

c, err := NewClient(s.Addr, Timeout(time.Second*2))
c, err := NewClient(s.Addr, Timeout(time.Second))
if !assert.NoError(t, err) {
return
}
Expand Down Expand Up @@ -68,7 +68,7 @@ func TestClientDisconnect(t *testing.T) {
assert.NoError(t, s.Close())
}()

c, err := NewClient(s.Addr, Timeout(time.Second*2))
c, err := NewClient(s.Addr, Timeout(time.Second))
if !assert.NoError(t, err) {
return
}
Expand All @@ -88,7 +88,7 @@ func TestClientWriteFail(t *testing.T) {
assert.NoError(t, s.Close())
}()

c, err := NewClient(s.Addr, Timeout(time.Second*2))
c, err := NewClient(s.Addr, Timeout(time.Second))
if !assert.NoError(t, err) {
return
}
Expand All @@ -108,6 +108,49 @@ func TestClientDialFail(t *testing.T) {
assert.NoError(t, c.Close())
}

func TestClientTimeout(t *testing.T) {
s := newServer(t)
if s == nil {
return
}
defer func() {
assert.NoError(t, s.Close())
}()

c, err := NewClient(s.Addr, Timeout(time.Millisecond*100))
if !assert.NoError(t, err) {
return
}

// Not receiving a response must cause a timeout
_, err = c.Exec(" ")
assert.Error(t, err)
}

func TestClientDeadline(t *testing.T) {
s := newServer(t)
if s == nil {
return
}
defer func() {
assert.NoError(t, s.Close())
}()

c, err := NewClient(s.Addr, Timeout(time.Millisecond*100))
if !assert.NoError(t, err) {
return
}

_, err = c.Exec("version")
assert.NoError(t, err)

// Inactivity must not cause a timeout
time.Sleep(c.timeout * 2)

_, err = c.Exec("version")
assert.NoError(t, err)
}

func TestClientNoHeader(t *testing.T) {
s := newServerStopped(t)
if s == nil {
Expand All @@ -119,7 +162,7 @@ func TestClientNoHeader(t *testing.T) {
assert.NoError(t, s.Close())
}()

c, err := NewClient(s.Addr, Timeout(time.Second))
c, err := NewClient(s.Addr, Timeout(time.Millisecond*100))
if assert.Error(t, err) {
return
}
Expand All @@ -139,7 +182,7 @@ func TestClientNoBanner(t *testing.T) {
assert.NoError(t, s.Close())
}()

c, err := NewClient(s.Addr, Timeout(time.Second))
c, err := NewClient(s.Addr, Timeout(time.Millisecond*100))
if assert.Error(t, err) {
return
}
Expand Down
10 changes: 9 additions & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,19 @@ import (
)

var (
// ErrInvalidConnectHeader is returned by NewClient if the server doesn't respond with the required connection header.
// ErrInvalidConnectHeader is returned by NewClient if the server
// doesn't respond with the required connection header.
ErrInvalidConnectHeader = errors.New("invalid connect header")

// ErrNilOption is returned by NewClient if an option is nil.
ErrNilOption = errors.New("nil option")

// ErrNotConnected is returned by Exec and ExecCmd if the client is not connected.
ErrNotConnected = errors.New("not connected")

// ErrTimeout is returned by Exec and ExecCmd if no response is received
// within the specified timeout duration.
ErrTimeout = errors.New("timeout")
)

// Error represents a error returned from the TeamSpeak 3 server.
Expand Down
4 changes: 3 additions & 1 deletion helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ func Decode(str string) string {

// DecodeResponse decodes a response into a struct.
func DecodeResponse(lines []string, v interface{}) error {
if len(lines) != 1 {
if len(lines) > 1 {
return NewInvalidResponseError("too many lines", lines)
} else if len(lines) == 0 {
return NewInvalidResponseError("no lines", lines)
}

input := make(map[string]interface{})
Expand Down
Loading