feat: recovery: Config for maximum partition count per message #8986

Merged · 1 commit · Jul 7, 2022
14 changes: 12 additions & 2 deletions documentation/en/default-lotus-miner-config.toml
@@ -390,8 +390,18 @@
# Setting this value above the network limit has no effect
#
# type: int
# env var: LOTUS_PROVING_MAXPARTITIONSPERMESSAGE
#MaxPartitionsPerMessage = 0
# env var: LOTUS_PROVING_MAXPARTITIONSPERPOSTMESSAGE
#MaxPartitionsPerPoStMessage = 0

# In some cases when submitting DeclareFaultsRecovered messages,
# there may be too many recoveries to fit in a BlockGasLimit.
# In those cases it may be necessary to set this value to something low (e.g. 1);
# note that setting this value lower may result in less efficient gas use - more messages will be sent than strictly needed,
# resulting in more total gas use (but each message will have a lower gas limit)
#
# type: int
# env var: LOTUS_PROVING_MAXPARTITIONSPERRECOVERYMESSAGE
#MaxPartitionsPerRecoveryMessage = 0


[Sealing]
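The new `MaxPartitionsPerRecoveryMessage` option above (also settable via the `LOTUS_PROVING_MAXPARTITIONSPERRECOVERYMESSAGE` environment variable) trades message count against per-message size: a lower cap splits one oversized DeclareFaultsRecovered message into several smaller ones, spending more total gas but keeping each individual message under the block gas limit. A minimal, illustrative Go sketch of that arithmetic — the helper name and the partition counts are hypothetical and not part of this PR:

```go
package main

import "fmt"

// messagesNeeded estimates how many DeclareFaultsRecovered messages would be
// pushed for a given number of recovering partitions under a per-message cap.
// A cap of 0 means "no limit", matching the config default above.
func messagesNeeded(partitions, maxPerMsg int) int {
	if partitions == 0 {
		return 0
	}
	if maxPerMsg <= 0 {
		return 1 // everything fits in a single message
	}
	return (partitions + maxPerMsg - 1) / maxPerMsg // ceiling division
}

func main() {
	fmt.Println(messagesNeeded(40, 0)) // 1 message carrying all 40 partitions
	fmt.Println(messagesNeeded(40, 6)) // 7 messages
	fmt.Println(messagesNeeded(40, 1)) // 40 messages, one partition each
}
```

With the default of 0 the scheduler keeps the previous single-message behaviour; the batching code further down only starts a new batch when a positive cap is configured.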
12 changes: 11 additions & 1 deletion node/config/doc_gen.go

Some generated files are not rendered by default.

11 changes: 10 additions & 1 deletion node/config/types.go
@@ -284,7 +284,16 @@ type ProvingConfig struct {
// to prove each deadline, resulting in more total gas use (but each message will have lower gas limit)
//
// Setting this value above the network limit has no effect
MaxPartitionsPerMessage int
MaxPartitionsPerPoStMessage int

// Maximum number of partitions to declare in a single DeclareFaultsRecovered message. 0 = no limit.

// In some cases when submitting DeclareFaultsRecovered messages,
// there may be too many recoveries to fit in a BlockGasLimit.
// In those cases it may be necessary to set this value to something low (e.g. 1);
// note that setting this value lower may result in less efficient gas use - more messages will be sent than strictly needed,
// resulting in more total gas use (but each message will have a lower gas limit)
MaxPartitionsPerRecoveryMessage int
}

type SealingConfig struct {
Expand Down
64 changes: 64 additions & 0 deletions storage/paths/gomock_reflect_3544667724/prog.go
@@ -0,0 +1,64 @@
package main

import (
	"encoding/gob"
	"flag"
	"fmt"
	"os"
	"path"
	"reflect"

	"github.com/golang/mock/mockgen/model"

	pkg_ "github.com/filecoin-project/lotus/storage/paths"
)

var output = flag.String("output", "", "The output file name, or empty to use stdout.")

func main() {
	flag.Parse()

	its := []struct {
		sym string
		typ reflect.Type
	}{

		{"Store", reflect.TypeOf((*pkg_.Store)(nil)).Elem()},
	}
	pkg := &model.Package{
		// NOTE: This behaves contrary to documented behaviour if the
		// package name is not the final component of the import path.
		// The reflect package doesn't expose the package name, though.
		Name: path.Base("github.com/filecoin-project/lotus/storage/paths"),
	}

	for _, it := range its {
		intf, err := model.InterfaceFromInterfaceType(it.typ)
		if err != nil {
			fmt.Fprintf(os.Stderr, "Reflection: %v\n", err)
			os.Exit(1)
		}
		intf.Name = it.sym
		pkg.Interfaces = append(pkg.Interfaces, intf)
	}

	outfile := os.Stdout
	if len(*output) != 0 {
		var err error
		outfile, err = os.Create(*output)
		if err != nil {
			fmt.Fprintf(os.Stderr, "failed to open output file %q", *output)
		}
		defer func() {
			if err := outfile.Close(); err != nil {
				fmt.Fprintf(os.Stderr, "failed to close output file %q", *output)
				os.Exit(1)
			}
		}()
	}

	if err := gob.NewEncoder(outfile).Encode(pkg); err != nil {
		fmt.Fprintf(os.Stderr, "gob encode: %v\n", err)
		os.Exit(1)
	}
}
6 changes: 3 additions & 3 deletions storage/wdpost/wdpost_run.go
@@ -518,9 +518,9 @@ func (s *WindowPoStScheduler) batchPartitions(partitions []api.Partition, nv net
}

// respect user config if set
if s.maxPartitionsPerMessage > 0 {
if partitionsPerMsg > s.maxPartitionsPerMessage {
partitionsPerMsg = s.maxPartitionsPerMessage
if s.maxPartitionsPerPostMessage > 0 {
if partitionsPerMsg > s.maxPartitionsPerPostMessage {
partitionsPerMsg = s.maxPartitionsPerPostMessage
}
}

130 changes: 69 additions & 61 deletions storage/wdpost/wdpost_run_faults.go
@@ -36,14 +36,15 @@ import (
// TODO: the waiting should happen in the background. Right now this
// is blocking/delaying the actual generation and submission of WindowPoSts in
// this deadline!
func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([]miner.RecoveryDeclaration, *types.SignedMessage, error) {
func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([][]miner.RecoveryDeclaration, []*types.SignedMessage, error) {
ctx, span := trace.StartSpan(ctx, "storage.declareRecoveries")
defer span.End()

faulty := uint64(0)
params := &miner.DeclareFaultsRecoveredParams{
Recoveries: []miner.RecoveryDeclaration{},
}

var batchedRecoveryDecls [][]miner.RecoveryDeclaration
batchedRecoveryDecls = append(batchedRecoveryDecls, []miner.RecoveryDeclaration{})
totalRecoveries := 0

for partIdx, partition := range partitions {
unrecovered, err := bitfield.SubtractBitField(partition.FaultySectors, partition.RecoveringSectors)
@@ -77,55 +78,72 @@ func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint6
continue
}

params.Recoveries = append(params.Recoveries, miner.RecoveryDeclaration{
// respect user config if set
if s.maxPartitionsPerRecoveryMessage > 0 &&
len(batchedRecoveryDecls[len(batchedRecoveryDecls)-1]) >= s.maxPartitionsPerRecoveryMessage {
batchedRecoveryDecls = append(batchedRecoveryDecls, []miner.RecoveryDeclaration{})
}

batchedRecoveryDecls[len(batchedRecoveryDecls)-1] = append(batchedRecoveryDecls[len(batchedRecoveryDecls)-1], miner.RecoveryDeclaration{
Deadline: dlIdx,
Partition: uint64(partIdx),
Sectors: recovered,
})

totalRecoveries++
}

recoveries := params.Recoveries
if len(recoveries) == 0 {
if totalRecoveries == 0 {
if faulty != 0 {
log.Warnw("No recoveries to declare", "deadline", dlIdx, "faulty", faulty)
}

return recoveries, nil, nil
return nil, nil, nil
}

enc, aerr := actors.SerializeParams(params)
if aerr != nil {
return recoveries, nil, xerrors.Errorf("could not serialize declare recoveries parameters: %w", aerr)
}
var msgs []*types.SignedMessage
for _, recovery := range batchedRecoveryDecls {
params := &miner.DeclareFaultsRecoveredParams{
Recoveries: recovery,
}

msg := &types.Message{
To: s.actor,
Method: builtin.MethodsMiner.DeclareFaultsRecovered,
Params: enc,
Value: types.NewInt(0),
}
spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)}
if err := s.prepareMessage(ctx, msg, spec); err != nil {
return recoveries, nil, err
}
enc, aerr := actors.SerializeParams(params)
if aerr != nil {
return nil, nil, xerrors.Errorf("could not serialize declare recoveries parameters: %w", aerr)
}

sm, err := s.api.MpoolPushMessage(ctx, msg, &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)})
if err != nil {
return recoveries, sm, xerrors.Errorf("pushing message to mpool: %w", err)
}
msg := &types.Message{
To: s.actor,
Method: builtin.MethodsMiner.DeclareFaultsRecovered,
Params: enc,
Value: types.NewInt(0),
}
spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)}
if err := s.prepareMessage(ctx, msg, spec); err != nil {
return nil, nil, err
}

log.Warnw("declare faults recovered Message CID", "cid", sm.Cid())
sm, err := s.api.MpoolPushMessage(ctx, msg, &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)})
if err != nil {
return nil, nil, xerrors.Errorf("pushing message to mpool: %w", err)
}

rec, err := s.api.StateWaitMsg(context.TODO(), sm.Cid(), build.MessageConfidence, api.LookbackNoLimit, true)
if err != nil {
return recoveries, sm, xerrors.Errorf("declare faults recovered wait error: %w", err)
log.Warnw("declare faults recovered Message CID", "cid", sm.Cid())
msgs = append(msgs, sm)
}

if rec.Receipt.ExitCode != 0 {
return recoveries, sm, xerrors.Errorf("declare faults recovered wait non-0 exit code: %d", rec.Receipt.ExitCode)
for _, msg := range msgs {
rec, err := s.api.StateWaitMsg(context.TODO(), msg.Cid(), build.MessageConfidence, api.LookbackNoLimit, true)
if err != nil {
return batchedRecoveryDecls, msgs, xerrors.Errorf("declare faults recovered wait error: %w", err)
}

if rec.Receipt.ExitCode != 0 {
return batchedRecoveryDecls, msgs, xerrors.Errorf("declare faults recovered wait non-0 exit code: %d", rec.Receipt.ExitCode)
}
}

return recoveries, sm, nil
return batchedRecoveryDecls, msgs, nil
}

// declareFaults identifies the sectors on the specified proving deadline that
@@ -241,9 +259,8 @@ func (s *WindowPoStScheduler) asyncFaultRecover(di dline.Info, ts *types.TipSet)
}

var (
sigmsg *types.SignedMessage
recoveries []miner.RecoveryDeclaration
faults []miner.FaultDeclaration
sigmsgs []*types.SignedMessage
recoveries [][]miner.RecoveryDeclaration

// optionalCid returns the CID of the message, or cid.Undef if the
// message is nil. We don't need the argument (could capture the
@@ -256,36 +273,27 @@ func (s *WindowPoStScheduler) asyncFaultRecover(di dline.Info, ts *types.TipSet)
}
)

if recoveries, sigmsg, err = s.declareRecoveries(context.TODO(), declDeadline, partitions, ts.Key()); err != nil {
if recoveries, sigmsgs, err = s.declareRecoveries(context.TODO(), declDeadline, partitions, ts.Key()); err != nil {
// TODO: This is potentially quite bad, but not even trying to post when this fails is objectively worse
log.Errorf("checking sector recoveries: %v", err)
}

s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStRecoveries], func() interface{} {
j := WdPoStRecoveriesProcessedEvt{
evtCommon: s.getEvtCommon(err),
Declarations: recoveries,
MessageCID: optionalCid(sigmsg),
// the two slices should always line up; skip journaling if for some reason they don't
if len(recoveries) == len(sigmsgs) {
for i, recovery := range recoveries {
// clone for function literal
recovery := recovery
msgCID := optionalCid(sigmsgs[i])
s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStRecoveries], func() interface{} {
j := WdPoStRecoveriesProcessedEvt{
evtCommon: s.getEvtCommon(err),
Declarations: recovery,
MessageCID: msgCID,
}
j.Error = err
return j
})
}
j.Error = err
return j
})

if ts.Height() > build.UpgradeIgnitionHeight {
return // FORK: declaring faults after ignition upgrade makes no sense
}

if faults, sigmsg, err = s.declareFaults(context.TODO(), declDeadline, partitions, ts.Key()); err != nil {
// TODO: This is also potentially really bad, but we try to post anyways
log.Errorf("checking sector faults: %v", err)
}

s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStFaults], func() interface{} {
return WdPoStFaultsProcessedEvt{
evtCommon: s.getEvtCommon(err),
Declarations: faults,
MessageCID: optionalCid(sigmsg),
}
})
}()
}
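Taken together, `declareRecoveries` now grows a slice of declaration batches, starting a new batch whenever the current one reaches `MaxPartitionsPerRecoveryMessage`, pushes one DeclareFaultsRecovered message per batch, and only afterwards waits on all of them; `asyncFaultRecover` then journals one event per message. A standalone sketch of the underlying chunking pattern — the generic `batch` helper below is hypothetical and not part of the PR:

```go
package main

import "fmt"

// batch splits items into groups of at most maxPer, keeping input order.
// maxPer <= 0 is treated as "no limit", mirroring MaxPartitionsPerRecoveryMessage = 0.
func batch[T any](items []T, maxPer int) [][]T {
	if len(items) == 0 {
		return nil
	}
	if maxPer <= 0 {
		return [][]T{items}
	}
	var out [][]T
	for len(items) > 0 {
		n := maxPer
		if n > len(items) {
			n = len(items)
		}
		out = append(out, items[:n])
		items = items[n:]
	}
	return out
}

func main() {
	// Stand-in for per-partition recovery declarations (partition indices only).
	partitions := []int{0, 1, 2, 3, 4, 5, 6}

	for _, group := range batch(partitions, 3) {
		// In the PR, each group becomes the Recoveries field of one
		// DeclareFaultsRecoveredParams and is pushed as its own message.
		fmt.Println(group) // [0 1 2], [3 4 5], [6]
	}
}
```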