Commit f98f7c4

C0rWin authored and yacovm committed
[FAB-12576] failover while handling tx type B
Raft cluster reconfiguration consists of two parts: first, the leader has to get the configuration block consented on and committed; next, the leader has to extract the new cluster configuration from that block and propose the corresponding Raft configuration changes. However, the leader might fail between the first and the second part, so a newly elected leader must be able to detect that there is an unfinished reconfiguration and complete it. This commit adds logic to handle such a leadership failover: the new leader checks whether the last committed block is a configuration block with pending Raft configuration changes and, if so, completes the reconfiguration by proposing the Raft cluster configuration changes.

Change-Id: I05dc1f60c9ab692521887b50f726d96ea47878dc
Signed-off-by: Artem Barger <bartem@il.ibm.com>
1 parent 9039499 commit f98f7c4
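
To make the failover flow concrete, here is a minimal, self-contained sketch of the idea described in the commit message. The types and names (node, block, consenters) are hypothetical stand-ins, not the Fabric or etcd/raft API: a configuration block is committed first, the Raft ConfChange is proposed second, and a leader elected in between inspects the last committed block to finish the pending change.

package main

import "fmt"

// Hypothetical stand-in types for illustration only (not the Fabric API).
type block struct {
    isConfig   bool
    consenters map[uint64]string // replica set recorded in the config block
}

type node struct {
    id         uint64
    ledger     []block           // committed blocks, in order
    consenters map[uint64]string // replica set the Raft layer currently knows about
}

// onBecameLeader models the failover hook: if the last committed block is a
// config block whose consenter set differs from what Raft knows, the new
// leader finishes the reconfiguration by (re-)proposing the missing changes.
func (n *node) onBecameLeader() {
    if len(n.ledger) == 0 {
        return
    }
    last := n.ledger[len(n.ledger)-1]
    if !last.isConfig {
        return // nothing pending
    }
    for id, addr := range last.consenters {
        if _, ok := n.consenters[id]; !ok {
            fmt.Printf("node %d: proposing ConfChange to add consenter %d (%s)\n", n.id, id, addr)
            n.consenters[id] = addr // stands in for Raft applying the ConfChange
        }
    }
}

func main() {
    // The old leader committed a config block adding consenter 4, then failed
    // before proposing the Raft ConfChange. Node 2 takes over and completes it.
    n2 := &node{
        id:         2,
        consenters: map[uint64]string{1: "orderer1:7050", 2: "orderer2:7050", 3: "orderer3:7050"},
        ledger: []block{
            {isConfig: false},
            {isConfig: true, consenters: map[uint64]string{
                1: "orderer1:7050", 2: "orderer2:7050", 3: "orderer3:7050", 4: "orderer4:7050",
            }},
        },
    }
    n2.onBecameLeader()
}

In the actual change, this role is played by handleReconfigurationFailover (see the chain.go diff below), which reads the last written block via the chain support, asserts it is a configuration block, and re-proposes the ConfChange computed from its consensus metadata.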

File tree

2 files changed: +196 −64 lines changed

orderer/consensus/etcdraft/chain.go

Lines changed: 102 additions & 64 deletions
@@ -111,8 +111,8 @@ type Chain struct {
     startC chan struct{}         // Closes when the node is started
     snapC  chan *raftpb.Snapshot // Signal to catch up with snapshot
 
-    configChangeApplyC     chan struct{} // Notifies that a Raft configuration change has been applied
-    configChangeInProgress uint32        // Flag to indicate node waiting for Raft config change to be applied
+    configChangeAppliedC   chan struct{} // Notifies that a Raft configuration change has been applied
+    configChangeInProgress bool          // Flag to indicate node waiting for Raft config change to be applied
 
     raftMetadataLock sync.RWMutex
 
     clock clock.Clock // Tests can inject a fake clock
@@ -171,29 +171,29 @@ func NewChain(
     }
 
     return &Chain{
-        configurator:       conf,
-        rpc:                rpc,
-        channelID:          support.ChainID(),
-        raftID:             opts.RaftID,
-        submitC:            make(chan *orderer.SubmitRequest),
-        commitC:            make(chan block),
-        haltC:              make(chan struct{}),
-        doneC:              make(chan struct{}),
-        resignC:            make(chan struct{}),
-        startC:             make(chan struct{}),
-        syncC:              make(chan struct{}),
-        snapC:              make(chan *raftpb.Snapshot),
-        configChangeApplyC: make(chan struct{}),
-        observeC:           observeC,
-        support:            support,
-        fresh:              fresh,
-        appliedIndex:       appliedi,
-        lastSnapBlockNum:   snapBlkNum,
-        puller:             puller,
-        clock:              opts.Clock,
-        logger:             lg,
-        storage:            storage,
-        opts:               opts,
+        configurator:         conf,
+        rpc:                  rpc,
+        channelID:            support.ChainID(),
+        raftID:               opts.RaftID,
+        submitC:              make(chan *orderer.SubmitRequest),
+        commitC:              make(chan block),
+        haltC:                make(chan struct{}),
+        doneC:                make(chan struct{}),
+        resignC:              make(chan struct{}),
+        startC:               make(chan struct{}),
+        syncC:                make(chan struct{}),
+        snapC:                make(chan *raftpb.Snapshot),
+        configChangeAppliedC: make(chan struct{}),
+        observeC:             observeC,
+        support:              support,
+        fresh:                fresh,
+        appliedIndex:         appliedi,
+        lastSnapBlockNum:     snapBlkNum,
+        puller:               puller,
+        clock:                opts.Clock,
+        logger:               lg,
+        storage:              storage,
+        opts:                 opts,
     }, nil
 }
 
@@ -632,6 +632,13 @@ func (c *Chain) serveRaft() {
                 c.resignC <- struct{}{}
             }
 
+            // becoming a leader and configuration change is in progress
+            if newLead == c.raftID && c.configChangeInProgress {
+                // need to read recent config updates of replica set
+                // and finish reconfiguration
+                c.handleReconfigurationFailover()
+            }
+
             // notify external observer
             select {
             case c.observeC <- newLead:
@@ -688,7 +695,7 @@ func (c *Chain) apply(ents []raftpb.Entry) {
             if isConfigMembershipUpdate {
                 // set flag config change is progress only if config block
                 // and has updates for raft replica set
-                atomic.StoreUint32(&c.configChangeInProgress, uint32(1))
+                c.configChangeInProgress = true
             }
 
             c.commitC <- block{b, ents[i].Index}
@@ -705,12 +712,11 @@ func (c *Chain) apply(ents []raftpb.Entry) {
 
             c.confState = *c.node.ApplyConfChange(cc)
 
-            // assert that configuration changes result of the processing
-            // of configuration block of type B
-            isConfChangeInProgress := atomic.LoadUint32(&c.configChangeInProgress)
-            if isConfChangeInProgress == 1 {
+            if c.configChangeInProgress {
                 // signal that config changes has been applied
-                c.configChangeApplyC <- struct{}{}
+                c.configChangeAppliedC <- struct{}{}
+                // set flag back
+                c.configChangeInProgress = false
             }
         }
 
@@ -836,52 +842,39 @@ func (c *Chain) updateMembership(metadata *etcdraft.RaftMetadata, change *raftpb
     lead := atomic.LoadUint64(&c.leader)
     // leader to propose configuration change
     if lead == c.raftID {
+        // ProposeConfChange returns error only if node being stopped.
         if err := c.node.ProposeConfChange(context.TODO(), *change); err != nil {
-            return errors.Errorf("failed to propose configuration update to Raft node: %s", err)
+            c.logger.Warnf("Failed to propose configuration update to Raft node: %s", err)
+            return nil
         }
     }
 
     var err error
 
-    select {
-    case <-c.configChangeApplyC:
-        // update metadata once we have block committed
-        c.raftMetadataLock.Lock()
-        c.opts.RaftMetadata = metadata
-        c.raftMetadataLock.Unlock()
-
-        // new we need to reconfigure the communication layer with new updates
-        err = c.configureComm()
-    case <-c.resignC:
-        // leadership has changed, new leader will have to take care
-        // of reading last config block re-propose config update
-        c.logger.Debug("Raft cluster leader has changed, new leader should re-propose config change based on last config block")
-    case <-c.doneC:
-        c.logger.Debug("shutting down node, aborting config change update")
+    for {
+        select {
+        case <-c.configChangeAppliedC: // Raft configuration changes of the raft cluster has been applied
+            // update metadata once we have block committed
+            c.raftMetadataLock.Lock()
+            c.opts.RaftMetadata = metadata
+            c.raftMetadataLock.Unlock()
+
+            // now we need to reconfigure the communication layer with new updates
+            return c.configureComm()
+        case <-c.resignC:
+            c.logger.Debug("Raft cluster leader has changed, new leader should re-propose Raft config change based on last config block")
+        case <-c.doneC:
+            c.logger.Debug("Shutting down node, aborting config change update")
+            return err
+        }
     }
-
-    // set flag back
-    atomic.StoreUint32(&c.configChangeInProgress, uint32(0))
-    return err
 }
 
 // writeConfigBlock writes configuration blocks into the ledger in
 // addition extracts updates about raft replica set and if there
 // are changes updates cluster membership as well
 func (c *Chain) writeConfigBlock(b block) error {
-    metadata, err := ConsensusMetadataFromConfigBlock(b.b)
-    if err != nil {
-        c.logger.Panicf("error reading consensus metadata, because of %s", err)
-    }
-
-    c.raftMetadataLock.RLock()
-    raftMetadata := proto.Clone(c.opts.RaftMetadata).(*etcdraft.RaftMetadata)
-    // proto.Clone doesn't copy an empty map, hence need to initialize it after
-    // cloning
-    if raftMetadata.Consenters == nil {
-        raftMetadata.Consenters = map[uint64]*etcdraft.Consenter{}
-    }
-    c.raftMetadataLock.RUnlock()
+    metadata, raftMetadata := c.newRaftMetadata(b.b)
 
     var changes *MembershipChanges
     if metadata != nil {
@@ -901,3 +894,48 @@ func (c *Chain) writeConfigBlock(b block) error {
     }
     return nil
 }
+
+// handleReconfigurationFailover read last configuration block and proposes
+// new raft configuration
+func (c *Chain) handleReconfigurationFailover() {
+    b := c.support.Block(c.support.Height() - 1)
+    if b == nil {
+        c.logger.Panic("nil block, failed to read last written block")
+    }
+    if !utils.IsConfigBlock(b) {
+        // a node (leader or follower) leaving updateMembership in context of serverReq go routine,
+        // *iff* configuration entry has appeared and successfully applied.
+        // while it's blocked in updateMembership, it cannot commit any other block,
+        // therefore we guarantee the last block is config block
+        c.logger.Panic("while handling reconfiguration failover last expected block should be configuration")
+    }
+
+    metadata, raftMetadata := c.newRaftMetadata(b)
+
+    var changes *MembershipChanges
+    if metadata != nil {
+        changes = ComputeMembershipChanges(raftMetadata.Consenters, metadata.Consenters)
+    }
+
+    confChange := changes.UpdateRaftMetadataAndConfChange(raftMetadata)
+    if err := c.node.ProposeConfChange(context.TODO(), *confChange); err != nil {
+        c.logger.Warnf("failed to propose configuration update to Raft node: %s", err)
+    }
+}
+
+// newRaftMetadata extract raft metadata from the configuration block
+func (c *Chain) newRaftMetadata(block *common.Block) (*etcdraft.Metadata, *etcdraft.RaftMetadata) {
+    metadata, err := ConsensusMetadataFromConfigBlock(block)
+    if err != nil {
+        c.logger.Panicf("error reading consensus metadata: %s", err)
+    }
+    c.raftMetadataLock.RLock()
+    raftMetadata := proto.Clone(c.opts.RaftMetadata).(*etcdraft.RaftMetadata)
+    // proto.Clone doesn't copy an empty map, hence need to initialize it after
+    // cloning
+    if raftMetadata.Consenters == nil {
+        raftMetadata.Consenters = map[uint64]*etcdraft.Consenter{}
+    }
+    c.raftMetadataLock.RUnlock()
+    return metadata, raftMetadata
+}

orderer/consensus/etcdraft/chain_test.go

Lines changed: 94 additions & 0 deletions
@@ -19,6 +19,7 @@ import (
     "code.cloudfoundry.org/clock/fakeclock"
     "github.com/coreos/etcd/raft"
     "github.com/golang/protobuf/proto"
+    "github.com/hyperledger/fabric/bccsp/factory"
     "github.com/hyperledger/fabric/common/crypto/tlsgen"
     "github.com/hyperledger/fabric/common/flogging"
     mockconfig "github.com/hyperledger/fabric/common/mocks/config"
@@ -44,6 +45,10 @@ const (
     HEARTBEAT_TICK = 1
 )
 
+func init() {
+    factory.InitFactories(nil)
+}
+
 // for some test cases we chmod file/dir to test failures caused by exotic permissions.
 // however this does not work if tests are running as root, i.e. in a container.
 func skipIfRoot() {
@@ -1537,6 +1542,93 @@ var _ = Describe("Chain", func() {
                Eventually(c.support.WriteBlockCallCount, defaultTimeout).Should(Equal(3))
            })
        })
+
+       It("stop leader and continue reconfiguration failing over to new leader", func() {
+           // Scenario: Starting replica set of 3 Raft nodes, electing node c1 to be a leader
+           // configure chain support mock to disconnect c1 right after it writes configuration block
+           // into the ledger, this to simulate failover.
+           // Next boostraping a new node c4 to join a cluster and creating config transaction, submitting
+           // it to the leader. Once leader writes configuration block it fails and leadership transferred to
+           // c2.
+           // Test asserts that new node c4, will join the cluster and c2 will handle failover of
+           // re-configuration. Later we connecting c1 back and making sure it capable of catching up with
+           // new configuration and successfully rejoins replica set.
+
+           c4 := newChain(timeout, channelID, dataDir, 4, &raftprotos.RaftMetadata{
+               Consenters: map[uint64]*raftprotos.Consenter{},
+           })
+           c4.init()
+
+           By("adding new node to the network")
+           Expect(c4.support.WriteBlockCallCount()).Should(Equal(0))
+           Expect(c4.support.WriteConfigBlockCallCount()).Should(Equal(0))
+
+           configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, addConsenterConfigValue()))
+           c1.cutter.CutNext = true
+           configBlock := &common.Block{
+               Header: &common.BlockHeader{},
+               Data:   &common.BlockData{Data: [][]byte{marshalOrPanic(configEnv)}}}
+
+           c1.support.CreateNextBlockReturns(configBlock)
+
+           c1.support.WriteConfigBlockStub = func(_ *common.Block, _ []byte) {
+               // disconnect leader after block being committed
+               network.disconnect(1)
+               // electing new leader
+               network.elect(2)
+           }
+
+           // mock Block method to return recent configuration block
+           c2.support.BlockReturns(configBlock)
+
+           By("sending config transaction")
+           err := c1.Configure(configEnv, 0)
+           Expect(err).ToNot(HaveOccurred())
+
+           // every node has written config block to the OSN ledger
+           network.exec(
+               func(c *chain) {
+                   Eventually(c.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
+               })
+
+           network.addChain(c4)
+           c4.Start()
+           // ConfChange is applied to etcd/raft asynchronously, meaning node 4 is not added
+           // to leader's node list right away. An immediate tick does not trigger a heartbeat
+           // being sent to node 4. Therefore, we repeatedly tick the leader until node 4 joins
+           // the cluster successfully.
+           Eventually(func() <-chan uint64 {
+               c2.clock.Increment(interval)
+               return c4.observe
+           }, LongEventualTimeout).Should(Receive(Equal(uint64(2))))
+
+           Eventually(c4.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
+           Eventually(c4.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
+
+           By("submitting new transaction to follower")
+           c2.cutter.CutNext = true
+           c2.support.CreateNextBlockReturns(normalBlock)
+           err = c4.Order(env, 0)
+           Expect(err).ToNot(HaveOccurred())
+
+           c2.clock.Increment(interval)
+
+           // rest nodes are alive include a newly added, hence should write 2 blocks
+           Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
+           Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
+           Eventually(c4.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
+
+           // node 1 has been stopped should not write any block
+           Consistently(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
+
+           network.connect(1)
+
+           c2.clock.Increment(interval)
+           // check that former leader didn't get stuck and actually got resign signal,
+           // and once connected capable of communicating with rest of the replicas set
+           Eventually(c1.observe, LongEventualTimeout).Should(Receive(Equal(uint64(2))))
+           Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
+       })
    })
 })
 
@@ -2226,7 +2318,9 @@ func (n *network) elect(id uint64) (tick int) {
    // tick so it could take effect.
    t := 1000 * time.Millisecond
 
+   n.connLock.RLock()
    c := n.chains[id]
+   n.connLock.RUnlock()
 
    var elected bool
    for !elected {
