Skip to content

Commit 35ee03d

Browse files
authored
Potential panic when using manual ACK
Manual acknowledgments can occur at any time. matchAndDispatch was not allowing for this fully, when the connection was lost/terminated a panic was possible where an Ack was sent after the matchAndDispatch goroutine terminated. This fix simplifies the handling of Ack packets and should resolve the issue (test included). Note that this is not a complete solution (ignoring an ACK after the connection has dropped will not always be the right option). ref #726
2 parents 601453b + 433bd22 commit 35ee03d

File tree

3 files changed

+101
-56
lines changed

3 files changed

+101
-56
lines changed

fvt_client_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1711,3 +1711,77 @@ func Test_OverLengthTopic(t *testing.T) {
17111711
p.Disconnect(250)
17121712
s.Disconnect(250)
17131713
}
1714+
1715+
// Test_Ack_After_Disconnect issue #726
1716+
// Must not panic if Ack is sent after connection loss
1717+
func Test_Ack_After_Disconnect(t *testing.T) {
1718+
pops := NewClientOptions()
1719+
pops.AddBroker(FVTTCP)
1720+
pops.SetClientID("Ack_After_Disconnect_tx")
1721+
p := NewClient(pops)
1722+
1723+
msgReceived := make(chan struct{})
1724+
disconnectDone := make(chan struct{})
1725+
ackCalled := make(chan bool)
1726+
1727+
sops := NewClientOptions()
1728+
sops.AddBroker(FVTTCP)
1729+
sops.AutoAckDisabled = true // Manual message Acknowledgment (so we can delay this)
1730+
sops.SetClientID("Ack_After_Disconnect_rx")
1731+
var f MessageHandler = func(client Client, msg Message) {
1732+
// matchAndDispatch waits for handlers to complete, so we return after
1733+
// starting a goroutine that will call Ack eventually
1734+
close(msgReceived)
1735+
go func() {
1736+
defer close(ackCalled) // should only ever get one message
1737+
1738+
select {
1739+
case <-disconnectDone:
1740+
msg.Ack()
1741+
ackCalled <- true
1742+
case <-time.After(time.Second):
1743+
ackCalled <- false
1744+
}
1745+
}()
1746+
}
1747+
sops.SetDefaultPublishHandler(f)
1748+
s := NewClient(sops)
1749+
1750+
sToken := s.Connect()
1751+
if sToken.Wait() && sToken.Error() != nil {
1752+
t.Fatalf("Error on Client.Connect(): %v", sToken.Error())
1753+
}
1754+
1755+
s.Subscribe("/test/ack-after-disconnect", 2, nil)
1756+
1757+
pToken := p.Connect()
1758+
if pToken.Wait() && pToken.Error() != nil {
1759+
t.Fatalf("Error on Client.Connect(): %v", pToken.Error())
1760+
}
1761+
1762+
p.Publish("/test/ack-after-disconnect", 2, false, "Publish qos0")
1763+
1764+
select {
1765+
case <-msgReceived:
1766+
case <-time.After(time.Second):
1767+
t.Error("Timed out waiting for message to be received")
1768+
}
1769+
1770+
p.Disconnect(0)
1771+
s.Disconnect(0)
1772+
1773+
// Ensure disconnection complete before proceeding
1774+
for s.(*client).status.ConnectionStatus() != disconnected {
1775+
time.Sleep(10 * time.Millisecond)
1776+
}
1777+
close(disconnectDone)
1778+
1779+
select {
1780+
case c := <-ackCalled:
1781+
if !c {
1782+
t.Error("Did not receive message, so no attempt made to send Ack")
1783+
}
1784+
case <-time.After(time.Second):
1785+
t.Error("Timed out waiting for ackCalled")
1786+
}
1787+
}

net.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -444,24 +444,23 @@ func startComms(conn net.Conn, // Network connection (must be active)
444444
}
445445

