Skip to content

Commit

Permalink
nsqd: fix stall in Exit() due to tcp-protocol producer connections
Browse files Browse the repository at this point in the history
Consumer connections are closed when topics are closed,
but tcp-protocol publish connections are not,
so add tcpServer.CloseAll().

problem introduced by #1190
  • Loading branch information
ploxiln committed Oct 15, 2019
1 parent ac1627b commit 1ba3db1
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 4 deletions.
8 changes: 6 additions & 2 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type NSQD struct {

lookupPeers atomic.Value

tcpServer *tcpServer
tcpListener net.Listener
httpListener net.Listener
httpsListener net.Listener
Expand Down Expand Up @@ -255,9 +256,9 @@ func (n *NSQD) Main() error {
})
}

tcpServer := &tcpServer{ctx: ctx}
n.tcpServer = &tcpServer{ctx: ctx}
n.waitGroup.Wrap(func() {
exitFunc(protocol.TCPServer(n.tcpListener, tcpServer, n.logf))
exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
})
httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
n.waitGroup.Wrap(func() {
Expand Down Expand Up @@ -423,6 +424,9 @@ func (n *NSQD) Exit() {
if n.tcpListener != nil {
n.tcpListener.Close()
}
if n.tcpServer != nil {
n.tcpServer.CloseAll()
}

if n.httpListener != nil {
n.httpListener.Close()
Expand Down
16 changes: 14 additions & 2 deletions nsqd/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package nsqd
import (
"io"
"net"
"sync"

"github.com/nsqio/nsq/internal/protocol"
)

type tcpServer struct {
ctx *context
ctx *context
conns sync.Map
}

func (p *tcpServer) Handle(clientConn net.Conn) {
Expand Down Expand Up @@ -41,9 +43,19 @@ func (p *tcpServer) Handle(clientConn net.Conn) {
return
}

p.conns.Store(clientConn.RemoteAddr(), clientConn)

err = prot.IOLoop(clientConn)
if err != nil {
p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
return
}

p.conns.Delete(clientConn.RemoteAddr())
}

func (p *tcpServer) CloseAll() {
p.conns.Range(func(k, v interface{}) bool {
v.(net.Conn).Close()
return true
})
}

0 comments on commit 1ba3db1

Please sign in to comment.