Skip to content

Commit

Permalink
[[FIX]] add fork chain detection(#1236)
Browse files Browse the repository at this point in the history
  • Loading branch information
bysomeone authored and 33cn committed Apr 18, 2022
1 parent 8aca51e commit 192058e
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 62 deletions.
196 changes: 137 additions & 59 deletions blockchain/blocksyn.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
var (
BackBlockNum int64 = 128 //节点高度不增加时向后取blocks的个数
BackwardBlockNum int64 = 16 //本节点高度不增加时并且落后peer的高度数
checkHeightNoIncSeconds int64 = 5 * 60 //高度不增长时的检测周期目前暂定5分钟
checkHeightNoIncSeconds int64 = 200 //高度不增长时的检测周期
checkBlockHashSeconds int64 = 1 * 60 //1分钟检测一次tip hash和peer 对应高度的hash是否一致
fetchPeerListSeconds int64 = 5 //5 秒获取一个peerlist
MaxRollBlockNum int64 = 10000 //最大回退block数量
Expand Down Expand Up @@ -160,6 +160,13 @@ func (chain *BlockChain) SynRoutine() {
//30s尝试从peer节点请求ChunkRecord
chunkRecordSynTicker := time.NewTicker(30 * time.Second)
defer chunkRecordSynTicker.Stop()

chain.UpdatesynBlkHeight(chain.GetBlockHeight())
mode := chain.GetDownloadSyncStatus()
chain.UpdateDownloadSyncStatus(forkChainDetectMode)
// make sure the node is not on a fork chain when it starts
go chain.forkChainDectection(mode)

//节点下载模式
go chain.DownLoadBlocks()

Expand All @@ -186,8 +193,10 @@ func (chain *BlockChain) SynRoutine() {

case <-checkBlockHashTicker.C:
//synlog.Info("checkBlockHashTicker")
chain.tickerwg.Add(1)
go chain.CheckTipBlockHash()
if chain.GetDownloadSyncStatus() != forkChainDetectMode {
chain.tickerwg.Add(1)
go chain.CheckTipBlockHash()
}

//定时检查系统时间,如果系统时间有问题,那么会有一个报警
case <-checkClockDriftTicker.C:
Expand Down Expand Up @@ -433,6 +442,33 @@ func (chain *BlockChain) GetPeerInfo(pid string) *PeerInfo {
return nil
}

// getForkComparePeer returns a copy of the peer that fork detection compares
// the local chain against.
//
// It yields the second-to-last entry of chain.peerList, the only entry when
// the list holds exactly one peer, or a zero-value PeerInfo when the list is
// empty. The list is read while holding peerMaxBlklock.
//
// NOTE(review): presumably peerList is kept sorted by ascending height, which
// would make the second-to-last entry the peer with the second-largest block
// height — confirm against the code that maintains the list.
func (chain *BlockChain) getForkComparePeer() PeerInfo {
	chain.peerMaxBlklock.Lock()
	defer chain.peerMaxBlklock.Unlock()

	switch count := chain.peerList.Len(); count {
	case 0:
		// No peers known yet: callers see Height == 0.
		return PeerInfo{}
	case 1:
		return *chain.peerList[0]
	default:
		return *chain.peerList[count-2]
	}
}

// getActivePeersByHeight returns the names of all peers in chain.peerList
// whose height is strictly greater than height.
//
// The list is read while holding peerMaxBlklock. The returned slice is never
// nil; it is empty when no peer qualifies.
func (chain *BlockChain) getActivePeersByHeight(height int64) []string {
	chain.peerMaxBlklock.Lock()
	defer chain.peerMaxBlklock.Unlock()

	names := make([]string, 0, 8)
	for i := range chain.peerList {
		info := chain.peerList[i]
		if info.Height <= height {
			continue
		}
		names = append(names, info.Name)
	}
	return names
}

//GetMaxPeerInfo 获取peerlist中最高节点的peerinfo
func (chain *BlockChain) GetMaxPeerInfo() *PeerInfo {
chain.peerMaxBlklock.Lock()
Expand Down Expand Up @@ -643,6 +679,77 @@ func (chain *BlockChain) SynBlocksFromPeers() {
}
}

// forkChainDectection detects whether the local node is running on a fork
// chain and, if it is, starts re-downloading the blocks above the fork point.
//
// prevMode is the download sync status that is restored when detection ends,
// whatever the outcome. Callers switch the status to forkChainDetectMode
// before invoking this method; while in that mode ProcBlockHeaders publishes
// the fork height it finds on chain.forkPointChan.
//
// Steps:
//  1. Poll getForkComparePeer until it returns a peer with a non-zero height.
//  2. Stop if that peer is fewer than BackwardBlockNum blocks ahead.
//  3. Request the headers for the last BackwardBlockNum local heights from
//     that peer and wait up to two minutes for the fork height.
//  4. If the fork height is below the local height, wait up to five minutes
//     for the running download task to finish (cancelling it on timeout),
//     cancel a running sync task, and start ProcDownLoadBlocks from the fork
//     height unless the local height grew in the meantime.
func (chain *BlockChain) forkChainDectection(prevMode int) {
	// Restore the previous download mode on every exit path.
	defer chain.UpdateDownloadSyncStatus(prevMode)

	// Wait until a comparison peer with a known height is available.
	cmpPeer := chain.getForkComparePeer()
	for cmpPeer.Height == 0 {
		time.Sleep(time.Second * 5)
		cmpPeer = chain.getForkComparePeer()
	}
	localHeight := chain.GetBlockHeight()
	chainlog.Debug("forkDectectInfo", "height", localHeight, "peerHeight", cmpPeer.Height)
	if cmpPeer.Height < localHeight+BackwardBlockNum {
		return
	}

	// forkPointChan is buffered and filled by a non-blocking send, so it may
	// still hold a fork height from an earlier header exchange. Discard it so
	// the value received below belongs to the request issued here.
	select {
	case <-chain.forkPointChan:
	default:
	}

	if err := chain.FetchBlockHeaders(localHeight-BackwardBlockNum, localHeight, cmpPeer.Name); err != nil {
		chainlog.Error("forkDectectFetchHeaders", "err", err)
		// The request was never sent, so no fork point will arrive; return now
		// instead of holding forkChainDetectMode until the timeout below.
		return
	}

	// Wait for the fork point to be found.
	var forkHeight int64
	select {
	case <-time.After(time.Minute * 2):
		chainlog.Error("forkDectect wait fork point timeout")
		return
	case forkHeight = <-chain.forkPointChan:
	}

	// The local chain matches the peer up to the local tip: no fork.
	if forkHeight >= localHeight {
		chainlog.Debug("forkDectectNoForks")
		return
	}

	// NOTE(review): only peers strictly higher than the comparison peer
	// qualify, so with a single known peer this list is empty and detection
	// stops here — confirm this is intended.
	activePeer := chain.getActivePeersByHeight(cmpPeer.Height)
	chainlog.Debug("forkDectect", "activePeer", len(activePeer))
	if len(activePeer) <= 0 {
		return
	}

	// readyDownload is closed once the download task is idle. Closing never
	// blocks, so the polling goroutine terminates even when the timeout branch
	// below has already been taken and nobody is receiving any more.
	readyDownload := make(chan struct{})
	go func() {
		defer close(readyDownload)
		for chain.downLoadTask.InProgress() {
			time.Sleep(time.Second * 5)
		}
	}()

	select {
	case <-time.After(time.Minute * 5):
		chainlog.Error("forkDectect wait download task timeout")
		chain.downLoadTask.Cancel()
	case <-readyDownload:
	}
	if chain.syncTask.InProgress() {
		chain.syncTask.Cancel()
	}

	// The chain advanced while waiting, so the node is no longer stuck.
	if currHeight := chain.GetBlockHeight(); currHeight > localHeight {
		chainlog.Info("forkDectectBlkHeightIncreased", "prev", localHeight, "curr", currHeight)
		return
	}

	chainlog.Info("forkDectectDownBlk", "localHeight", localHeight, "forkHeight", forkHeight, "peers", len(activePeer))
	go chain.ProcDownLoadBlocks(forkHeight, localHeight+1, activePeer)
}

//CheckHeightNoIncrease 在规定时间本链的高度没有增长,但peerlist中最新高度远远高于本节点高度,
//可能当前链是在分支链上,需从指定最长链的peer向后请求指定数量的blockheader
//请求bestchain.Height -BackBlockNum -- bestchain.Height的header
Expand All @@ -651,48 +758,28 @@ func (chain *BlockChain) CheckHeightNoIncrease() {
defer chain.tickerwg.Done()

//获取当前主链的最新高度
tipheight := chain.bestChain.Height()
laststorheight := chain.blockStore.Height()

if tipheight != laststorheight {
synlog.Error("CheckHeightNoIncrease", "tipheight", tipheight, "laststorheight", laststorheight)
return
}
//获取上个检测周期时的检测高度
checkheight := chain.GetsynBlkHeight()
localHeight := chain.GetBlockHeight()

//bestchain的tip高度在变化,更新最新的检测高度即可,高度可能在增长或者回退
if tipheight != checkheight {
chain.UpdatesynBlkHeight(tipheight)
if localHeight != chain.GetsynBlkHeight() {
chain.UpdatesynBlkHeight(localHeight)
return
}
//一个检测周期发现本节点bestchain的tip高度没有变化。
//远远落后于高度的peer节点并且最高peer节点不是最优链,本节点可能在侧链上,
//需要从最新的peer上向后取BackBlockNum个headers
maxpeer := chain.GetMaxPeerInfo()
if maxpeer == nil {
synlog.Error("CheckHeightNoIncrease GetMaxPeerInfo is nil")

mode := chain.GetDownloadSyncStatus()
chainlog.Debug("CheckHeightNoIncrease", "localHeight", localHeight, "downMode", mode)
if mode == forkChainDetectMode {
return
}
peermaxheight := maxpeer.Height
pid := maxpeer.Name
var err error
if peermaxheight > tipheight && (peermaxheight-tipheight) > BackwardBlockNum && !chain.isBestChainPeer(pid) {
//从指定peer向后请求BackBlockNum个blockheaders
synlog.Debug("CheckHeightNoIncrease", "tipheight", tipheight, "pid", pid)
if tipheight > BackBlockNum {
err = chain.FetchBlockHeaders(tipheight-BackBlockNum, tipheight, pid)
} else {
err = chain.FetchBlockHeaders(0, tipheight, pid)
}
if err != nil {
synlog.Error("CheckHeightNoIncrease FetchBlockHeaders", "err", err)
}
}
chain.UpdateDownloadSyncStatus(forkChainDetectMode)
chain.forkChainDectection(mode)
}

//FetchBlockHeaders 从指定pid获取start到end之间的headers
func (chain *BlockChain) FetchBlockHeaders(start int64, end int64, pid string) (err error) {
if start < 0 {
start = 0
}
if chain.client == nil {
synlog.Error("FetchBlockHeaders chain client not bind message queue.")
return types.ErrClientNotBindQueue
Expand Down Expand Up @@ -753,11 +840,7 @@ func (chain *BlockChain) ProcBlockHeader(headers *types.Headers, peerid string)
if !bytes.Equal(headers.Items[0].Hash, header.Hash) {
synlog.Info("ProcBlockHeader hash no equal", "height", height, "self hash", common.ToHex(header.Hash), "peer hash", common.ToHex(headers.Items[0].Hash))

if height > BackBlockNum {
err = chain.FetchBlockHeaders(height-BackBlockNum, height, peerid)
} else if height != 0 {
err = chain.FetchBlockHeaders(0, height, peerid)
}
err = chain.FetchBlockHeaders(height-BackBlockNum, height, peerid)
if err != nil {
synlog.Info("ProcBlockHeader FetchBlockHeaders", "err", err)
}
Expand Down Expand Up @@ -795,18 +878,23 @@ func (chain *BlockChain) ProcBlockHeaders(headers *types.Headers, pid string) er
}
//继续向后取指定数量的headers
height := headers.Items[0].Height
if height > BackBlockNum {
err = chain.FetchBlockHeaders(height-BackBlockNum, height, pid)
} else {
err = chain.FetchBlockHeaders(0, height, pid)
}
err = chain.FetchBlockHeaders(height-BackBlockNum, height, pid)
if err != nil {
synlog.Info("ProcBlockHeaders FetchBlockHeaders", "err", err)
}
return types.ErrContinueBack
}
synlog.Info("ProcBlockHeaders find fork point", "height", ForkHeight, "hash", common.ToHex(forkhash))

if chain.GetDownloadSyncStatus() == forkChainDetectMode {
synlog.Error("ProcBlockHeaders forkDetect")
select {
case chain.forkPointChan <- ForkHeight:
default:
}
return nil
}

//获取此pid对应的peer信息,
peerinfo := chain.GetPeerInfo(pid)
if peerinfo == nil {
Expand Down Expand Up @@ -888,13 +976,8 @@ func (chain *BlockChain) CheckTipBlockHash() {
} else if peermaxheight == tipheight {
// 直接tip block hash比较,如果不相等需要从peer向后去指定的headers,尝试寻找分叉点
if !bytes.Equal(tiphash, peerhash) {
if tipheight > BackBlockNum {
synlog.Debug("CheckTipBlockHash ==", "peermaxheight", peermaxheight, "tipheight", tipheight)
Err = chain.FetchBlockHeaders(tipheight-BackBlockNum, tipheight, pid)
} else {
synlog.Debug("CheckTipBlockHash !=", "peermaxheight", peermaxheight, "tipheight", tipheight)
Err = chain.FetchBlockHeaders(1, tipheight, pid)
}
synlog.Debug("CheckTipBlockHash ==", "peermaxheight", peermaxheight, "tipheight", tipheight)
Err = chain.FetchBlockHeaders(tipheight-BackBlockNum, tipheight, pid)
}
} else {

Expand All @@ -903,13 +986,8 @@ func (chain *BlockChain) CheckTipBlockHash() {
return
}
if !bytes.Equal(header.Hash, peerhash) {
if peermaxheight > BackBlockNum {
synlog.Debug("CheckTipBlockHash<!=", "peermaxheight", peermaxheight, "tipheight", tipheight)
Err = chain.FetchBlockHeaders(peermaxheight-BackBlockNum, peermaxheight, pid)
} else {
synlog.Debug("CheckTipBlockHash<!=", "peermaxheight", peermaxheight, "tipheight", tipheight)
Err = chain.FetchBlockHeaders(1, peermaxheight, pid)
}
synlog.Debug("CheckTipBlockHash<!=", "peermaxheight", peermaxheight, "tipheight", tipheight)
Err = chain.FetchBlockHeaders(peermaxheight-BackBlockNum, peermaxheight, pid)
}
}
if Err != nil {
Expand Down
3 changes: 3 additions & 0 deletions blockchain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type BlockChain struct {
//记录本节点已经同步的block高度,用于节点追赶active链,处理节点分叉不同步的场景
synBlockHeight int64

forkPointChan chan int64

//记录peer的最新block高度,用于节点追赶active链
peerList PeerInfoList
recvwg *sync.WaitGroup
Expand Down Expand Up @@ -194,6 +196,7 @@ func New(cfg *types.Chain33Config) *BlockChain {
downloadMode: fastDownLoadMode,
blockOnChain: &BlockOnChain{},
onChainTimeout: 0,
forkPointChan: make(chan int64, 1),
}
blockchain.initConfig(cfg)
blockchain.blockCache = newBlockCache(cfg, defaultBlockHashCacheSize)
Expand Down
11 changes: 8 additions & 3 deletions blockchain/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ const (
//快速下载时需要的最少peer数量
bestPeerCount = 2

normalDownLoadMode = 0
fastDownLoadMode = 1
chunkDownLoadMode = 2
normalDownLoadMode = 0
fastDownLoadMode = 1
chunkDownLoadMode = 2
forkChainDetectMode = 3
)

//DownLoadInfo blockchain模块下载block处理结构体
Expand Down Expand Up @@ -431,6 +432,10 @@ func (chain *BlockChain) DownLoadTimeOutProc(height int64) {

// DownLoadBlocks 下载区块
func (chain *BlockChain) DownLoadBlocks() {
// wait for fork chain detection to finish
for chain.GetDownloadSyncStatus() == forkChainDetectMode {
time.Sleep(time.Second)
}
if !chain.cfg.DisableShard && chain.cfg.EnableFetchP2pstore {
// 1.节点开启时候首先尝试进行chunkDownLoad下载
chain.UpdateDownloadSyncStatus(chunkDownLoadMode) // 默认模式是fastDownLoadMode
Expand Down

0 comments on commit 192058e

Please sign in to comment.