Skip to content

Commit

Permalink
Call pending callbacks of connection after run loop stopped (apache#239)
Browse files Browse the repository at this point in the history
  • Loading branch information
rueian committed May 13, 2020
1 parent e7f1673 commit 234d7fa
Showing 1 changed file with 7 additions and 4 deletions.
11 changes: 7 additions & 4 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ type connection struct {
incomingRequestsCh chan *request
incomingCmdCh chan *incomingCmd
closeCh chan interface{}
runLoopStoppedCh chan interface{}
writeRequestsCh chan []byte

pendingReqs map[uint64]*request
Expand Down Expand Up @@ -176,6 +177,7 @@ func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSO
auth: auth,

closeCh: make(chan interface{}),
runLoopStoppedCh: make(chan interface{}),
incomingRequestsCh: make(chan *request, 10),
incomingCmdCh: make(chan *incomingCmd, 10),
writeRequestsCh: make(chan []byte, 10),
Expand Down Expand Up @@ -312,6 +314,7 @@ func (c *connection) run() {
for {
select {
case <-c.closeCh:
close(c.runLoopStoppedCh)
c.Close()
return

Expand Down Expand Up @@ -656,18 +659,18 @@ func (c *connection) Close() {

c.log.Info("Connection closed")
c.state = connectionClosed
if c.cnx != nil {
c.cnx.Close()
}
c.TriggerClose()
c.pingTicker.Stop()
c.pingCheckTicker.Stop()

for _, listener := range c.listeners {
listener.ConnectionClosed()
}

for _, req := range c.pendingReqs {
<-c.runLoopStoppedCh
for id, req := range c.pendingReqs {
req.callback(nil, errors.New("connection closed"))
delete(c.pendingReqs, id)
}

consumerHandlers := make(map[uint64]ConsumerHandler)
Expand Down

0 comments on commit 234d7fa

Please sign in to comment.