Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: stmgr: cache migrated stateroots #10282

Merged
merged 2 commits into from
Mar 9, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion chain/gen/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func NewGeneratorWithSectorsAndUpgradeSchedule(numSectors int, us stmgr.UpgradeS
//return nil, xerrors.Errorf("creating drand beacon: %w", err)
//}

sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), sys, us, beac)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), sys, us, beac, ds)
if err != nil {
return nil, xerrors.Errorf("initing stmgr: %w", err)
}
Expand Down
14 changes: 13 additions & 1 deletion chain/stmgr/forks.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,16 @@ func (us UpgradeSchedule) GetNtwkVersion(e abi.ChainEpoch) (network.Version, err

func (sm *StateManager) HandleStateForks(ctx context.Context, root cid.Cid, height abi.ChainEpoch, cb ExecMonitor, ts *types.TipSet) (cid.Cid, error) {
retCid := root
var err error
u := sm.stateMigrations[height]
if u != nil && u.upgrade != nil {
migCid, ok, err := u.resultCache.Result(ctx, root)
if err == nil && ok {
log.Warnw("CACHED migration", "height", height, "from", root, "to", migCid)
travisperson marked this conversation as resolved.
Show resolved Hide resolved
return migCid, nil
} else if err != nil {
log.Errorw("failed to lookup previous migration result", "err", err)
}

startTime := time.Now()
log.Warnw("STARTING migration", "height", height, "from", root)
// Yes, we clone the cache, even for the final upgrade epoch. Why? Reverts. We may
Expand All @@ -197,6 +204,11 @@ func (sm *StateManager) HandleStateForks(ctx context.Context, root cid.Cid, heig
"to", retCid,
"duration", time.Since(startTime),
)

// Only set if migration ran, we do not want a root => root mapping
if err := u.resultCache.Store(ctx, root, retCid); err != nil {
log.Errorw("failed to store migration result", "err", err)
}
}

return retCid, nil
Expand Down
108 changes: 105 additions & 3 deletions chain/stmgr/forks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
ipldcbor "github.com/ipfs/go-ipld-cbor"
logging "github.com/ipfs/go-log/v2"
"github.com/stretchr/testify/require"
Expand All @@ -35,6 +36,7 @@ import (
"github.com/filecoin-project/lotus/chain/consensus"
"github.com/filecoin-project/lotus/chain/consensus/filcns"
"github.com/filecoin-project/lotus/chain/gen"
"github.com/filecoin-project/lotus/chain/stmgr"
. "github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
Expand Down Expand Up @@ -166,7 +168,7 @@ func TestForkHeightTriggers(t *testing.T) {
}

return st.Flush(ctx)
}}}, cg.BeaconSchedule())
}}}, cg.BeaconSchedule(), datastore.NewMapDatastore())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -284,7 +286,7 @@ func testForkRefuseCall(t *testing.T, nullsBefore, nullsAfter int) {
root cid.Cid, height abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
migrationCount++
return root, nil
}}}, cg.BeaconSchedule())
}}}, cg.BeaconSchedule(), datastore.NewMapDatastore())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -502,7 +504,7 @@ func TestForkPreMigration(t *testing.T) {
return nil
},
}}},
}, cg.BeaconSchedule())
}, cg.BeaconSchedule(), datastore.NewMapDatastore())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -576,6 +578,7 @@ func TestDisablePreMigration(t *testing.T) {
}}},
},
cg.BeaconSchedule(),
datastore.NewMapDatastore(),
)
require.NoError(t, err)
require.NoError(t, sm.Start(context.Background()))
Expand Down Expand Up @@ -603,3 +606,102 @@ func TestDisablePreMigration(t *testing.T) {

require.Equal(t, 1, len(counter))
}

func TestMigrtionCache(t *testing.T) {
logging.SetAllLoggers(logging.LevelInfo)

cg, err := gen.NewGenerator()
require.NoError(t, err)

counter := make(chan struct{}, 10)
metadataDs := datastore.NewMapDatastore()

sm, err := NewStateManager(
cg.ChainStore(),
consensus.NewTipSetExecutor(filcns.RewardFunc),
cg.StateManager().VMSys(),
UpgradeSchedule{{
Network: network.Version1,
Height: testForkHeight,
Migration: func(_ context.Context, _ *StateManager, _ MigrationCache, _ ExecMonitor,
root cid.Cid, _ abi.ChainEpoch, _ *types.TipSet) (cid.Cid, error) {

counter <- struct{}{}

return root, nil
}},
},
cg.BeaconSchedule(),
metadataDs,
)
require.NoError(t, err)
require.NoError(t, sm.Start(context.Background()))
defer func() {
require.NoError(t, sm.Stop(context.Background()))
}()

inv := consensus.NewActorRegistry()
registry := builtin.MakeRegistryLegacy([]rtt.VMActor{testActor{}})
inv.Register(actorstypes.Version0, nil, registry)

sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) {
nvm, err := vm.NewLegacyVM(ctx, vmopt)
require.NoError(t, err)
nvm.SetInvoker(inv)
return nvm, nil
})

