Skip to content

Commit 0c028df

Browse files
committed
[FAB-5969] Block ingress msg for reprocessed msg
If there are reprocessed messages in flight, we need to block ingress messages until all in-flight messages are consumed and the config seq is caught up, so we don't risk repeatedly reprocessing new messages while the config seq keeps advancing. Change-Id: I4b684cb50f956259b88df1d0cf28d443d59067bf Signed-off-by: Jay Guo <guojiannan1101@gmail.com>
1 parent 9359c72 commit 0c028df

File tree

10 files changed

+428
-85
lines changed

10 files changed

+428
-85
lines changed

orderer/common/broadcast/broadcast.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,13 @@ type Consenter interface {
5656
// Configure accepts a reconfiguration or returns an error indicating the cause of failure
5757
// It ultimately passes through to the consensus.Chain interface
5858
Configure(config *cb.Envelope, configSeq uint64) error
59+
60+
// WaitReady blocks waiting for consenter to be ready for accepting new messages.
61+
// This is useful when consenter needs to temporarily block ingress messages so
62+
// that in-flight messages can be consumed. It could return error if consenter is
63+
// in erroneous states. If this blocking behavior is not desired, consenter could
64+
// simply return nil.
65+
WaitReady() error
5966
}
6067

6168
type handlerImpl struct {
@@ -90,6 +97,11 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
9097
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR, Info: err.Error()})
9198
}
9299

