Skip to content

Commit

Permalink
Use atomic ints to set the time for lastSent and lastReceived, and pi…
Browse files Browse the repository at this point in the history
…ngOutstanding.

This resolves the issue of data races as previously coded, it changes the internal struct for the options but leaves the Options setting API the same.
  • Loading branch information
alsm committed Aug 17, 2017
1 parent e020008 commit d896be2
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 27 deletions.
16 changes: 8 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ type client struct {
stop chan struct{}
persist Store
options ClientOptions
lastSent time.Time
lastReceived time.Time
pingOutstanding bool
lastSent int64
lastReceived int64
pingOutstanding int32
status uint32
workers sync.WaitGroup
}
Expand Down Expand Up @@ -239,8 +239,8 @@ func (c *client) Connect() Token {
c.stop = make(chan struct{})

if c.options.KeepAlive != 0 {
c.lastReceived = time.Now()
c.lastSent = time.Now()
atomic.StoreInt64(&c.lastReceived, time.Now().Unix())
atomic.StoreInt64(&c.lastSent, time.Now().Unix())
c.workers.Add(1)
go keepalive(c)
}
Expand Down Expand Up @@ -342,9 +342,9 @@ func (c *client) reconnect() {
}

if c.options.KeepAlive != 0 {
c.pingOutstanding = false
c.lastReceived = time.Now()
c.lastSent = time.Now()
atomic.StoreInt32(&c.pingOutstanding, 0)
atomic.StoreInt64(&c.lastReceived, time.Now().Unix())
atomic.StoreInt64(&c.lastSent, time.Now().Unix())
c.workers.Add(1)
go keepalive(c)
}
Expand Down
2 changes: 1 addition & 1 deletion message.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func newConnectMsgFromOptions(options *ClientOptions) *packets.ConnectPacket {
}
}

m.Keepalive = uint16(options.KeepAlive.Seconds())
m.Keepalive = uint16(options.KeepAlive)

return m
}
7 changes: 4 additions & 3 deletions net.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net/url"
"os"
"reflect"
"sync/atomic"
"time"