cg.SetStateManager(sm)

for i := 0; i < 50; i++ {
_, err := cg.NextTipSet()
require.NoError(t, err)
}

ts, err := cg.ChainStore().GetTipsetByHeight(context.Background(), testForkHeight, nil, false)
require.NoError(t, err)

root, _, err := stmgr.ComputeState(context.Background(), sm, testForkHeight+1, []*types.Message{}, ts)
require.NoError(t, err)
t.Log(root)

require.Equal(t, 1, len(counter))

{
sm, err := NewStateManager(
cg.ChainStore(),
consensus.NewTipSetExecutor(filcns.RewardFunc),
cg.StateManager().VMSys(),
UpgradeSchedule{{
Network: network.Version1,
Height: testForkHeight,
Migration: func(_ context.Context, _ *StateManager, _ MigrationCache, _ ExecMonitor,
root cid.Cid, _ abi.ChainEpoch, _ *types.TipSet) (cid.Cid, error) {

counter <- struct{}{}

return root, nil
}},
},
cg.BeaconSchedule(),
metadataDs,
)
require.NoError(t, err)
sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) {
nvm, err := vm.NewLegacyVM(ctx, vmopt)
require.NoError(t, err)
nvm.SetInvoker(inv)
return nvm, nil
})

ctx := context.Background()

base, _, err := sm.ExecutionTrace(ctx, ts)
require.NoError(t, err)
_, err = sm.HandleStateForks(context.Background(), base, ts.Height(), nil, ts)
require.NoError(t, err)

// Should not have increased as we should be using the cached results in the metadataDs
require.Equal(t, 1, len(counter))
}
}
56 changes: 53 additions & 3 deletions chain/stmgr/stmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package stmgr

