From ea3e0540dbb59596b6e9cf4b6dbced8d5f9b0d11 Mon Sep 17 00:00:00 2001 From: xiaolong ran Date: Wed, 30 Jun 2021 20:02:45 +0800 Subject: [PATCH] Fix channel data race (#558) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: xiaolongran ### Motivation In `internalSendRequest`, We will add the request to be sent to the `pendingReqs` map, even when the current connection status is `connectionClosed`, we will append the request, which will cause the current request's callback to be called twice First: ``` func (c *connection) internalSendRequest(req *request) { c.pendingLock.Lock() if req.id != nil { c.pendingReqs[*req.id] = req } c.pendingLock.Unlock() if c.getState() == connectionClosed { c.log.Warnf("internalSendRequest failed for connectionClosed") // In Here, call req.callback ************* if req.callback != nil { req.callback(req.cmd, ErrConnectionClosed) } } else { c.writeCommand(req.cmd) } } ``` Twice: ``` func (c *connection) run() { // All reads come from the reader goroutine go c.reader.readFromConnection() go c.runPingCheck() c.log.Debugf("Connection run starting with request capacity=%d queued=%d", cap(c.incomingRequestsCh), len(c.incomingRequestsCh)) defer func() { // all the accesses to the pendingReqs should be happened in this run loop thread, // including the final cleanup, to avoid the issue https://github.com/apache/pulsar-client-go/issues/239 c.pendingLock.Lock() for id, req := range c.pendingReqs { // In Here, call req.callback ********** req.callback(nil, errors.New("connection closed")) delete(c.pendingReqs, id) } c.pendingLock.Unlock() c.Close() }() .... } ``` In fact, when the current connection is in the `connectionClosed` state, we don’t need to append the request to the `pendingReqs` map, so we don’t need to process the request when it’s closed. ### Modifications When the connection is closed, the current request to be sent is not added to the `pendingReqs` map. --- pulsar/internal/connection.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 15dfceca81..1e3ce0264b 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -584,17 +584,17 @@ func (c *connection) SendRequestNoWait(req *pb.BaseCommand) error { } func (c *connection) internalSendRequest(req *request) { - c.pendingLock.Lock() - if req.id != nil { - c.pendingReqs[*req.id] = req - } - c.pendingLock.Unlock() if c.getState() == connectionClosed { c.log.Warnf("internalSendRequest failed for connectionClosed") if req.callback != nil { req.callback(req.cmd, ErrConnectionClosed) } } else { + c.pendingLock.Lock() + if req.id != nil { + c.pendingReqs[*req.id] = req + } + c.pendingLock.Unlock() c.writeCommand(req.cmd) } }