@@ -41,6 +41,7 @@ func newChain(
 	support consensus.ConsenterSupport,
 	lastOffsetPersisted int64,
 	lastOriginalOffsetProcessed int64,
+	lastResubmittedConfigOffset int64,
 ) (*chainImpl, error) {
 	lastCutBlockNumber := getLastCutBlockNumber(support.Height())
 	logger.Infof("[channel: %s] Starting chain with last persisted offset %d and last recorded block %d",
@@ -49,17 +50,32 @@ func newChain(
 	errorChan := make(chan struct{})
 	close(errorChan) // We need this closed when starting up
 
+	doneReprocessingMsgInFlight := make(chan struct{})
+	// In either one of the following cases, we should unblock ingress messages:
+	// - lastResubmittedConfigOffset == 0, where we've never resubmitted any config messages
+	// - lastResubmittedConfigOffset == lastOriginalOffsetProcessed, where the latest config message we resubmitted
+	//   has been processed already
+	// - lastResubmittedConfigOffset < lastOriginalOffsetProcessed, where we've processed one or more resubmitted
+	//   normal messages after the latest resubmitted config message. (we advance `lastResubmittedConfigOffset` for
+	//   config messages, but not normal messages)
+	if lastResubmittedConfigOffset == 0 || lastResubmittedConfigOffset <= lastOriginalOffsetProcessed {
+		// If we've already caught up with reprocessing the resubmitted messages, close the channel to unblock broadcast
+		close(doneReprocessingMsgInFlight)
+	}
+
 	return &chainImpl{
 		consenter:                   consenter,
 		ConsenterSupport:            support,
 		channel:                     newChannel(support.ChainID(), defaultPartition),
 		lastOffsetPersisted:         lastOffsetPersisted,
 		lastOriginalOffsetProcessed: lastOriginalOffsetProcessed,
+		lastResubmittedConfigOffset: lastResubmittedConfigOffset,
 		lastCutBlockNumber:          lastCutBlockNumber,
 
-		errorChan: errorChan,
-		haltChan:  make(chan struct{}),
-		startChan: make(chan struct{}),
+		errorChan:                   errorChan,
+		haltChan:                    make(chan struct{}),
+		startChan:                   make(chan struct{}),
+		doneReprocessingMsgInFlight: doneReprocessingMsgInFlight,
 	}, nil
 }
 
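The newChain change above leans on a standard Go idiom: receiving from a closed channel never blocks, so closing `doneReprocessingMsgInFlight` acts as a broadcast "go ahead" to every waiter, while a freshly made (open, empty) channel blocks them all. A minimal standalone sketch of that idiom, assuming nothing from Fabric (`gate` and `isBlocked` are illustrative names, not part of this PR):

```go
package main

import "fmt"

// gate models the doneReprocessingMsgInFlight pattern: an open channel blocks
// waiters, a closed channel lets every waiter through immediately.
type gate struct {
	done chan struct{}
}

// newGate starts blocked (open channel) or unblocked (closed channel).
func newGate(blocked bool) *gate {
	g := &gate{done: make(chan struct{})}
	if !blocked {
		close(g.done) // nothing in flight, let callers proceed right away
	}
	return g
}

// isBlocked reports whether a waiter would currently block on the gate.
func (g *gate) isBlocked() bool {
	select {
	case <-g.done:
		return false // channel closed: receive succeeds immediately
	default:
		return true // channel open and empty: receive would block
	}
}

func main() {
	g := newGate(true)
	fmt.Println(g.isBlocked()) // true
	close(g.done)              // analogous to finishing reprocessing
	fmt.Println(g.isBlocked()) // false
}
```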
@@ -70,12 +86,16 @@ type chainImpl struct {
 	channel                     channel
 	lastOffsetPersisted         int64
 	lastOriginalOffsetProcessed int64
+	lastResubmittedConfigOffset int64
 	lastCutBlockNumber          uint64
 
 	producer        sarama.SyncProducer
 	parentConsumer  sarama.Consumer
 	channelConsumer sarama.PartitionConsumer
 
+	// notification that there are in-flight messages we need to wait for
+	doneReprocessingMsgInFlight chan struct{}
+
 	// When the partition consumer errors, close the channel. Otherwise, make
 	// this an open, unbuffered channel.
 	errorChan chan struct{}
@@ -135,6 +155,21 @@ func (chain *chainImpl) Halt() {
 	}
 }
 
+func (chain *chainImpl) WaitReady() error {
+	select {
+	case <-chain.startChan: // The Start phase has completed
+		select {
+		case <-chain.haltChan: // The chain has been halted, stop here
+			return fmt.Errorf("consenter for this channel has been halted")
+		// Block waiting for all re-submitted messages to be reprocessed
+		case <-chain.doneReprocessingMsgInFlight:
+			return nil
+		}
+	default: // Not ready yet
+		return fmt.Errorf("will not enqueue, consenter for this channel hasn't started yet")
+	}
+}
+
 // Implements the consensus.Chain interface. Called by Broadcast().
 func (chain *chainImpl) Order(env *cb.Envelope, configSeq uint64) error {
 	return chain.order(env, configSeq, int64(0))
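WaitReady nests two selects: the outer one rejects callers until Start has completed, and the inner one then blocks until either the chain halts or `doneReprocessingMsgInFlight` is closed. A hedged sketch of how a Broadcast-side caller might use it, assuming it lives alongside `chainImpl` in the same kafka consenter package (`enqueue` is an illustrative wrapper, not the actual Broadcast handler):

```go
// enqueue blocks until the chain has finished reprocessing any resubmitted
// config messages (or reports why it cannot accept work), then orders env.
func enqueue(chain *chainImpl, env *cb.Envelope, configSeq uint64) error {
	if err := chain.WaitReady(); err != nil {
		// Chain halted, or Start() has not completed yet.
		return err
	}
	return chain.Order(env, configSeq)
}
```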
@@ -430,17 +465,19 @@ func getLastCutBlockNumber(blockchainHeight uint64) uint64 {
 	return blockchainHeight - 1
 }
 
-func getOffsets(metadataValue []byte, chainID string) (persisted int64, processed int64) {
+func getOffsets(metadataValue []byte, chainID string) (persisted int64, processed int64, resubmitted int64) {
 	if metadataValue != nil {
 		// Extract orderer-related metadata from the tip of the ledger first
 		kafkaMetadata := &ab.KafkaMetadata{}
 		if err := proto.Unmarshal(metadataValue, kafkaMetadata); err != nil {
 			logger.Panicf("[channel: %s] Ledger may be corrupted:"+
 				"cannot unmarshal orderer metadata in most recent block", chainID)
 		}
-		return kafkaMetadata.LastOffsetPersisted, kafkaMetadata.LastOriginalOffsetProcessed
+		return kafkaMetadata.LastOffsetPersisted,
+			kafkaMetadata.LastOriginalOffsetProcessed,
+			kafkaMetadata.LastResubmittedConfigOffset
 	}
-	return sarama.OffsetOldest - 1, int64(0) // default
+	return sarama.OffsetOldest - 1, int64(0), int64(0) // default
 }
 
 func newConnectMessage() *ab.KafkaMessage {
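getOffsets now recovers a third value, `LastResubmittedConfigOffset`, from the Kafka metadata of the newest block, so a restarting orderer knows whether a config message it resubmitted was still unprocessed when it went down. A hedged fragment of how a caller might thread the three offsets into newChain; the surrounding HandleChain-style wiring is assumed, and `metadata` / `support` are stand-ins for the usual consenter inputs rather than names confirmed by this PR:

```go
// Illustrative wiring inside the consenter's chain-construction path:
lastOffsetPersisted, lastOriginalOffsetProcessed, lastResubmittedConfigOffset :=
	getOffsets(metadata.Value, support.ChainID())

return newChain(
	consenter,
	support,
	lastOffsetPersisted,
	lastOriginalOffsetProcessed,
	lastResubmittedConfigOffset,
)
```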
@@ -545,6 +582,7 @@ func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, r
 		metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
 			LastOffsetPersisted:         offset,
 			LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
+			LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
 		})
 		chain.WriteBlock(block, metadata)
 		chain.lastCutBlockNumber++
@@ -559,6 +597,7 @@ func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, r
 		metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
 			LastOffsetPersisted:         offset,
 			LastOriginalOffsetProcessed: newOffset,
+			LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
 		})
 		chain.WriteBlock(block, metadata)
 		chain.lastCutBlockNumber++
@@ -583,6 +622,7 @@ func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, r
 		metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
 			LastOffsetPersisted:         receivedOffset - 1,
 			LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
+			LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
 		})
 		chain.WriteBlock(block, metadata)
 		chain.lastCutBlockNumber++
@@ -594,6 +634,7 @@ func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, r
 		metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
 			LastOffsetPersisted:         receivedOffset,
 			LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
+			LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
 		})
 		chain.WriteConfigBlock(block, metadata)
 		chain.lastCutBlockNumber++
@@ -659,14 +700,21 @@ func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, r
 	case ab.KafkaMessageRegular_NORMAL:
 		// This is a message that is re-validated and re-ordered
 		if regularMessage.OriginalOffset != 0 {
+			logger.Debugf("[channel: %s] Received re-submitted normal message with original offset %d", chain.ChainID(), regularMessage.OriginalOffset)
+
 			// But we've reprocessed it already
 			if regularMessage.OriginalOffset <= chain.lastOriginalOffsetProcessed {
 				logger.Debugf(
-					"[channel: %s] OriginalOffset(%d) <= LastOriginalOffsetProcessd(%d), message has been processed already, discard",
+					"[channel: %s] OriginalOffset(%d) <= LastOriginalOffsetProcessed(%d), message has been consumed already, discard",
 					chain.ChainID(), regularMessage.OriginalOffset, chain.lastOriginalOffsetProcessed)
 				return nil
 			}
 
+			logger.Debugf(
+				"[channel: %s] OriginalOffset(%d) > LastOriginalOffsetProcessed(%d), "+
+					"this is the first time we receive this re-submitted normal message",
+				chain.ChainID(), regularMessage.OriginalOffset, chain.lastOriginalOffsetProcessed)
+
 			// In case we haven't reprocessed the message, there's no need to differentiate it from those
 			// messages that will be processed for the first time.
 		}
@@ -704,15 +752,34 @@ func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, r
 	case ab.KafkaMessageRegular_CONFIG:
 		// This is a message that is re-validated and re-ordered
 		if regularMessage.OriginalOffset != 0 {
+			logger.Debugf("[channel: %s] Received re-submitted config message with original offset %d", chain.ChainID(), regularMessage.OriginalOffset)
+
 			// But we've reprocessed it already
 			if regularMessage.OriginalOffset <= chain.lastOriginalOffsetProcessed {
 				logger.Debugf(
-					"[channel: %s] OriginalOffset(%d) <= LastOriginalOffsetProcessd(%d), message has been processed already, discard",
+					"[channel: %s] OriginalOffset(%d) <= LastOriginalOffsetProcessed(%d), message has been consumed already, discard",
 					chain.ChainID(), regularMessage.OriginalOffset, chain.lastOriginalOffsetProcessed)
 				return nil
+			}
+
+			logger.Debugf(
+				"[channel: %s] OriginalOffset(%d) > LastOriginalOffsetProcessed(%d), "+
+					"this is the first time we receive this re-submitted config message",
+				chain.ChainID(), regularMessage.OriginalOffset, chain.lastOriginalOffsetProcessed)
 
-			// In case we haven't reprocessed the message, there's no need to differentiate it from those
-			// messages that will be processed for the first time.
+			if regularMessage.OriginalOffset == chain.lastResubmittedConfigOffset && // This is the very last resubmitted config message
+				regularMessage.ConfigSeq == seq { // AND we don't need to resubmit it again
+				logger.Debugf("[channel: %s] Config message with original offset %d is the last in-flight resubmitted message "+
+					"and it does not require revalidation, unblock ingress messages now", chain.ChainID(), regularMessage.OriginalOffset)
+				close(chain.doneReprocessingMsgInFlight) // Therefore, we could finally close the channel to unblock broadcast
+			}
+
+			// Somebody resubmitted message at offset X, whereas we didn't. This is due to non-determinism where
+			// that message was considered invalid by us during revalidation, however somebody else deemed it to
+			// be valid, and resubmitted it. We need to advance lastResubmittedConfigOffset in this case in order
+			// to enforce consistency across the network.
+			if chain.lastResubmittedConfigOffset < regularMessage.OriginalOffset {
+				chain.lastResubmittedConfigOffset = regularMessage.OriginalOffset
 			}
 		}
 
@@ -730,6 +797,10 @@ func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, r
 				return fmt.Errorf("error re-submitting config message because = %s", err)
 			}
 
+			logger.Debugf("[channel: %s] Resubmitted config message with offset %d, block ingress messages", chain.ChainID(), receivedOffset)
+			chain.lastResubmittedConfigOffset = receivedOffset      // Keep track of last resubmitted message offset
+			chain.doneReprocessingMsgInFlight = make(chan struct{}) // Create the channel to block ingress messages
+
 			return nil
 		}
 
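Because `lastResubmittedConfigOffset` is written into every block's Kafka metadata (the processRegular hunks above), a restarting chain replays exactly the unblock decision shown in newChain. As a worked example, that startup condition reduces to a small predicate; the helper below is illustrative only, not part of the chain:

```go
package main

import "fmt"

// startUnblocked mirrors newChain's startup decision: ingress may proceed
// immediately unless the most recently resubmitted config message has not
// been reprocessed yet.
func startUnblocked(lastResubmittedConfigOffset, lastOriginalOffsetProcessed int64) bool {
	return lastResubmittedConfigOffset == 0 ||
		lastResubmittedConfigOffset <= lastOriginalOffsetProcessed
}

func main() {
	fmt.Println(startUnblocked(0, 0)) // true: no config message was ever resubmitted
	fmt.Println(startUnblocked(7, 7)) // true: the config resubmitted at offset 7 was already reprocessed
	fmt.Println(startUnblocked(7, 9)) // true: resubmitted normal messages past offset 7 were reprocessed
	fmt.Println(startUnblocked(7, 3)) // false: the config resubmitted at offset 7 is still in flight
}
```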