import (
"context"
"fmt"
"sync"

"github.com/ipfs/go-cid"
dstore "github.com/ipfs/go-datastore"
cbor "github.com/ipfs/go-ipld-cbor"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
Expand Down Expand Up @@ -54,6 +56,48 @@ type migration struct {
upgrade MigrationFunc
preMigrations []PreMigration
cache *nv16.MemMigrationCache
resultCache *resultCache
}

type resultCache struct {
travisperson marked this conversation as resolved.
Show resolved Hide resolved
ds dstore.Batching
keyPrefix string
}

func (m *resultCache) Result(ctx context.Context, root cid.Cid) (cid.Cid, bool, error) {
travisperson marked this conversation as resolved.
Show resolved Hide resolved
kStr := fmt.Sprintf("%s-%s", m.keyPrefix, root)
k := dstore.NewKey(kStr)
travisperson marked this conversation as resolved.
Show resolved Hide resolved

found, err := m.ds.Has(ctx, k)
if err != nil {
return cid.Undef, false, xerrors.Errorf("error looking up migration result: %w", err)
}

if !found {
return cid.Undef, false, nil
}

bs, err := m.ds.Get(ctx, k)
if err != nil {
return cid.Undef, false, xerrors.Errorf("error loading migration result: %w", err)
}
travisperson marked this conversation as resolved.
Show resolved Hide resolved

c, err := cid.Parse(bs)
if err != nil {
return cid.Undef, false, xerrors.Errorf("error parsing migration result: %w", err)
}

return c, true, nil
}

func (m *resultCache) Store(ctx context.Context, root cid.Cid, resultCid cid.Cid) error {
kStr := fmt.Sprintf("%s-%s", m.keyPrefix, root)
travisperson marked this conversation as resolved.
Show resolved Hide resolved
k := dstore.NewKey(kStr)
travisperson marked this conversation as resolved.
Show resolved Hide resolved
if err := m.ds.Put(ctx, k, resultCid.Bytes()); err != nil {
return err
}

return nil
}

type Executor interface {
Expand Down Expand Up @@ -103,7 +147,7 @@ type treeCache struct {
tree *state.StateTree
}

func NewStateManager(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder, us UpgradeSchedule, beacon beacon.Schedule) (*StateManager, error) {
func NewStateManager(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder, us UpgradeSchedule, beacon beacon.Schedule, metadataDs dstore.Batching) (*StateManager, error) {
// If we have upgrades, make sure they're in-order and make sense.
if err := us.Validate(); err != nil {
return nil, err
Expand All @@ -122,12 +166,18 @@ func NewStateManager(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder,
upgrade: upgrade.Migration,
preMigrations: upgrade.PreMigrations,
cache: nv16.NewMemMigrationCache(),
resultCache: &resultCache{
keyPrefix: fmt.Sprintf("nv%d-%d", upgrade.Network, upgrade.Height),
ds: metadataDs,
},
}

stateMigrations[upgrade.Height] = migration
}
if upgrade.Expensive {
expensiveUpgrades[upgrade.Height] = struct{}{}
}

networkVersions = append(networkVersions, versionSpec{
networkVersion: lastVersion,
atOrBelow: upgrade.Height,
Expand Down Expand Up @@ -155,8 +205,8 @@ func NewStateManager(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder,
}, nil
}

func NewStateManagerWithUpgradeScheduleAndMonitor(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder, us UpgradeSchedule, b beacon.Schedule, em ExecMonitor) (*StateManager, error) {
sm, err := NewStateManager(cs, exec, sys, us, b)
func NewStateManagerWithUpgradeScheduleAndMonitor(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder, us UpgradeSchedule, b beacon.Schedule, em ExecMonitor, metadataDs dstore.Batching) (*StateManager, error) {
sm, err := NewStateManager(cs, exec, sys, us, b, metadataDs)
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions chain/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ func TestChainExportImportFull(t *testing.T) {
}

nbs := blockstore.NewMemorySync()
cs := store.NewChainStore(nbs, nbs, datastore.NewMapDatastore(), filcns.Weight, nil)
ds := datastore.NewMapDatastore()
cs := store.NewChainStore(nbs, nbs, ds, filcns.Weight, nil)
defer cs.Close() //nolint:errcheck

root, err := cs.Import(context.TODO(), buf)
Expand All @@ -213,7 +214,7 @@ func TestChainExportImportFull(t *testing.T) {
t.Fatal("imported chain differed from exported chain")
}

sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), nil, filcns.DefaultUpgradeSchedule(), cg.BeaconSchedule())
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), nil, filcns.DefaultUpgradeSchedule(), cg.BeaconSchedule(), ds)
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-bench/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ var importBenchCmd = &cli.Command{
defer cs.Close() //nolint:errcheck

// TODO: We need to supply the actual beacon after v14
stm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(verifier), filcns.DefaultUpgradeSchedule(), nil)
stm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(verifier), filcns.DefaultUpgradeSchedule(), nil, metadataDs)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/lotus-shed/balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ var chainBalanceStateCmd = &cli.Command{
cst := cbor.NewCborStore(bs)
store := adt.WrapStore(ctx, cst)

sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds)
if err != nil {
return err
}
Expand Down Expand Up @@ -737,7 +737,7 @@ var chainPledgeCmd = &cli.Command{
cst := cbor.NewCborStore(bs)
store := adt.WrapStore(ctx, cst)

sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/lotus-shed/gas-estimation.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ var gasTraceCmd = &cli.Command{
cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil)
defer cs.Close() //nolint:errcheck

sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), shd)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), shd, mds)
if err != nil {
return err
}
Expand Down Expand Up @@ -212,7 +212,7 @@ var replayOfflineCmd = &cli.Command{
cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil)
defer cs.Close() //nolint:errcheck

sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), shd)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), shd, mds)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/invariants.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ var invariantsCmd = &cli.Command{
cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil)
defer cs.Close() //nolint:errcheck

sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion cmd/lotus-shed/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/urfave/cli/v2"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
Expand Down Expand Up @@ -120,7 +121,8 @@ var migrationsCmd = &cli.Command{
cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil)
defer cs.Close() //nolint:errcheck

sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil)
// Note: we use a map datastore for the metadata to avoid writing / using cached migration results in the metadata store
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, datastore.NewMapDatastore())
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/state-stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ to reduce the number of decode operations performed by caching the decoded objec
}

tsExec := consensus.NewTipSetExecutor(filcns.RewardFunc)
sm, err := stmgr.NewStateManager(cs, tsExec, vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil)
sm, err := stmgr.NewStateManager(cs, tsExec, vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds)
if err != nil {
return err
}
Expand Down
Loading