chain/migrations refactor: optionally set max migration workers #9784

Merged · 2 commits · Dec 5, 2022
Changes from 1 commit
53 changes: 39 additions & 14 deletions chain/consensus/filcns/upgrades.go
@@ -4,7 +4,9 @@ import (
"context"
_ "embed"
"fmt"
"os"
"runtime"
"strconv"
"time"

"github.com/docker/go-units"
@@ -53,6 +55,29 @@ import (
//go:embed FVMLiftoff.txt
var fvmLiftoffBanner string

+var (
+	MigrationMaxWorkerCount    int
+	EnvMigrationMaxWorkerCount = "LOTUS_MIGRATION_MAX_WORKER_COUNT"
+)

+func init() {
+	// the default calculation used for migration worker count
+	MigrationMaxWorkerCount = runtime.NumCPU()
+	// check if an alternative value was requested via the environment
+	if mwcs := os.Getenv(EnvMigrationMaxWorkerCount); mwcs != "" {
+		mwc, err := strconv.ParseInt(mwcs, 10, 64)
+		if err != nil {
+			log.Warnf("invalid value for %s (%s) defaulting to %d: %s", EnvMigrationMaxWorkerCount, mwcs, MigrationMaxWorkerCount, err)
+			return
+		}
+		// use value from environment
+		log.Infof("migration worker count set from %s (%d)", EnvMigrationMaxWorkerCount, mwc)
+		MigrationMaxWorkerCount = int(mwc)
+		return
+	}
+	log.Infof("migration worker count: %d", MigrationMaxWorkerCount)
+}
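
As a quick sanity check of the override behaviour above, here is a minimal, self-contained sketch (not part of this PR; the helper name workerCount is illustrative) of the same default-then-override logic:

package main

import (
	"fmt"
	"os"
	"runtime"
	"strconv"
)

// workerCount mirrors the init() above: default to runtime.NumCPU(),
// and let an integer-valued environment variable override it.
func workerCount(key string) int {
	count := runtime.NumCPU()
	if s := os.Getenv(key); s != "" {
		if v, err := strconv.ParseInt(s, 10, 64); err == nil {
			count = int(v)
		}
	}
	return count
}

func main() {
	os.Setenv("LOTUS_MIGRATION_MAX_WORKER_COUNT", "4")
	fmt.Println(workerCount("LOTUS_MIGRATION_MAX_WORKER_COUNT")) // prints 4
}

Running a node with the variable set, e.g. LOTUS_MIGRATION_MAX_WORKER_COUNT=4 lotus daemon, should therefore cap the migration worker pool at 4.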

func DefaultUpgradeSchedule() stmgr.UpgradeSchedule {
var us stmgr.UpgradeSchedule

@@ -891,7 +916,7 @@ func UpgradeCalico(ctx context.Context, sm *stmgr.StateManager, _ stmgr.Migratio

func UpgradeActorsV3(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
-workerCount := runtime.NumCPU() - 3
+workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 {
workerCount = 1
}
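
The same clamp recurs for each actors version below, paired with a pre-migration variant that uses roughly half the budget. A hedged sketch of both sizing rules (the pre-migration else-branch is folded out of this view, so the halving is an assumption based on its "use half the CPUs" comment; function names are illustrative):

// fullMigrationWorkers: all allowed workers except 3, never below 1.
func fullMigrationWorkers(max int) int {
	if n := max - 3; n > 0 {
		return n
	}
	return 1
}

// preMigrationWorkers: single-threaded on small machines, otherwise
// about half the budget (assumed from the folded else-branch).
func preMigrationWorkers(max int) int {
	if max <= 4 {
		return 1
	}
	return max / 2
}

With the default of MigrationMaxWorkerCount = runtime.NumCPU() on a 16-core machine, for example, a full migration would get 13 workers and a pre-migration 8.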
@@ -929,7 +954,7 @@ func UpgradeActorsV3(ctx context.Context, sm *stmgr.StateManager, cache stmgr.Mi

func PreUpgradeActorsV3(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3.
-workerCount := runtime.NumCPU()
+workerCount := MigrationMaxWorkerCount
if workerCount <= 4 {
workerCount = 1
} else {
@@ -993,7 +1018,7 @@ func upgradeActorsV3Common(

func UpgradeActorsV4(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
-workerCount := runtime.NumCPU() - 3
+workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 {
workerCount = 1
}
@@ -1015,7 +1040,7 @@ func UpgradeActorsV4(ctx context.Context, sm *stmgr.StateManager, cache stmgr.Mi

func PreUpgradeActorsV4(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3.
-workerCount := runtime.NumCPU()
+workerCount := MigrationMaxWorkerCount
if workerCount <= 4 {
workerCount = 1
} else {
@@ -1079,7 +1104,7 @@ func upgradeActorsV4Common(

func UpgradeActorsV5(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
-workerCount := runtime.NumCPU() - 3
+workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 {
workerCount = 1
}
@@ -1101,7 +1126,7 @@ func UpgradeActorsV5(ctx context.Context, sm *stmgr.StateManager, cache stmgr.Mi

func PreUpgradeActorsV5(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3.
-workerCount := runtime.NumCPU()
+workerCount := MigrationMaxWorkerCount
if workerCount <= 4 {
workerCount = 1
} else {
@@ -1165,7 +1190,7 @@ func upgradeActorsV5Common(

func UpgradeActorsV6(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
-workerCount := runtime.NumCPU() - 3
+workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 {
workerCount = 1
}
@@ -1187,7 +1212,7 @@ func UpgradeActorsV6(ctx context.Context, sm *stmgr.StateManager, cache stmgr.Mi

func PreUpgradeActorsV6(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3.
-workerCount := runtime.NumCPU()
+workerCount := MigrationMaxWorkerCount
if workerCount <= 4 {
workerCount = 1
} else {
@@ -1251,7 +1276,7 @@ func upgradeActorsV6Common(

func UpgradeActorsV7(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
-workerCount := runtime.NumCPU() - 3
+workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 {
workerCount = 1
}
@@ -1273,7 +1298,7 @@ func UpgradeActorsV7(ctx context.Context, sm *stmgr.StateManager, cache stmgr.Mi

func PreUpgradeActorsV7(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3.
-workerCount := runtime.NumCPU()
+workerCount := MigrationMaxWorkerCount
if workerCount <= 4 {
workerCount = 1
} else {
@@ -1344,7 +1369,7 @@ func upgradeActorsV7Common(

func UpgradeActorsV8(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
-workerCount := runtime.NumCPU() - 3
+workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 {
workerCount = 1
}
@@ -1368,7 +1393,7 @@ func UpgradeActorsV8(ctx context.Context, sm *stmgr.StateManager, cache stmgr.Mi

func PreUpgradeActorsV8(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3.
-workerCount := runtime.NumCPU()
+workerCount := MigrationMaxWorkerCount
if workerCount <= 4 {
workerCount = 1
} else {
@@ -1451,7 +1476,7 @@ func upgradeActorsV8Common(
func UpgradeActorsV9(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor,
root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
-workerCount := runtime.NumCPU() - 3
+workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 {
workerCount = 1
}
@@ -1474,7 +1499,7 @@ func UpgradeActorsV9(ctx context.Context, sm *stmgr.StateManager, cache stmgr.Mi
func PreUpgradeActorsV9(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid,
epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3.
-workerCount := runtime.NumCPU()
+workerCount := MigrationMaxWorkerCount
if workerCount <= 4 {
workerCount = 1
} else {