Skip to content

Commit

Permalink
feat: stmgr: cache migrated stateroots
Browse files Browse the repository at this point in the history
  • Loading branch information
travisperson committed Feb 27, 2023
1 parent 61f29a8 commit b8901ef
Show file tree
Hide file tree
Showing 17 changed files with 194 additions and 27 deletions.
2 changes: 1 addition & 1 deletion chain/gen/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func NewGeneratorWithSectorsAndUpgradeSchedule(numSectors int, us stmgr.UpgradeS
//return nil, xerrors.Errorf("creating drand beacon: %w", err)
//}

sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), sys, us, beac)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), sys, us, beac, ds)
if err != nil {
return nil, xerrors.Errorf("initing stmgr: %w", err)
}
Expand Down
16 changes: 14 additions & 2 deletions chain/stmgr/forks.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,15 +169,22 @@ func (us UpgradeSchedule) GetNtwkVersion(e abi.ChainEpoch) (network.Version, err

func (sm *StateManager) HandleStateForks(ctx context.Context, root cid.Cid, height abi.ChainEpoch, cb ExecMonitor, ts *types.TipSet) (cid.Cid, error) {
retCid := root
var err error
u := sm.stateMigrations[height]
if u != nil && u.upgrade != nil {
migCid, ok, err := u.resultCache.Result(ctx, root)
if err == nil && ok {
log.Warnw("CACHED migration", "height", height, "from", root, "to", migCid)
return migCid, nil
} else if err != nil {
log.Errorw("failed to lookup previous migration result", "err", err)
}

startTime := time.Now()
log.Warnw("STARTING migration", "height", height, "from", root)
// Yes, we clone the cache, even for the final upgrade epoch. Why? Reverts. We may
// have to migrate multiple times.
tmpCache := u.cache.Clone()
retCid, err = u.upgrade(ctx, sm, tmpCache, cb, root, height, ts)
retCid, err := u.upgrade(ctx, sm, tmpCache, cb, root, height, ts)
if err != nil {
log.Errorw("FAILED migration", "height", height, "from", root, "error", err)
return cid.Undef, err
Expand All @@ -192,6 +199,11 @@ func (sm *StateManager) HandleStateForks(ctx context.Context, root cid.Cid, heig
"to", retCid,
"duration", time.Since(startTime),
)

// Only set if migration ran, we do not want a root => root mapping
if err := u.resultCache.Store(ctx, root, retCid); err != nil {
log.Errorw("failed to store migration result", "err", err)
}
}

return retCid, nil
Expand Down
107 changes: 104 additions & 3 deletions chain/stmgr/forks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
ipldcbor "github.com/ipfs/go-ipld-cbor"
logging "github.com/ipfs/go-log/v2"
"github.com/stretchr/testify/require"
Expand All @@ -34,6 +35,7 @@ import (
"github.com/filecoin-project/lotus/chain/consensus"
"github.com/filecoin-project/lotus/chain/consensus/filcns"
"github.com/filecoin-project/lotus/chain/gen"
"github.com/filecoin-project/lotus/chain/stmgr"
. "github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
Expand Down Expand Up @@ -165,7 +167,7 @@ func TestForkHeightTriggers(t *testing.T) {
}

return st.Flush(ctx)
}}}, cg.BeaconSchedule())
}}}, cg.BeaconSchedule(), datastore.NewMapDatastore())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -283,7 +285,7 @@ func testForkRefuseCall(t *testing.T, nullsBefore, nullsAfter int) {
root cid.Cid, height abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
migrationCount++
return root, nil
}}}, cg.BeaconSchedule())
}}}, cg.BeaconSchedule(), datastore.NewMapDatastore())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -501,7 +503,7 @@ func TestForkPreMigration(t *testing.T) {
return nil
},
}}},
}, cg.BeaconSchedule())
}, cg.BeaconSchedule(), datastore.NewMapDatastore())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -535,3 +537,102 @@ func TestForkPreMigration(t *testing.T) {
// to this channel.
require.Equal(t, 6, len(counter))
}

