Skip to content

Commit 61f9368

Browse files
author
Luis Sanchez
committed
[FAB-5527] Failures in orderer/consensus/kafka
- cancel retry on Halt() - Halt() now waits for Start() to complete - cleanup channel resources after EnqueueError test Change-Id: Icec73a6eec9cc2fd1e9feb0aad49e82ce23ec589 Signed-off-by: Luis Sanchez <sanchezl@us.ibm.com>
1 parent d788450 commit 61f9368

File tree

2 files changed

+26
-11
lines changed

2 files changed

+26
-11
lines changed

orderer/consensus/kafka/chain.go

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -99,16 +99,24 @@ func (chain *chainImpl) Start() {
9999
// consensus.Chain interface.
100100
func (chain *chainImpl) Halt() {
101101
select {
102-
case <-chain.haltChan:
103-
// This construct is useful because it allows Halt() to be called
104-
// multiple times (by a single thread) w/o panicking. Recal that a
105-
// receive from a closed channel returns (the zero value) immediately.
106-
logger.Warningf("[channel: %s] Halting of chain requested again", chain.support.ChainID())
102+
case <-chain.startChan:
103+
// chain finished starting, so we can halt it
104+
select {
105+
case <-chain.haltChan:
106+
// This construct is useful because it allows Halt() to be called
107+
// multiple times (by a single thread) w/o panicking. Recal that a
108+
// receive from a closed channel returns (the zero value) immediately.
109+
logger.Warningf("[channel: %s] Halting of chain requested again", chain.support.ChainID())
110+
default:
111+
logger.Criticalf("[channel: %s] Halting of chain requested", chain.support.ChainID())
112+
close(chain.haltChan)
113+
chain.closeKafkaObjects() // Also close the producer and the consumer
114+
logger.Debugf("[channel: %s] Closed the haltChan", chain.support.ChainID())
115+
}
107116
default:
108-
logger.Criticalf("[channel: %s] Halting of chain requested", chain.support.ChainID())
109-
close(chain.haltChan)
110-
chain.closeKafkaObjects() // Also close the producer and the consumer
111-
logger.Debugf("[channel: %s] Closed the haltChan", chain.support.ChainID())
117+
logger.Warningf("[channel: %s] Waiting for chain to finish starting before halting", chain.support.ChainID())
118+
<-chain.startChan
119+
chain.Halt()
112120
}
113121
}
114122

@@ -476,8 +484,14 @@ func sendConnectMessage(retryOptions localconfig.Retry, exitChan chan struct{},
476484

477485
retryMsg := "Attempting to post the CONNECT message..."
478486
postConnect := newRetryProcess(retryOptions, exitChan, channel, retryMsg, func() error {
479-
_, _, err := producer.SendMessage(message)
480-
return err
487+
select {
488+
case <-exitChan:
489+
logger.Debugf("[channel: %s] Consenter for channel exiting, aborting retry", channel)
490+
return nil
491+
default:
492+
_, _, err := producer.SendMessage(message)
493+
return err
494+
}
481495
})
482496

483497
return postConnect.retry()

orderer/consensus/kafka/chain_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,7 @@ func TestChain(t *testing.T) {
338338
case <-time.After(shortTimeout):
339339
t.Fatal("startChan should have been closed by now")
340340
}
341+
defer chain.Halt()
341342

342343
// Now make it so that the next ProduceRequest is met with an error
343344
mockBroker.SetHandlerByMap(map[string]sarama.MockResponse{

0 commit comments

Comments
 (0)