@@ -111,8 +111,8 @@ type Chain struct {
111
111
startC chan struct {} // Closes when the node is started
112
112
snapC chan * raftpb.Snapshot // Signal to catch up with snapshot
113
113
114
- configChangeApplyC chan struct {} // Notifies that a Raft configuration change has been applied
115
- configChangeInProgress uint32 // Flag to indicate node waiting for Raft config change to be applied
114
+ configChangeAppliedC chan struct {} // Notifies that a Raft configuration change has been applied
115
+ configChangeInProgress bool // Flag to indicate node waiting for Raft config change to be applied
116
116
raftMetadataLock sync.RWMutex
117
117
118
118
clock clock.Clock // Tests can inject a fake clock
@@ -171,29 +171,29 @@ func NewChain(
171
171
}
172
172
173
173
return & Chain {
174
- configurator : conf ,
175
- rpc : rpc ,
176
- channelID : support .ChainID (),
177
- raftID : opts .RaftID ,
178
- submitC : make (chan * orderer.SubmitRequest ),
179
- commitC : make (chan block ),
180
- haltC : make (chan struct {}),
181
- doneC : make (chan struct {}),
182
- resignC : make (chan struct {}),
183
- startC : make (chan struct {}),
184
- syncC : make (chan struct {}),
185
- snapC : make (chan * raftpb.Snapshot ),
186
- configChangeApplyC : make (chan struct {}),
187
- observeC : observeC ,
188
- support : support ,
189
- fresh : fresh ,
190
- appliedIndex : appliedi ,
191
- lastSnapBlockNum : snapBlkNum ,
192
- puller : puller ,
193
- clock : opts .Clock ,
194
- logger : lg ,
195
- storage : storage ,
196
- opts : opts ,
174
+ configurator : conf ,
175
+ rpc : rpc ,
176
+ channelID : support .ChainID (),
177
+ raftID : opts .RaftID ,
178
+ submitC : make (chan * orderer.SubmitRequest ),
179
+ commitC : make (chan block ),
180
+ haltC : make (chan struct {}),
181
+ doneC : make (chan struct {}),
182
+ resignC : make (chan struct {}),
183
+ startC : make (chan struct {}),
184
+ syncC : make (chan struct {}),
185
+ snapC : make (chan * raftpb.Snapshot ),
186
+ configChangeAppliedC : make (chan struct {}),
187
+ observeC : observeC ,
188
+ support : support ,
189
+ fresh : fresh ,
190
+ appliedIndex : appliedi ,
191
+ lastSnapBlockNum : snapBlkNum ,
192
+ puller : puller ,
193
+ clock : opts .Clock ,
194
+ logger : lg ,
195
+ storage : storage ,
196
+ opts : opts ,
197
197
}, nil
198
198
}
199
199
@@ -632,6 +632,13 @@ func (c *Chain) serveRaft() {
632
632
c .resignC <- struct {}{}
633
633
}
634
634
635
+ // becoming a leader and configuration change is in progress
636
+ if newLead == c .raftID && c .configChangeInProgress {
637
+ // need to read recent config updates of replica set
638
+ // and finish reconfiguration
639
+ c .handleReconfigurationFailover ()
640
+ }
641
+
635
642
// notify external observer
636
643
select {
637
644
case c .observeC <- newLead :
@@ -688,7 +695,7 @@ func (c *Chain) apply(ents []raftpb.Entry) {
688
695
if isConfigMembershipUpdate {
689
696
// set flag config change in progress only if config block
690
697
// and has updates for raft replica set
691
- atomic . StoreUint32 ( & c .configChangeInProgress , uint32 ( 1 ))
698
+ c .configChangeInProgress = true
692
699
}
693
700
694
701
c .commitC <- block {b , ents [i ].Index }
@@ -705,12 +712,11 @@ func (c *Chain) apply(ents []raftpb.Entry) {
705
712
706
713
c .confState = * c .node .ApplyConfChange (cc )
707
714
708
- // assert that configuration changes result of the processing
709
- // of configuration block of type B
710
- isConfChangeInProgress := atomic .LoadUint32 (& c .configChangeInProgress )
711
- if isConfChangeInProgress == 1 {
715
+ if c .configChangeInProgress {
712
716
// signal that config changes has been applied
713
- c .configChangeApplyC <- struct {}{}
717
+ c .configChangeAppliedC <- struct {}{}
718
+ // set flag back
719
+ c .configChangeInProgress = false
714
720
}
715
721
}
716
722
@@ -836,52 +842,39 @@ func (c *Chain) updateMembership(metadata *etcdraft.RaftMetadata, change *raftpb
836
842
lead := atomic .LoadUint64 (& c .leader )
837
843
// leader to propose configuration change
838
844
if lead == c .raftID {
845
+ // ProposeConfChange returns error only if node being stopped.
839
846
if err := c .node .ProposeConfChange (context .TODO (), * change ); err != nil {
840
- return errors .Errorf ("failed to propose configuration update to Raft node: %s" , err )
847
+ c .logger .Warnf ("Failed to propose configuration update to Raft node: %s" , err )
848
+ return nil
841
849
}
842
850
}
843
851
844
852
var err error
845
853
846
- select {
847
- case <- c .configChangeApplyC :
848
- // update metadata once we have block committed
849
- c .raftMetadataLock .Lock ()
850
- c .opts .RaftMetadata = metadata
851
- c .raftMetadataLock .Unlock ()
852
-
853
- // new we need to reconfigure the communication layer with new updates
854
- err = c .configureComm ()
855
- case <- c .resignC :
856
- // leadership has changed, new leader will have to take care
857
- // of reading last config block re-propose config update
858
- c .logger .Debug ("Raft cluster leader has changed, new leader should re-propose config change based on last config block" )
859
- case <- c .doneC :
860
- c .logger .Debug ("shutting down node, aborting config change update" )
854
+ for {
855
+ select {
856
+ case <- c .configChangeAppliedC : // Raft configuration changes of the raft cluster has been applied
857
+ // update metadata once we have block committed
858
+ c .raftMetadataLock .Lock ()
859
+ c .opts .RaftMetadata = metadata
860
+ c .raftMetadataLock .Unlock ()
861
+
862
+ // now we need to reconfigure the communication layer with new updates
863
+ return c .configureComm ()
864
+ case <- c .resignC :
865
+ c .logger .Debug ("Raft cluster leader has changed, new leader should re-propose Raft config change based on last config block" )
866
+ case <- c .doneC :
867
+ c .logger .Debug ("Shutting down node, aborting config change update" )
868
+ return err
869
+ }
861
870
}
862
-
863
- // set flag back
864
- atomic .StoreUint32 (& c .configChangeInProgress , uint32 (0 ))
865
- return err
866
871
}
867
872
868
873
// writeConfigBlock writes configuration blocks into the ledger in
869
874
// addition extracts updates about raft replica set and if there
870
875
// are changes updates cluster membership as well
871
876
func (c * Chain ) writeConfigBlock (b block ) error {
872
- metadata , err := ConsensusMetadataFromConfigBlock (b .b )
873
- if err != nil {
874
- c .logger .Panicf ("error reading consensus metadata, because of %s" , err )
875
- }
876
-
877
- c .raftMetadataLock .RLock ()
878
- raftMetadata := proto .Clone (c .opts .RaftMetadata ).(* etcdraft.RaftMetadata )
879
- // proto.Clone doesn't copy an empty map, hence need to initialize it after
880
- // cloning
881
- if raftMetadata .Consenters == nil {
882
- raftMetadata .Consenters = map [uint64 ]* etcdraft.Consenter {}
883
- }
884
- c .raftMetadataLock .RUnlock ()
877
+ metadata , raftMetadata := c .newRaftMetadata (b .b )
885
878
886
879
var changes * MembershipChanges
887
880
if metadata != nil {
@@ -901,3 +894,48 @@ func (c *Chain) writeConfigBlock(b block) error {
901
894
}
902
895
return nil
903
896
}
897
+
898
+ // handleReconfigurationFailover read last configuration block and proposes
899
+ // new raft configuration
900
+ func (c * Chain ) handleReconfigurationFailover () {
901
+ b := c .support .Block (c .support .Height () - 1 )
902
+ if b == nil {
903
+ c .logger .Panic ("nil block, failed to read last written block" )
904
+ }
905
+ if ! utils .IsConfigBlock (b ) {
906
+ // a node (leader or follower) leaving updateMembership in context of serverReq go routine,
907
+ // *iff* configuration entry has appeared and successfully applied.
908
+ // while it's blocked in updateMembership, it cannot commit any other block,
909
+ // therefore we guarantee the last block is config block
910
+ c .logger .Panic ("while handling reconfiguration failover last expected block should be configuration" )
911
+ }
912
+
913
+ metadata , raftMetadata := c .newRaftMetadata (b )
914
+
915
+ var changes * MembershipChanges
916
+ if metadata != nil {
917
+ changes = ComputeMembershipChanges (raftMetadata .Consenters , metadata .Consenters )
918
+ }
919
+
920
+ confChange := changes .UpdateRaftMetadataAndConfChange (raftMetadata )
921
+ if err := c .node .ProposeConfChange (context .TODO (), * confChange ); err != nil {
922
+ c .logger .Warnf ("failed to propose configuration update to Raft node: %s" , err )
923
+ }
924
+ }
925
+
926
+ // newRaftMetadata extract raft metadata from the configuration block
927
+ func (c * Chain ) newRaftMetadata (block * common.Block ) (* etcdraft.Metadata , * etcdraft.RaftMetadata ) {
928
+ metadata , err := ConsensusMetadataFromConfigBlock (block )
929
+ if err != nil {
930
+ c .logger .Panicf ("error reading consensus metadata: %s" , err )
931
+ }
932
+ c .raftMetadataLock .RLock ()
933
+ raftMetadata := proto .Clone (c .opts .RaftMetadata ).(* etcdraft.RaftMetadata )
934
+ // proto.Clone doesn't copy an empty map, hence need to initialize it after
935
+ // cloning
936
+ if raftMetadata .Consenters == nil {
937
+ raftMetadata .Consenters = map [uint64 ]* etcdraft.Consenter {}
938
+ }
939
+ c .raftMetadataLock .RUnlock ()
940
+ return metadata , raftMetadata
941
+ }
0 commit comments