"github.com/eclipse/paho.mqtt.golang/packets"
Expand Down Expand Up @@ -125,7 +126,7 @@ func incoming(c *client) {
case c.ibound <- cp:
// Notify keepalive logic that we recently received a packet
if c.options.KeepAlive != 0 {
c.lastReceived = time.Now()
atomic.StoreInt64(&c.lastReceived, time.Now().Unix())
}
case <-c.stop:
// This avoids a deadlock should a message arrive while shutting down.
Expand Down Expand Up @@ -205,7 +206,7 @@ func outgoing(c *client) {
}
// Reset ping timer after sending control packet.
if c.options.KeepAlive != 0 {
c.lastSent = time.Now()
atomic.StoreInt64(&c.lastSent, time.Now().Unix())
}
}
}
Expand All @@ -228,7 +229,7 @@ func alllogic(c *client) {
switch m := msg.(type) {
case *packets.PingrespPacket:
DEBUG.Println(NET, "received pingresp")
c.pingOutstanding = false
atomic.StoreInt32(&c.pingOutstanding, 0)
case *packets.SubackPacket:
DEBUG.Println(NET, "received suback, id:", m.MessageID)
token := c.getToken(m.MessageID)
Expand Down
6 changes: 3 additions & 3 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type ClientOptions struct {
ProtocolVersion uint
protocolVersionExplicit bool
TLSConfig tls.Config
KeepAlive time.Duration
KeepAlive int64

This comment has been minimized.

Copy link
@timhughes

timhughes Nov 22, 2017

This breaks the examples

PingTimeout time.Duration
ConnectTimeout time.Duration
MaxReconnectInterval time.Duration
Expand Down Expand Up @@ -90,7 +90,7 @@ func NewClientOptions() *ClientOptions {
ProtocolVersion: 0,
protocolVersionExplicit: false,
TLSConfig: tls.Config{},
KeepAlive: 30 * time.Second,
KeepAlive: 30,
PingTimeout: 10 * time.Second,
ConnectTimeout: 30 * time.Second,
MaxReconnectInterval: 10 * time.Minute,
Expand Down Expand Up @@ -182,7 +182,7 @@ func (o *ClientOptions) SetStore(s Store) *ClientOptions {
// allow the client to know that a connection has not been lost with the
// server.
func (o *ClientOptions) SetKeepAlive(k time.Duration) *ClientOptions {
o.KeepAlive = k
o.KeepAlive = int64(k / time.Second)
return o
}

Expand Down
2 changes: 1 addition & 1 deletion options_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (r *ClientOptionsReader) TLSConfig() tls.Config {
}

func (r *ClientOptionsReader) KeepAlive() time.Duration {
s := r.options.KeepAlive
s := time.Duration(r.options.KeepAlive * int64(time.Second))
return s
}

Expand Down
20 changes: 11 additions & 9 deletions ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package mqtt

import (
"errors"
"sync/atomic"
"time"

"github.com/eclipse/paho.mqtt.golang/packets"
Expand All @@ -24,16 +25,16 @@ import (
func keepalive(c *client) {
defer c.workers.Done()
DEBUG.Println(PNG, "keepalive starting")
var checkInterval time.Duration
var checkInterval int64
var pingSent time.Time

if c.options.KeepAlive > 10*time.Second {
checkInterval = 5 * time.Second
if c.options.KeepAlive > 10 {
checkInterval = 5
} else {
checkInterval = c.options.KeepAlive / 2
}

intervalTicker := time.NewTicker(checkInterval)
intervalTicker := time.NewTicker(time.Duration(checkInterval * int64(time.Second)))
defer intervalTicker.Stop()

for {
Expand All @@ -42,19 +43,20 @@ func keepalive(c *client) {
DEBUG.Println(PNG, "keepalive stopped")
return
case <-intervalTicker.C:
if time.Now().Sub(c.lastSent) >= c.options.KeepAlive || time.Now().Sub(c.lastReceived) >= c.options.KeepAlive {
if !c.pingOutstanding {
DEBUG.Println(PNG, "ping check", time.Now().Unix()-atomic.LoadInt64(&c.lastSent))
if time.Now().Unix()-atomic.LoadInt64(&c.lastSent) >= c.options.KeepAlive || time.Now().Unix()-atomic.LoadInt64(&c.lastReceived) >= c.options.KeepAlive {
if atomic.LoadInt32(&c.pingOutstanding) == 0 {
DEBUG.Println(PNG, "keepalive sending ping")
ping := packets.NewControlPacket(packets.Pingreq).(*packets.PingreqPacket)
//We don't want to wait behind large messages being sent, the Write call
//will block until it it able to send the packet.
c.pingOutstanding = true
atomic.StoreInt32(&c.pingOutstanding, 1)
ping.Write(c.conn)
c.lastSent = time.Now()
atomic.StoreInt64(&c.lastSent, time.Now().Unix())
pingSent = time.Now()
}
}
if c.pingOutstanding && time.Now().Sub(pingSent) >= c.options.PingTimeout {
if atomic.LoadInt32(&c.pingOutstanding) > 0 && time.Now().Sub(pingSent) >= c.options.PingTimeout {
CRITICAL.Println(PNG, "pingresp not received, disconnecting")
c.errors <- errors.New("pingresp not received, disconnecting")
return
Expand Down
4 changes: 2 additions & 2 deletions unit_options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func Test_NewClientOptions_default(t *testing.T) {
t.Fatalf("bad default password")
}

if o.KeepAlive != 30*time.Second {
if o.KeepAlive != 30 {
t.Fatalf("bad default timeout")
}
}
Expand Down Expand Up @@ -69,7 +69,7 @@ func Test_NewClientOptions_mix(t *testing.T) {
t.Fatalf("bad set password")
}

if o.KeepAlive != 88000000000 {
if o.KeepAlive != 88 {
t.Fatalf("bad set timeout: %d", o.KeepAlive)
}
}
Expand Down

1 comment on commit d896be2

@maruel
Copy link

@maruel maruel commented on d896be2 Aug 17, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At home when running this commit, this change causes a crash while another goroutine is running token.Wait():

sync/atomic.storeUint64(0x10a241d4, 0x5995d4ba, 0x0)
	/usr/local/go/src/sync/atomic/64bit_arm.go:20 +0x3c
github.com/eclipse/paho%2emqtt%2egolang.(*client).Connect.func1(0x10a24000, 0x1096c7f8, 0x1096a8a0)
	.../go/src/github.com/eclipse/paho.mqtt.golang/client.go:242 +0x920
created by github.com/eclipse/paho%2emqtt%2egolang.(*client).Connect
	.../go/src/github.com/eclipse/paho.mqtt.golang/client.go:275 +0x124

Please sign in to comment.