func TestMigrtionCache(t *testing.T) {
logging.SetAllLoggers(logging.LevelInfo)

cg, err := gen.NewGenerator()
require.NoError(t, err)

counter := make(chan struct{}, 10)
metadataDs := datastore.NewMapDatastore()

sm, err := NewStateManager(
cg.ChainStore(),
consensus.NewTipSetExecutor(filcns.RewardFunc),
cg.StateManager().VMSys(),
UpgradeSchedule{{
Network: network.Version1,
Height: testForkHeight,
Migration: func(_ context.Context, _ *StateManager, _ MigrationCache, _ ExecMonitor,
root cid.Cid, _ abi.ChainEpoch, _ *types.TipSet) (cid.Cid, error) {

counter <- struct{}{}

return root, nil
}},
},
cg.BeaconSchedule(),
metadataDs,
)
require.NoError(t, err)
require.NoError(t, sm.Start(context.Background()))
defer func() {
require.NoError(t, sm.Stop(context.Background()))
}()

inv := consensus.NewActorRegistry()
registry := builtin.MakeRegistryLegacy([]rtt.VMActor{testActor{}})
inv.Register(actorstypes.Version0, nil, registry)

sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) {
nvm, err := vm.NewLegacyVM(ctx, vmopt)
require.NoError(t, err)
nvm.SetInvoker(inv)
return nvm, nil
})

cg.SetStateManager(sm)

for i := 0; i < 50; i++ {
_, err := cg.NextTipSet()
require.NoError(t, err)
}

ts, err := cg.ChainStore().GetTipsetByHeight(context.Background(), testForkHeight, nil, false)
require.NoError(t, err)

root, _, err := stmgr.ComputeState(context.Background(), sm, testForkHeight+1, []*types.Message{}, ts)
require.NoError(t, err)
t.Log(root)

require.Equal(t, 1, len(counter))

{
sm, err := NewStateManager(
cg.ChainStore(),
consensus.NewTipSetExecutor(filcns.RewardFunc),
cg.StateManager().VMSys(),
UpgradeSchedule{{
Network: network.Version1,
Height: testForkHeight,
Migration: func(_ context.Context, _ *StateManager, _ MigrationCache, _ ExecMonitor,
root cid.Cid, _ abi.ChainEpoch, _ *types.TipSet) (cid.Cid, error) {

counter <- struct{}{}

return root, nil
}},
},
cg.BeaconSchedule(),
metadataDs,
)
require.NoError(t, err)
sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) {
nvm, err := vm.NewLegacyVM(ctx, vmopt)
require.NoError(t, err)
nvm.SetInvoker(inv)
return nvm, nil
})

ctx := context.Background()

base, _, err := sm.ExecutionTrace(ctx, ts)
require.NoError(t, err)
_, err = sm.HandleStateForks(context.Background(), base, ts.Height(), nil, ts)
require.NoError(t, err)

// Should not have increased as we should be using the cached results in the metadataDs
require.Equal(t, 1, len(counter))
}
}
56 changes: 53 additions & 3 deletions chain/stmgr/stmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package stmgr