446446
// ackFunc acknowledges a packet
447-
// WARNING the function returned must not be called if the comms routine is shutting down or not running
448-
// (it needs outgoing comms in order to send the acknowledgement). Currently this is only called from
449-
// matchAndDispatch which will be shutdown before the comms are
450-
func ackFunc(oboundP chan *PacketAndToken, persist Store, packet *packets.PublishPacket) func() {
447+
// WARNING sendAck may be called at any time (even after the connection is dead). At the time of writing ACK sent after
448+
// connection loss will be dropped (this is not ideal)
449+
func ackFunc(sendAck func(*PacketAndToken), persist Store, packet *packets.PublishPacket) func() {
451450
return func() {
452451
switch packet.Qos {
453452
case 2:
454453
pr := packets.NewControlPacket(packets.Pubrec).(*packets.PubrecPacket)
455454
pr.MessageID = packet.MessageID
456455
DEBUG.Println(NET, "putting pubrec msg on obound")
457-
oboundP <- &PacketAndToken{p: pr, t: nil}
456+
sendAck(&PacketAndToken{p: pr, t: nil})
458457
DEBUG.Println(NET, "done putting pubrec msg on obound")
459458
case 1:
460459
pa := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket)
461460
pa.MessageID = packet.MessageID
462461
DEBUG.Println(NET, "putting puback msg on obound")
463-
persistOutbound(persist, pa)
464-
oboundP <- &PacketAndToken{p: pa, t: nil}
462+
persistOutbound(persist, pa) // May fail if store has been closed
463+
sendAck(&PacketAndToken{p: pa, t: nil})
465464
DEBUG.Println(NET, "done putting puback msg on obound")
466465
case 0:
467466
// do nothing, since there is no need to send an ack packet back

router.go

Lines changed: 21 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -136,38 +136,21 @@ func (r *router) setDefaultHandler(handler MessageHandler) {
136136
// associated callback (or the defaultHandler, if one exists and no other route matched). If
137137
// anything is sent down the stop channel the function will end.
138138
func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) <-chan *PacketAndToken {
139-
var wg sync.WaitGroup
140-
ackOutChan := make(chan *PacketAndToken) // Channel returned to caller; closed when messages channel closed
141-
var ackInChan chan *PacketAndToken // ACKs generated by ackFunc get put onto this channel
142-
143-
stopAckCopy := make(chan struct{}) // Closure requests stop of go routine copying ackInChan to ackOutChan
144-
ackCopyStopped := make(chan struct{}) // Closure indicates that it is safe to close ackOutChan
145-
goRoutinesDone := make(chan struct{}) // closed on wg.Done()
146-
if order {
147-
ackInChan = ackOutChan // When order = true no go routines are used so safe to use one channel and close when done
148-
} else {
149-
// When order = false ACK messages are sent in go routines so ackInChan cannot be closed until all goroutines done
150-
ackInChan = make(chan *PacketAndToken)
151-
go func() { // go routine to copy from ackInChan to ackOutChan until stopped
152-
for {
153-
select {
154-
case a := <-ackInChan:
155-
ackOutChan <- a
156-
case <-stopAckCopy:
157-
close(ackCopyStopped) // Signal main go routine that it is safe to close ackOutChan
158-
for {
159-
select {
160-
case <-ackInChan: // drain ackInChan to ensure all goRoutines can complete cleanly (ACK dropped)
161-
DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).")
162-
case <-goRoutinesDone:
163-
close(ackInChan) // Nothing further should be sent (a panic is probably better than silent failure)
164-
DEBUG.Println(ROU, "matchAndDispatch order=false copy goroutine exiting.")
165-
return
166-
}
167-
}
168-
}
169-
}
170-
}()
139+
ackChan := make(chan *PacketAndToken) // Channel returned to caller; closed when goroutine terminates
140+
141+
// In some cases message acknowledgments may come through after shutdown (connection is down etc). Where this is the
142+
// case we need to accept any such requests and then ignore them. Note that this is not a perfect solution, if we
143+
// have reconnected, and the session is still live, then the Ack really should be sent (see Issus #726)
144+
var ackMutex sync.RWMutex
145+
sendAckChan := ackChan // This will be set to nil before ackChan is closed
146+
sendAck := func(ack *PacketAndToken) {
147+
ackMutex.RLock()
148+
defer ackMutex.RUnlock()
149+
if sendAckChan != nil {
150+
sendAckChan <- ack
151+
} else {
152+
DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).")
153+
}
171154
}
172155

173156
go func() { // Main go routine handling inbound messages
@@ -176,20 +159,18 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
176159
// DEBUG.Println(ROU, "matchAndDispatch received message")
177160
sent := false
178161
r.RLock()
179-
m := messageFromPublish(message, ackFunc(ackInChan, client.persist, message))
162+
m := messageFromPublish(message, ackFunc(sendAck, client.persist, message))
180163
for e := r.routes.Front(); e != nil; e = e.Next() {
181164
if e.Value.(*route).match(message.TopicName) {
182165
if order {
183166
handlers = append(handlers, e.Value.(*route).callback)
184167
} else {
185168
hd := e.Value.(*route).callback
186-
wg.Add(1)
187169
go func() {
188170
hd(client, m)
189171
if !client.options.AutoAckDisabled {
190172
m.Ack()
191173
}
192-
wg.Done()
193174
}()
194175
}
195176
sent = true
@@ -200,13 +181,11 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
200181
if order {
201182
handlers = append(handlers, r.defaultHandler)
202183
} else {
203-
wg.Add(1)
204184
go func() {
205185
r.defaultHandler(client, m)
206186
if !client.options.AutoAckDisabled {
207187
m.Ack()
208188
}
209-
wg.Done()
210189
}()
211190
}
212191
} else {
@@ -225,18 +204,11 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
225204
}
226205
// DEBUG.Println(ROU, "matchAndDispatch handled message")
227206
}
228-
if order {
229-
close(ackOutChan)
230-
} else { // Ensure that nothing further will be written to ackOutChan before closing it
231-
close(stopAckCopy)
232-
<-ackCopyStopped
233-
close(ackOutChan)
234-
go func() {
235-
wg.Wait() // Note: If this remains running then the user has handlers that are not returning
236-
close(goRoutinesDone)
237-
}()
238-
}
207+
ackMutex.Lock()
208+
sendAckChan = nil
209+
ackMutex.Unlock()
210+
close(ackChan) // as sendAckChan is now nil nothing further will be sent on this
239211
DEBUG.Println(ROU, "matchAndDispatch exiting")
240212
}()
241-
return ackOutChan
213+
return ackChan
242214
}

0 commit comments

Comments
 (0)