Skip to content

Commit d276593

Browse files
authored
ConnectionNotificationHandler - generic callback for all types of connection events.
Enables users to receive updates on connection related events (e.g. connecting, connection failed, broker connection failed etc).
2 parents 3c6d3cc + 8a350a9 commit d276593

File tree

4 files changed

+211
-78
lines changed

4 files changed

+211
-78
lines changed

client.go

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -258,12 +258,15 @@ func (c *client) Connect() Token {
258258
return
259259
}
260260

261+
var attemptCount int
262+
261263
RETRYCONN:
262264
var conn net.Conn
263265
var rc byte
264266
var err error
265-
conn, rc, t.sessionPresent, err = c.attemptConnection()
267+
conn, rc, t.sessionPresent, err = c.attemptConnection(false, attemptCount)
266268
if err != nil {
269+
attemptCount++
267270
if c.options.ConnectRetry {
268271
DEBUG.Println(CLI, "Connect failed, sleeping for", int(c.options.ConnectRetryInterval.Seconds()), "seconds and will then retry, error:", err.Error())
269272
time.Sleep(c.options.ConnectRetryInterval)
@@ -315,15 +318,17 @@ func (c *client) reconnect(connectionUp connCompletedFn) {
315318
DEBUG.Println(CLI, "Detect continual connection lost after reconnect, slept for", int(slp.Seconds()), "seconds")
316319
}
317320

321+
var attemptCount int
318322
for {
319323
if nil != c.options.OnReconnecting {
320324
c.options.OnReconnecting(c, &c.options)
321325
}
322326
var err error
323-
conn, _, _, err = c.attemptConnection()
327+
conn, _, _, err = c.attemptConnection(true, attemptCount)
324328
if err == nil {
325329
break
326330
}
331+
attemptCount++
327332
sleep, _ := c.backoff.sleepWithBackoff("attemptReconnection", initSleep, c.options.MaxReconnectInterval, c.options.ConnectTimeout, false)
328333
DEBUG.Println(CLI, "Reconnect failed, slept for", int(sleep.Seconds()), "seconds:", err)
329334

@@ -351,7 +356,7 @@ func (c *client) reconnect(connectionUp connCompletedFn) {
351356
// byte - Return code (packets.Accepted indicates a successful connection).
352357
// bool - SessionPresent flag from the connect ack (only valid if packets.Accepted)
353358
// err - Error (err != nil guarantees that conn has been set to active connection).
354-
func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
359+
func (c *client) attemptConnection(isReconnect bool, attempt int) (net.Conn, byte, bool, error) {
355360
protocolVersion := c.options.ProtocolVersion
356361
var (
357362
sessionPresent bool
@@ -360,6 +365,10 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
360365
rc byte
361366
)
362367

368+
if c.options.OnConnectionNotification != nil {
369+
c.options.OnConnectionNotification(c, ConnectionNotificationConnecting{isReconnect, attempt})
370+
}
371+
363372
c.optionsMu.Lock() // Protect c.options.Servers so that servers can be added in test cases
364373
brokers := c.options.Servers
365374
c.optionsMu.Unlock()
@@ -372,6 +381,9 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
372381
DEBUG.Println(CLI, "using custom onConnectAttempt handler...")
373382
tlsCfg = c.options.OnConnectAttempt(broker, c.options.TLSConfig)
374383
}
384+
if c.options.OnConnectionNotification != nil {
385+
c.options.OnConnectionNotification(c, ConnectionNotificationBroker{broker})
386+
}
375387
connDeadline := time.Now().Add(c.options.ConnectTimeout) // Time by which connection must be established
376388
dialer := c.options.Dialer
377389
if dialer == nil { //
@@ -388,6 +400,9 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
388400
ERROR.Println(CLI, err.Error())
389401
WARN.Println(CLI, "failed to connect to broker, trying next")
390402
rc = packets.ErrNetworkError
403+
if c.options.OnConnectionNotification != nil {
404+
c.options.OnConnectionNotification(c, ConnectionNotificationBrokerFailed{broker, err})
405+
}
391406
continue
392407
}
393408
DEBUG.Println(CLI, "socket connected to broker")
@@ -430,6 +445,9 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
430445
err = fmt.Errorf("%w : %w", packets.ConnErrors[rc], err)
431446
}
432447
}
448+
if err != nil && c.options.OnConnectionNotification != nil {
449+
c.options.OnConnectionNotification(c, ConnectionNotificationFailed{err})
450+
}
433451
return conn, rc, sessionPresent, err
434452
}
435453

@@ -564,6 +582,9 @@ func (c *client) internalConnLost(whyConnLost error) {
564582
if c.options.OnConnectionLost != nil {
565583
go c.options.OnConnectionLost(c, whyConnLost)
566584
}
585+
if c.options.OnConnectionNotification != nil {
586+
go c.options.OnConnectionNotification(c, ConnectionNotificationLost{whyConnLost})
587+
}
567588
DEBUG.Println(CLI, "internalConnLost complete")
568589
}()
569590
}
@@ -613,6 +634,9 @@ func (c *client) startCommsWorkers(conn net.Conn, connectionUp connCompletedFn,
613634
if c.options.OnConnect != nil {
614635
go c.options.OnConnect(c)
615636
}
637+
if c.options.OnConnectionNotification != nil {
638+
go c.options.OnConnectionNotification(c, ConnectionNotificationConnected{})
639+
}
616640

617641
// c.oboundP and c.obound need to stay active for the life of the client because, depending upon the options,
618642
// messages may be published while the client is disconnected (they will block unless in a goroutine). However

cmd/simple/main.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,22 @@ func main() {
3939
opts.SetKeepAlive(2 * time.Second)
4040
opts.SetDefaultPublishHandler(f)
4141
opts.SetPingTimeout(1 * time.Second)
42+
opts.SetConnectionNotificationHandler(func(client mqtt.Client, notification mqtt.ConnectionNotification) {
43+
switch n := notification.(type) {
44+
case mqtt.ConnectionNotificationConnected:
45+
fmt.Printf("[NOTIFICATION] connected\n")
46+
case mqtt.ConnectionNotificationConnecting:
47+
fmt.Printf("[NOTIFICATION] connecting (isReconnect=%t) [%d]\n", n.IsReconnect, n.Attempt)
48+
case mqtt.ConnectionNotificationFailed:
49+
fmt.Printf("[NOTIFICATION] connection failed: %v\n", n.Reason)
50+
case mqtt.ConnectionNotificationLost:
51+
fmt.Printf("[NOTIFICATION] connection lost: %v\n", n.Reason)
52+
case mqtt.ConnectionNotificationBroker:
53+
fmt.Printf("[NOTIFICATION] broker connection: %s\n", n.Broker.String())
54+
case mqtt.ConnectionNotificationBrokerFailed:
55+
fmt.Printf("[NOTIFICATION] broker connection failed: %v [%s]\n", n.Reason, n.Broker.String())
56+
}
57+
})
4258

4359
c := mqtt.NewClient(opts)
4460
if token := c.Connect(); token.Wait() && token.Error() != nil {

connnotf.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package mqtt
2+
3+
import "net/url"
4+
5+
type ConnectionNotificationType int64
6+
7+
const (
8+
ConnectionNotificationTypeConnected ConnectionNotificationType = iota
9+
ConnectionNotificationTypeConnecting
10+
ConnectionNotificationTypeFailed
11+
ConnectionNotificationTypeLost
12+
ConnectionNotificationTypeBroker
13+
ConnectionNotificationTypeBrokerFailed
14+
)
15+
16+
type ConnectionNotification interface {
17+
Type() ConnectionNotificationType
18+
}
19+
20+
// Connected
21+
22+
type ConnectionNotificationConnected struct {
23+
}
24+
25+
func (n ConnectionNotificationConnected) Type() ConnectionNotificationType {
26+
return ConnectionNotificationTypeConnected
27+
}
28+
29+
// Connecting
30+
31+
type ConnectionNotificationConnecting struct {
32+
IsReconnect bool
33+
Attempt int
34+
}
35+
36+
func (n ConnectionNotificationConnecting) Type() ConnectionNotificationType {
37+
return ConnectionNotificationTypeConnecting
38+
}
39+
40+
// Connection Failed
41+
42+
type ConnectionNotificationFailed struct {
43+
Reason error
44+
}
45+
46+
func (n ConnectionNotificationFailed) Type() ConnectionNotificationType {
47+
return ConnectionNotificationTypeFailed
48+
}
49+
50+
// Connection Lost
51+
52+
type ConnectionNotificationLost struct {
53+
Reason error // may be nil
54+
}
55+
56+
func (n ConnectionNotificationLost) Type() ConnectionNotificationType {
57+
return ConnectionNotificationTypeLost
58+
}
59+
60+
// Broker Connection
61+
62+
type ConnectionNotificationBroker struct {
63+
Broker *url.URL
64+
}
65+
66+
func (n ConnectionNotificationBroker) Type() ConnectionNotificationType {
67+
return ConnectionNotificationTypeBroker
68+
}
69+
70+
// Broker Connection Failed
71+
72+
type ConnectionNotificationBrokerFailed struct {
73+
Broker *url.URL
74+
Reason error
75+
}
76+
77+
func (n ConnectionNotificationBrokerFailed) Type() ConnectionNotificationType {
78+
return ConnectionNotificationTypeBrokerFailed
79+
}

0 commit comments

Comments
 (0)