import (
"context"
"fmt"
"sync"

"github.com/ipfs/go-cid"
dstore "github.com/ipfs/go-datastore"
cbor "github.com/ipfs/go-ipld-cbor"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
Expand Down Expand Up @@ -54,6 +56,48 @@ type migration struct {
upgrade MigrationFunc
preMigrations []PreMigration
cache *nv16.MemMigrationCache
resultCache *resultCache
}

type resultCache struct {
ds dstore.Batching
keyPrefix string
}

func (m *resultCache) Result(ctx context.Context, root cid.Cid) (cid.Cid, bool, error) {
kStr := fmt.Sprintf("%s-%s", m.keyPrefix, root)
k := dstore.NewKey(kStr)

found, err := m.ds.Has(ctx, k)
if err != nil {
return cid.Undef, false, xerrors.Errorf("error looking up migration result: %w", err)
}

if !found {
return cid.Undef, false, nil
}

bs, err := m.ds.Get(ctx, k)
if err != nil {
return cid.Undef, false, xerrors.Errorf("error loading migration result: %w", err)
}

c, err := cid.Parse(bs)
if err != nil {
return cid.Undef, false, xerrors.Errorf("error parsing migration result: %w", err)
}

return c, true, nil
}

func (m *resultCache) Store(ctx context.Context, root cid.Cid, resultCid cid.Cid) error {
kStr := fmt.Sprintf("%s-%s", m.keyPrefix, root)
k := dstore.NewKey(kStr)
if err := m.ds.Put(ctx, k, resultCid.Bytes()); err != nil {
return err
}

return nil
}

type Executor interface {
Expand Down Expand Up @@ -103,7 +147,7 @@ type treeCache struct {
tree *state.StateTree
}

func NewStateManager(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder, us UpgradeSchedule, beacon beacon.Schedule) (*StateManager, error) {
func NewStateManager(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder, us UpgradeSchedule, beacon beacon.Schedule, metadataDs dstore.Batching) (*StateManager, error) {
// If we have upgrades, make sure they're in-order and make sense.
if err := us.Validate(); err != nil {
return nil, err
Expand All @@ -122,12 +166,18 @@ func NewStateManager(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder,
upgrade: upgrade.Migration,
preMigrations: upgrade.PreMigrations,
cache: nv16.NewMemMigrationCache(),
resultCache: &resultCache{
keyPrefix: fmt.Sprintf("nv%d-%d", upgrade.Network, upgrade.Height),
ds: metadataDs,
},
}

stateMigrations[upgrade.Height] = migration
}
if upgrade.Expensive {
expensiveUpgrades[upgrade.Height] = struct{}{}
}

networkVersions = append(networkVersions, versionSpec{
networkVersion: lastVersion,
atOrBelow: upgrade.Height,
Expand Down Expand Up @@ -155,8 +205,8 @@ func NewStateManager(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder,
}, nil
}

func NewStateManagerWithUpgradeScheduleAndMonitor(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder, us UpgradeSchedule, b beacon.Schedule, em ExecMonitor) (*StateManager, error) {
sm, err := NewStateManager(cs, exec, sys, us, b)
func NewStateManagerWithUpgradeScheduleAndMonitor(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder, us UpgradeSchedule, b beacon.Schedule, em ExecMonitor, metadataDs dstore.Batching) (*StateManager, error) {
sm, err := NewStateManager(cs, exec, sys, us, b, metadataDs)
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions chain/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ func TestChainExportImportFull(t *testing.T) {
}

nbs := blockstore.NewMemorySync()
cs := store.NewChainStore(nbs, nbs, datastore.NewMapDatastore(), filcns.Weight, nil)
ds := datastore.NewMapDatastore()
cs := store.NewChainStore(nbs, nbs, ds, filcns.Weight, nil)
defer cs.Close() //nolint:errcheck

root, err := cs.Import(context.TODO(), buf)
Expand All @@ -213,7 +214,7 @@ func TestChainExportImportFull(t *testing.T) {
t.Fatal("imported chain differed from exported chain")
}

sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), nil, filcns.DefaultUpgradeSchedule(), cg.BeaconSchedule())
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), nil, filcns.DefaultUpgradeSchedule(), cg.BeaconSchedule(), ds)
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-bench/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ var importBenchCmd = &cli.Command{
defer cs.Close() //nolint:errcheck

// TODO: We need to supply the actual beacon after v14
stm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(verifier), filcns.DefaultUpgradeSchedule(), nil)
stm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(verifier), filcns.DefaultUpgradeSchedule(), nil, metadataDs)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/lotus-shed/balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ var chainBalanceStateCmd = &cli.Command{
cst := cbor.NewCborStore(bs)
store := adt.WrapStore(ctx, cst)

sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds)
if err != nil {
return err
}
Expand Down Expand Up @@ -739,7 +739,7 @@ var chainPledgeCmd = &cli.Command{
cst := cbor.NewCborStore(bs)
store := adt.WrapStore(ctx, cst)

sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/lotus-shed/gas-estimation.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ var gasTraceCmd = &cli.Command{
cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil)
defer cs.Close() //nolint:errcheck

sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), shd)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), shd, mds)
if err != nil {
return err
}
Expand Down Expand Up @@ -214,7 +214,7 @@ var replayOfflineCmd = &cli.Command{
cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil)
defer cs.Close() //nolint:errcheck

sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), shd)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), shd, mds)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/invariants.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ var invariantsCmd = &cli.Command{
cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil)
defer cs.Close() //nolint:errcheck

sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion cmd/lotus-shed/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/urfave/cli/v2"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
Expand Down Expand Up @@ -121,7 +122,8 @@ var migrationsCmd = &cli.Command{
cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil)
defer cs.Close() //nolint:errcheck

sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil)
// Note: we use a map datastore for the metadata to avoid writing / using cached migration results in the metadata store
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, datastore.NewMapDatastore())
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/state-stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ to reduce the number of decode operations performed by caching the decoded objec
}

tsExec := consensus.NewTipSetExecutor(filcns.RewardFunc)
sm, err := stmgr.NewStateManager(cs, tsExec, vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil)
sm, err := stmgr.NewStateManager(cs, tsExec, vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit b8901ef

Please sign in to comment.