100+
if err = processor.WaitReady(); err != nil {
101+
logger.Warningf("[channel: %s] Rejecting broadcast of message from %s with SERVICE_UNAVAILABLE: rejected by Consenter: %s", chdr.ChannelId, addr, err)
102+
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()})
103+
}
104+
93105
if !isConfig {
94106
logger.Debugf("[channel: %s] Broadcast is processing normal message from %s with txid '%s' of type %s", chdr.ChannelId, addr, chdr.TxId, cb.HeaderType_name[chdr.Type])
95107

orderer/common/broadcast/broadcast_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,10 @@ type mockSupport struct {
111111
rejectEnqueue bool
112112
}
113113

114+
func (ms *mockSupport) WaitReady() error {
115+
return nil
116+
}
117+
114118
// Order sends a message for ordering
115119
func (ms *mockSupport) Order(env *cb.Envelope, configSeq uint64) error {
116120
if ms.rejectEnqueue {

orderer/common/multichannel/util_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ func (mch *mockChain) Configure(config *cb.Envelope, configSeq uint64) error {
5454
return nil
5555
}
5656

57+
func (mch *mockChain) WaitReady() error {
58+
return nil
59+
}
60+
5761
func (mch *mockChain) Start() {
5862
go func() {
5963
defer close(mch.done)

orderer/consensus/consensus.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,15 @@ type Chain interface {
5151
// it is the responsibility of the consenter to recompute the resulting config,
5252
// discarding the message if the reconfiguration is no longer valid.
5353
// The consenter may return an error, indicating the message was not accepted
54-
//
55-
// TODO block Order/Configure calls while a configure message is in flight, see FAB-5969
5654
Configure(config *cb.Envelope, configSeq uint64) error
5755

56+
// WaitReady blocks waiting for consenter to be ready for accepting new messages.
57+
// This is useful when consenter needs to temporarily block ingress messages so
58+
// that in-flight messages can be consumed. It could return error if consenter is
59+
// in erroneous states. If this blocking behavior is not desired, consenter could
60+
// simply return nil.
61+
WaitReady() error
62+
5863
// Errored returns a channel which will close when an error has occurred.
5964
// This is especially useful for the Deliver client, who must terminate waiting
6065
// clients when the consenter is not up to date.

orderer/consensus/kafka/chain.go

Lines changed: 81 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ func newChain(
4141
support consensus.ConsenterSupport,
4242
lastOffsetPersisted int64,
4343
lastOriginalOffsetProcessed int64,
44+
lastResubmittedConfigOffset int64,
4445
) (*chainImpl, error) {
4546
lastCutBlockNumber := getLastCutBlockNumber(support.Height())
4647
logger.Infof("[channel: %s] Starting chain with last persisted offset %d and last recorded block %d",
@@ -49,17 +50,32 @@ func newChain(
4950
errorChan := make(chan struct{})
5051
close(errorChan) // We need this closed when starting up
5152

53+
doneReprocessingMsgInFlight := make(chan struct{})
54+
// In any one of the following cases, we should unblock ingress messages:
55+
// - lastResubmittedConfigOffset == 0, where we've never resubmitted any config messages
56+
// - lastResubmittedConfigOffset == lastOriginalOffsetProcessed, where the latest config message we resubmitted
57+
// has been processed already
58+
// - lastResubmittedConfigOffset < lastOriginalOffsetProcessed, where we've processed one or more resubmitted
59+
// normal messages after the latest resubmitted config message. (we advance `lastResubmittedConfigOffset` for
60+
// config messages, but not normal messages)
61+
if lastResubmittedConfigOffset == 0 || lastResubmittedConfigOffset <= lastOriginalOffsetProcessed {
62+
// If we've already caught up with reprocessing the resubmitted messages, close the channel to unblock broadcast
63+
close(doneReprocessingMsgInFlight)
64+
}
65+
5266
return &chainImpl{
5367
consenter: consenter,
5468
ConsenterSupport: support,
5569
channel: newChannel(support.ChainID(), defaultPartition),
5670
lastOffsetPersisted: lastOffsetPersisted,
5771
lastOriginalOffsetProcessed: lastOriginalOffsetProcessed,
72+
lastResubmittedConfigOffset: lastResubmittedConfigOffset,
5873
lastCutBlockNumber: lastCutBlockNumber,
5974

60-
errorChan: errorChan,
61-
haltChan: make(chan struct{}),
62-
startChan: make(chan struct{}),
75+
errorChan: errorChan,
76+
haltChan: make(chan struct{}),
77+
startChan: make(chan struct{}),
78+
doneReprocessingMsgInFlight: doneReprocessingMsgInFlight,
6379
}, nil
6480
}
6581

@@ -70,12 +86,16 @@ type chainImpl struct {
7086
channel channel
7187
lastOffsetPersisted int64
7288
lastOriginalOffsetProcessed int64
89+
lastResubmittedConfigOffset int64
7390
lastCutBlockNumber uint64
7491

7592
producer sarama.SyncProducer
7693
parentConsumer sarama.Consumer
7794
channelConsumer sarama.PartitionConsumer
7895

96+
// notification that there are in-flight messages we need to wait for
97+
doneReprocessingMsgInFlight chan struct{}
98+
7999
// When the partition consumer errors, close the channel. Otherwise, make
80100
// this an open, unbuffered channel.
81101
errorChan chan struct{}
@@ -135,6 +155,21 @@ func (chain *chainImpl) Halt() {
135155
}
136156
}
137157

158+
func (chain *chainImpl) WaitReady() error {
159+
select {
160+
case <-chain.startChan: // The Start phase has completed
161+
select {
162+
case <-chain.haltChan: // The chain has been halted, stop here
163+
return fmt.Errorf("consenter for this channel has been halted")
164+
// Block waiting for all re-submitted messages to be reprocessed
165+
case <-chain.doneReprocessingMsgInFlight:
166+
return nil
167+
}
168+
default: // Not ready yet
169+
return fmt.Errorf("will not enqueue, consenter for this channel hasn't started yet")
170+
}
171+
}
172+
138173
// Implements the consensus.Chain interface. Called by Broadcast().
139174
func (chain *chainImpl) Order(env *cb.Envelope, configSeq uint64) error {
140175
return chain.order(env, configSeq, int64(0))
@@ -430,17 +465,19 @@ func getLastCutBlockNumber(blockchainHeight uint64) uint64 {
430465
return blockchainHeight - 1
431466
}
432467

433-
func getOffsets(metadataValue []byte, chainID string) (persisted int64, processed int64) {
468+
func getOffsets(metadataValue []byte, chainID string) (persisted int64, processed int64, resubmitted int64) {
434469
if metadataValue != nil {
435470
// Extract orderer-related metadata from the tip of the ledger first
436471
kafkaMetadata := &ab.KafkaMetadata{}
437472
if err := proto.Unmarshal(metadataValue, kafkaMetadata); err != nil {
438473
logger.Panicf("[channel: %s] Ledger may be corrupted:"+
439474
"cannot unmarshal orderer metadata in most recent block", chainID)
440475
}
441-
return kafkaMetadata.LastOffsetPersisted, kafkaMetadata.LastOriginalOffsetProcessed
476+
return kafkaMetadata.LastOffsetPersisted,
477+
kafkaMetadata.LastOriginalOffsetProcessed,
478+
kafkaMetadata.LastResubmittedConfigOffset
442479
}
443-
return sarama.OffsetOldest - 1, int64(0) // default
480+
return sarama.OffsetOldest - 1, int64(0), int64(0) // default
444481
}
445482

446483
func newConnectMessage() *ab.KafkaMessage {
@@ -545,6 +582,7 @@ func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, r
545582
metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
546583
LastOffsetPersisted: offset,
547584
LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
585+
LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
548586
})
549587
chain.WriteBlock(block, metadata)
550588
chain.lastCutBlockNumber++
@@ -559,6 +597,7 @@ func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, r
559597
metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
560598
LastOffsetPersisted: offset,
561599
LastOriginalOffsetProcessed: newOffset,
600+
LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
562601
})
563602
chain.WriteBlock(block, metadata)
564603
chain.lastCutBlockNumber++
@@ -583,6 +622,7 @@ func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, r
583622
metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
584623
LastOffsetPersisted: receivedOffset - 1,
585624
LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
625+
LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
586626
})
587627
chain.WriteBlock(block, metadata)
588628
chain.lastCutBlockNumber++
@@ -594,6 +634,7 @@ func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, r
594634
metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
595635
LastOffsetPersisted: receivedOffset,
596636
LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
637+
LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
597638
})
598639
chain.WriteConfigBlock(block, metadata)
599640
chain.lastCutBlockNumber++
@@ -659,14 +700,21 @@ func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, r
659700
case ab.KafkaMessageRegular_NORMAL:
660701
// This is a message that is re-validated and re-ordered
661702
if regularMessage.OriginalOffset != 0 {
703+
logger.Debugf("[channel: %s] Received re-submitted normal message with original offset %d", chain.ChainID(), regularMessage.OriginalOffset)
704+
662705
// But we've reprocessed it already
663706
if regularMessage.OriginalOffset <= chain.lastOriginalOffsetProcessed {
664707
logger.Debugf(
665-
"[channel: %s] OriginalOffset(%d) <= LastOriginalOffsetProcessd(%d), message has been processed already, discard",
708+
"[channel: %s] OriginalOffset(%d) <= LastOriginalOffsetProcessed(%d), message has been consumed already, discard",
666709
chain.ChainID(), regularMessage.OriginalOffset, chain.lastOriginalOffsetProcessed)
667710
return nil
668711
}
669712

713+
logger.Debugf(
714+
"[channel: %s] OriginalOffset(%d) > LastOriginalOffsetProcessed(%d), "+
715+
"this is the first time we receive this re-submitted normal message",
716+
chain.ChainID(), regularMessage.OriginalOffset, chain.lastOriginalOffsetProcessed)
717+
670718
// In case we haven't reprocessed the message, there's no need to differentiate it from those
671719
// messages that will be processed for the first time.
672720
}
@@ -704,15 +752,34 @@ func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, r
704752
case ab.KafkaMessageRegular_CONFIG:
705753
// This is a message that is re-validated and re-ordered
706754
if regularMessage.OriginalOffset != 0 {
755+
logger.Debugf("[channel: %s] Received re-submitted config message with original offset %d", chain.ChainID(), regularMessage.OriginalOffset)
756+
707757
// But we've reprocessed it already
708758
if regularMessage.OriginalOffset <= chain.lastOriginalOffsetProcessed {
709759
logger.Debugf(
710-
"[channel: %s] OriginalOffset(%d) <= LastOriginalOffsetProcessd(%d), message has been processed already, discard",
760+
"[channel: %s] OriginalOffset(%d) <= LastOriginalOffsetProcessed(%d), message has been consumed already, discard",
711761
chain.ChainID(), regularMessage.OriginalOffset, chain.lastOriginalOffsetProcessed)
712762
return nil
763+
}
764+
765+
logger.Debugf(
766+
"[channel: %s] OriginalOffset(%d) > LastOriginalOffsetProcessed(%d), "+
767+
"this is the first time we receive this re-submitted config message",
768+
chain.ChainID(), regularMessage.OriginalOffset, chain.lastOriginalOffsetProcessed)
713769

714-
// In case we haven't reprocessed the message, there's no need to differentiate it from those
715-
// messages that will be processed for the first time.
770+
if regularMessage.OriginalOffset == chain.lastResubmittedConfigOffset && // This is very last resubmitted config message
771+
regularMessage.ConfigSeq == seq { // AND we don't need to resubmit it again
772+
logger.Debugf("[channel: %s] Config message with original offset %d is the last in-flight resubmitted message"+
773+
"and it does not require revalidation, unblock ingress messages now", chain.ChainID(), regularMessage.OriginalOffset)
774+
close(chain.doneReprocessingMsgInFlight) // Therefore, we could finally close the channel to unblock broadcast
775+
}
776+
777+
// Somebody resubmitted message at offset X, whereas we didn't. This is due to non-determinism where
778+
// that message was considered invalid by us during revalidation, however somebody else deemed it to
779+
// be valid, and resubmitted it. We need to advance lastResubmittedConfigOffset in this case in order
780+
// to enforce consistency across the network.
781+
if chain.lastResubmittedConfigOffset < regularMessage.OriginalOffset {
782+
chain.lastResubmittedConfigOffset = regularMessage.OriginalOffset
716783
}
717784
}
718785

@@ -730,6 +797,10 @@ func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, r
730797
return fmt.Errorf("error re-submitting config message because = %s", err)
731798
}
732799

800+
logger.Debugf("[channel: %s] Resubmitted config message with offset %d, block ingress messages", chain.ChainID(), receivedOffset)
801+
chain.lastResubmittedConfigOffset = receivedOffset // Keep track of last resubmitted message offset
802+
chain.doneReprocessingMsgInFlight = make(chan struct{}) // Create the channel to block ingress messages
803+
733804
return nil
734805
}
735806

0 commit comments

Comments
 (0)