Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions consensus/cbft/consensus_process_rg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ func TestRGBlockQuorumCert(t *testing.T) {
time.Sleep(5 * time.Second)

assert.NotNil(t, nodes[0].engine.state.FindRGBlockQuorumCerts(blockIndex, groupID, uint32(validatorIndex)))
assert.Equal(t, 2, nodes[0].engine.state.RGBlockQuorumCertsLen(blockIndex, groupID))
assert.Equal(t, 2, len(nodes[0].engine.state.RGBlockQuorumCertsIndexes(blockIndex, groupID)))
// The block is already QC, so node-0 need not to SendRGBlockQuorumCert
assert.Equal(t, 1, nodes[0].engine.state.RGBlockQuorumCertsLen(blockIndex, groupID))
assert.Equal(t, 1, len(nodes[0].engine.state.RGBlockQuorumCertsIndexes(blockIndex, groupID)))
assert.True(t, true, nodes[0].engine.state.HadSendRGBlockQuorumCerts(blockIndex))

assert.Equal(t, 1, nodes[0].engine.state.SelectRGQuorumCertsLen(blockIndex, groupID))
Expand Down
2 changes: 1 addition & 1 deletion consensus/cbft/protocols/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -944,7 +944,7 @@ type RGBlockQuorumCert struct {
}

func (rgb *RGBlockQuorumCert) String() string {
return fmt.Sprintf("{GroupID:%d,Epoch:%d,ViewNumber:%d,BlockIndx:%d,Hash:%s,Number:%d,ValidatorIndex:%d,Signature:%s,ValidatorSetLen:%d}",
return fmt.Sprintf("{GroupID:%d,Epoch:%d,ViewNumber:%d,BlockIndex:%d,Hash:%s,Number:%d,ValidatorIndex:%d,Signature:%s,ValidatorSetLen:%d}",
rgb.GroupID, rgb.EpochNum(), rgb.ViewNum(), rgb.BlockIndx(), rgb.BHash().TerminalString(), rgb.BlockNum(), rgb.NodeIndex(), rgb.Signature.String(), rgb.BlockQC.ValidatorSet.HasLength())
}

Expand Down
56 changes: 36 additions & 20 deletions consensus/cbft/rg_broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cbft

import (
"github.com/AlayaNetwork/Alaya-Go/consensus/cbft/protocols"
"github.com/AlayaNetwork/Alaya-Go/core/cbfttypes"
"reflect"
"sync"
"time"
Expand Down Expand Up @@ -218,40 +219,60 @@ func (m *RGBroadcastManager) allowRGQuorumCert(a awaiting) bool {
return true
}

func (m *RGBroadcastManager) upgradeCoordinator(a awaiting) bool {
// Check whether the current node is the first group member
func (m *RGBroadcastManager) upgradeCoordinator(a awaiting) (bool, *cbfttypes.ValidateNode) {
// Check whether the current node is the validator
node, err := m.cbft.isCurrentValidator()
if err != nil || node == nil {
m.cbft.log.Debug("Current node is not validator, no need to send RGQuorumCert")
return false, nil
}

// Check whether the current node is the group member
groupID, unitID, err := m.cbft.getGroupByValidatorID(m.cbft.state.Epoch(), m.cbft.Node().ID())
if err != nil || groupID != a.GroupID() {
return false
}
if unitID == defaultUnitID { // the first echelon, Send by default
return true
return false, nil
}
//if unitID == defaultUnitID { // the first echelon, Send by default
// return true
//}

coordinatorIndexes, err := m.cbft.validatorPool.GetCoordinatorIndexesByGroupID(m.cbft.state.Epoch(), groupID)
if err != nil || len(coordinatorIndexes) <= 0 {
m.cbft.log.Error("Get coordinator indexes by groupID error")
return false
return false, nil
}
m.cbft.log.Trace("CoordinatorIndexes", "groupID", groupID, "unitID", unitID, "coordinatorIndexes", coordinatorIndexes)

var receiveIndexes []uint32

switch msg := a.(type) {
case *awaitingRGBlockQC:
// Query the QuorumCert with the largest number of signatures in the current group
blockQC, _ := m.cbft.state.FindMaxGroupRGQuorumCert(msg.blockIndex, msg.GroupID())
if blockQC == nil {
m.cbft.log.Error("Cannot find the RGBlockQuorumCert of the current group", "blockIndex", msg.blockIndex, "groupID", msg.GroupID())
return false, nil
}
// If the block is already QC, there is no need to continue sending RGBlockQuorumCert
if m.cbft.blockTree.FindBlockByHash(blockQC.BlockHash) != nil || blockQC.BlockNumber <= m.cbft.state.HighestLockBlock().NumberU64() {
m.cbft.log.Debug("The block is already QC, no need to send RGBlockQuorumCert", "blockIndex", msg.blockIndex, "blockNumber", blockQC.BlockNumber, "blockHash", blockQC.BlockHash, "groupID", msg.GroupID())
return false, nil
}
receiveIndexes = m.cbft.state.RGBlockQuorumCertsIndexes(msg.blockIndex, groupID)
case *awaitingRGViewQC:
receiveIndexes = m.cbft.state.RGViewChangeQuorumCertsIndexes(groupID)
default:
return false
return false, nil
}
if !m.enoughCoordinator(groupID, unitID, coordinatorIndexes, receiveIndexes) {
m.cbft.log.Warn("Upgrade the current node to Coordinator", "type", reflect.TypeOf(a), "groupID", groupID, "unitID", unitID, "blockIndex", a.Index(), "coordinatorIndexes", coordinatorIndexes, "receiveIndexes", receiveIndexes)
m.recordUpgradeCoordinatorMetrics(a)
return true
if unitID > defaultUnitID {
m.cbft.log.Warn("Upgrade the current node to coordinator", "type", reflect.TypeOf(a), "groupID", groupID, "unitID", unitID, "blockIndex", a.Index(), "nodeIndex", node.Index, "coordinatorIndexes", coordinatorIndexes, "receiveIndexes", receiveIndexes)
m.recordUpgradeCoordinatorMetrics(a)
}
return true, node
}
m.cbft.log.Debug("Enough Coordinator, no need to upgrade to Coordinator", "groupID", groupID, "unitID", unitID, "coordinatorIndexes", coordinatorIndexes, "receiveIndexes", receiveIndexes)
return false
m.cbft.log.Debug("Enough coordinator, no need to upgrade to coordinator", "type", reflect.TypeOf(a), "groupID", groupID, "unitID", unitID, "blockIndex", a.Index(), "nodeIndex", node.Index, "coordinatorIndexes", coordinatorIndexes, "receiveIndexes", receiveIndexes)
return false, nil
}

func (m *RGBroadcastManager) recordUpgradeCoordinatorMetrics(a awaiting) {
Expand Down Expand Up @@ -307,13 +328,8 @@ func (m *RGBroadcastManager) broadcastFunc(a awaiting) {
return
}

if !m.upgradeCoordinator(a) {
return
}

node, err := m.cbft.isCurrentValidator()
if err != nil || node == nil {
m.cbft.log.Debug("Current node is not validator, no need to send RGQuorumCert")
upgrade, node := m.upgradeCoordinator(a)
if !upgrade {
return
}

Expand Down
2 changes: 1 addition & 1 deletion consensus/cbft/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ func TestViewRGViewChangeQuorumCerts(t *testing.T) {
assert.NotNil(t, rg)
assert.Equal(t, uint32(12), rg.ValidatorIndex)

assert.Equal(t, []uint32{22, 23}, v.RGViewChangeQuorumCertsIndexes(2))
assert.Equal(t, 2, len(v.RGViewChangeQuorumCertsIndexes(2)))
assert.Equal(t, 4, v.FindMaxRGViewChangeQuorumCert(2).ViewChangeQC.HasLength())
}

Expand Down
5 changes: 4 additions & 1 deletion core/cbfttypes/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,10 @@ func (vs *Validators) GetGroupValidators(nodeID enode.ID) (*GroupValidators, err
break
}
}
return ret, nil
if ret != nil {
return ret, nil
}
return nil, errors.New("not found the specified validators")
}

func (vs *Validators) UnitID(nodeID enode.ID) (uint32, error) {
Expand Down