From a76e376eea783d288103e78755bb8d11da3e080b Mon Sep 17 00:00:00 2001 From: simlecode <69969590+simlecode@users.noreply.github.com> Date: Wed, 11 Dec 2024 11:36:23 +0800 Subject: [PATCH] fix(sync): do not allow to expand checkpointed tipsets --- app/submodule/chain/chain_submodule.go | 5 +-- pkg/chain/store.go | 30 ++++++++++------ pkg/chainsync/syncer/syncer.go | 50 +++++++++++++++++++++----- 3 files changed, 62 insertions(+), 23 deletions(-) diff --git a/app/submodule/chain/chain_submodule.go b/app/submodule/chain/chain_submodule.go index d28eb1cc5f..7d454ceb48 100644 --- a/app/submodule/chain/chain_submodule.go +++ b/app/submodule/chain/chain_submodule.go @@ -21,7 +21,6 @@ import ( "github.com/filecoin-project/venus/pkg/vmsupport" v0api "github.com/filecoin-project/venus/venus-shared/api/chain/v0" v1api "github.com/filecoin-project/venus/venus-shared/api/chain/v1" - "github.com/filecoin-project/venus/venus-shared/types" ) // ChainSubmodule enhances the `Node` with chain capabilities. @@ -33,8 +32,7 @@ type ChainSubmodule struct { //nolint SystemCall vm.SyscallsImpl CirculatingSupplyCalculator *chain.CirculatingSupplyCalculator - CheckPoint types.TipSetKey - Drand beacon.Schedule + Drand beacon.Schedule config chainConfig @@ -92,7 +90,6 @@ func NewChainSubmodule(ctx context.Context, Drand: drand, config: config, Waiter: waiter, - CheckPoint: chainStore.GetCheckPoint(), } err = store.ChainReader.Load(context.TODO()) if err != nil { diff --git a/pkg/chain/store.go b/pkg/chain/store.go index 55620b1f4d..b74aff0b3c 100644 --- a/pkg/chain/store.go +++ b/pkg/chain/store.go @@ -100,7 +100,7 @@ type Store struct { // head is the tipset at the head of the best known chain. head *types.TipSet - checkPoint types.TipSetKey + checkPoint *types.TipSet // Protects head and genesisCid. mu sync.RWMutex @@ -143,7 +143,6 @@ func NewStore(chainDs repo.Datastore, bsstore: bsstore, headEvents: pubsub.New(64), - checkPoint: types.EmptyTSK, genesis: genesisCid, reorgNotifeeCh: make(chan ReorgNotifee), tsCache: tsCache, @@ -156,11 +155,22 @@ func NewStore(chainDs repo.Datastore, val, err := store.ds.Get(context.TODO(), CheckPoint) if err != nil { - store.checkPoint = types.NewTipSetKey(genesisCid) + store.checkPoint, err = store.GetTipSet(context.TODO(), types.NewTipSetKey(genesisCid)) + if err != nil { + panic(fmt.Errorf("cannot get genesis tipset: %w", err)) + } } else { - _ = store.checkPoint.UnmarshalCBOR(bytes.NewReader(val)) //nolint:staticcheck + var checkPointTSK types.TipSetKey + err := checkPointTSK.UnmarshalCBOR(bytes.NewReader(val)) + if err != nil { + panic(fmt.Errorf("cannot unmarshal checkpoint %s: %w", string(val), err)) + } + store.checkPoint, err = store.GetTipSet(context.TODO(), checkPointTSK) + if err != nil { + panic(fmt.Errorf("cannot get checkpoint tipset: %w", err)) + } } - log.Infof("check point value: %v", store.checkPoint) + log.Infof("load check point height: %d, key: %v", store.checkPoint.Height(), store.checkPoint.Key()) store.reorgCh = store.reorgWorker(context.TODO()) return store @@ -1112,8 +1122,8 @@ func (store *Store) SetCheckpoint(ctx context.Context, ts *types.TipSet) error { return err } - store.mu.RLock() - defer store.mu.RUnlock() + store.mu.Lock() + defer store.mu.Unlock() finality := store.head.Height() - policy.ChainFinality targetChain, currentChain := ts, store.head @@ -1167,7 +1177,7 @@ func (store *Store) SetCheckpoint(ctx context.Context, ts *types.TipSet) error { if err := store.ds.Put(ctx, CheckPoint, buf.Bytes()); err != nil { return fmt.Errorf("checkpoint failed: failed to record checkpoint in the datastore: %w", err) } - store.checkPoint = ts.Key() + store.checkPoint = ts return nil } @@ -1187,7 +1197,7 @@ func (store *Store) IsAncestorOf(ctx context.Context, a, b *types.TipSet) (bool, } // GetCheckPoint get the check point from store or disk. -func (store *Store) GetCheckPoint() types.TipSetKey { +func (store *Store) GetCheckPoint() *types.TipSet { store.mu.RLock() defer store.mu.RUnlock() @@ -1722,7 +1732,7 @@ func (store *Store) exceedsForkLength(ctx context.Context, synced, external *typ } // Now check to see if we've walked back to the checkpoint. - if synced.Key().Equals(store.checkPoint) { + if synced.Key().Equals(store.GetCheckPoint().Key()) { return true, nil } diff --git a/pkg/chainsync/syncer/syncer.go b/pkg/chainsync/syncer/syncer.go index ced8977138..58e505a98c 100644 --- a/pkg/chainsync/syncer/syncer.go +++ b/pkg/chainsync/syncer/syncer.go @@ -60,6 +60,7 @@ var ( ErrNewChainTooLong = errors.New("input chain forked from best chain past finality limit") // ErrUnexpectedStoreState indicates that the syncer's chain bsstore is violating expected invariants. ErrUnexpectedStoreState = errors.New("the chain bsstore is in an unexpected state") + ErrForkCheckpoint = fmt.Errorf("fork would require us to diverge from checkpointed block") logSyncer = logging.Logger("chainsync.syncer") ) @@ -137,8 +138,7 @@ type Syncer struct { clock clock.Clock - bsstore blockstoreutil.Blockstore - checkPoint types.TipSetKey + bsstore blockstoreutil.Blockstore fork fork.IFork @@ -201,7 +201,7 @@ func (syncer *Syncer) syncOne(ctx context.Context, parent, next *types.TipSet) e var err error - if !parent.Key().Equals(syncer.checkPoint) { + if !parent.Key().Equals(syncer.chainStore.GetCheckPoint().Key()) { var wg errgroup.Group for i := 0; i < next.Len(); i++ { blk := next.At(i) @@ -297,8 +297,25 @@ func (syncer *Syncer) HandleNewTipSet(ctx context.Context, target *syncTypes.Tar return errors.New("do not sync to a target has synced before") } + if target.Head.Height() == head.Height() { + // check if maybeHead is fully contained in headTipSet + // meaning we already synced all the blocks that are a part of maybeHead + // if that is the case, there is nothing for us to do + // we need to exit out early, otherwise checkpoint-fork logic might wrongly reject it + fullyContained := true + for _, c := range target.Head.Cids() { + if !head.Contains(c) { + fullyContained = false + break + } + } + if fullyContained { + return nil + } + } + syncer.exchangeClient.AddPeer(target.Sender) - tipsets, err := syncer.fetchChainBlocks(ctx, head, target.Head) + tipsets, err := syncer.fetchChainBlocks(ctx, head, target.Head, false) if err != nil { return errors.Wrapf(err, "failure fetching or validating headers") } @@ -346,7 +363,7 @@ func (syncer *Syncer) syncSegement(ctx context.Context, target *syncTypes.Target errProcessChan <- processErr return } - if !parent.Key().Equals(syncer.checkPoint) { + if !parent.Key().Equals(syncer.chainStore.GetCheckPoint().Key()) { logSyncer.Debugf("set chain head, height:%d, blocks:%d", parent.Height(), parent.Len()) if err := syncer.chainStore.RefreshHeaviestTipSet(ctx, parent.Height()); err != nil { errProcessChan <- err @@ -374,7 +391,7 @@ func (syncer *Syncer) syncSegement(ctx context.Context, target *syncTypes.Target // if local db not exist, get block from network(libp2p), // if there is a fork, get the common root tipset of knowntip and targettip, and return the block data from root tipset to targettip // local(···->A->B) + incoming(C->D->E) => ···->A->B->C->D->E -func (syncer *Syncer) fetchChainBlocks(ctx context.Context, knownTip *types.TipSet, targetTip *types.TipSet) ([]*types.TipSet, error) { +func (syncer *Syncer) fetchChainBlocks(ctx context.Context, knownTip *types.TipSet, targetTip *types.TipSet, ignoreCheckpoint bool) ([]*types.TipSet, error) { chainTipsets := []*types.TipSet{targetTip} flushDB := func(saveTips []*types.TipSet) error { bs := blockstoreutil.NewTemporary() @@ -448,6 +465,13 @@ loop: if err != nil { return nil, fmt.Errorf("failed to load next local tipset: %w", err) } + + if !ignoreCheckpoint { + if chkpt := syncer.chainStore.GetCheckPoint(); chkpt != nil && base.Height() <= chkpt.Height() { + return nil, fmt.Errorf("merge point affecting the checkpoing: %w", ErrForkCheckpoint) + } + } + if base.IsChildOf(knownParent) { // common case: receiving a block thats potentially part of the same tipset as our best block chain.Reverse(chainTipsets) @@ -456,7 +480,7 @@ loop: logSyncer.Warnf("(fork detected) synced header chain, base: %v(%d), knownTip: %v(%d)", base.Key(), base.Height(), knownTip.Key(), knownTip.Height()) - fork, err := syncer.syncFork(ctx, base, knownTip) + fork, err := syncer.syncFork(ctx, base, knownTip, ignoreCheckpoint) if err != nil { if errors.Is(err, ErrForkTooLong) { // TODO: we're marking this block bad in the same way that we mark invalid blocks bad. Maybe distinguish? @@ -486,7 +510,15 @@ loop: // D->E-F(targetTip) // A => D->E>F // B-C(knownTip) -func (syncer *Syncer) syncFork(ctx context.Context, incoming *types.TipSet, known *types.TipSet) ([]*types.TipSet, error) { +func (syncer *Syncer) syncFork(ctx context.Context, incoming *types.TipSet, known *types.TipSet, ignoreCheckpoint bool) ([]*types.TipSet, error) { + var chkpt *types.TipSet + if !ignoreCheckpoint { + chkpt = syncer.chainStore.GetCheckPoint() + if known.Equals(chkpt) { + return nil, ErrForkCheckpoint + } + } + incomingParentsTsk := incoming.Parents() commonParent := false for _, incomingParent := range incomingParentsTsk.Cids() { @@ -701,7 +733,7 @@ func (syncer *Syncer) SyncCheckpoint(ctx context.Context, tsk types.TipSetKey) e if anc, err := syncer.chainStore.IsAncestorOf(ctx, ts, head); err != nil { return fmt.Errorf("failed to walk the chain when checkpointing: %w", err) } else if !anc { - tipsets, err := syncer.fetchChainBlocks(ctx, head, target.Head) + tipsets, err := syncer.fetchChainBlocks(ctx, head, target.Head, true) if err != nil { return errors.Wrapf(err, "failure fetching or validating headers") }