Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle deal id changes in OnDealSectorCommitted #4730

Merged
merged 7 commits into from
Nov 10, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat(markets): handle deal ID changes
make OnDealSectorCommitted handle changes to deal ids
  • Loading branch information
hannahhoward committed Nov 10, 2020
commit d2acc78787d744ae59bf7e211f52022f0f669b73
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
github.com/filecoin-project/go-data-transfer v1.0.0
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f
github.com/filecoin-project/go-fil-markets v1.0.0
github.com/filecoin-project/go-fil-markets v0.9.2-0.20201103030854-3d9aa08b3c2f
github.com/filecoin-project/go-jsonrpc v0.1.2-0.20201008195726-68c6a2704e49
github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
Expand Down Expand Up @@ -135,6 +135,7 @@ require (
go.uber.org/fx v1.9.0
go.uber.org/multierr v1.5.0
go.uber.org/zap v1.15.0
golang.org/x/net v0.0.0-20200707034311-ab3426394381
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
golang.org/x/sys v0.0.0-20200926100807-9d91bd62050c
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ github.com/filecoin-project/go-ds-versioning v0.1.0 h1:y/X6UksYTsK8TLCI7rttCKEvl
github.com/filecoin-project/go-ds-versioning v0.1.0/go.mod h1:mp16rb4i2QPmxBnmanUx8i/XANp+PFCCJWiAb+VW4/s=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f h1:GxJzR3oRIMTPtpZ0b7QF8FKPK6/iPAc7trhlL5k/g+s=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
github.com/filecoin-project/go-fil-markets v0.9.2-0.20201103030854-3d9aa08b3c2f h1:SAyL9YllpB/2gyclDE0AXQenEQc4O2g2IP+DDF7L0Vg=
github.com/filecoin-project/go-fil-markets v0.9.2-0.20201103030854-3d9aa08b3c2f/go.mod h1:h+bJ/IUnYjnW5HMKyt9JQSnhslqetkpuzwwugc3K8vM=
github.com/filecoin-project/go-fil-markets v1.0.0 h1:np9+tlnWXh9xYG4oZfha6HZFLYOaAZoMGR3V4w6DM48=
github.com/filecoin-project/go-fil-markets v1.0.0/go.mod h1:lXExJyYHwpMMddCqhEdNrc7euYJKNkp04K76NZqJLGg=
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
Expand Down
81 changes: 46 additions & 35 deletions markets/storageadapter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,14 +213,14 @@ func (c *ClientNodeAdapter) DealProviderCollateralBounds(ctx context.Context, si
return big.Mul(bounds.Min, big.NewInt(clientOverestimation)), bounds.Max, nil
}

func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealId abi.DealID, cb storagemarket.DealSectorCommittedCallback) error {
func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, publishCid *cid.Cid, cb storagemarket.DealSectorCommittedCallback) error {
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
sd, err := c.StateMarketStorageDeal(ctx, dealId, ts.Key())

newDealID, sd, err := GetCurrentDealInfo(ctx, ts, c, dealID, publishCid)
if err != nil {
// TODO: This may be fine for some errors
return false, false, xerrors.Errorf("client: failed to look up deal on chain: %w", err)
return false, false, xerrors.Errorf("failed to look up deal on chain: %w", err)
}
dealID = newDealID

if sd.State.SectorStartEpoch > 0 {
cb(nil)
Expand All @@ -230,32 +230,58 @@ func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider
return false, true, nil
}

var sectorNumber abi.SectorNumber
var sectorFound bool

called := func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) {
defer func() {
if err != nil {
cb(xerrors.Errorf("handling applied event: %w", err))
}
}()
switch msg.Method {
case miner.Methods.PreCommitSector:
dealID, _, err = GetCurrentDealInfo(ctx, ts, c, dealID, publishCid)
if err != nil {
return false, err
}

if msg == nil {
log.Error("timed out waiting for deal activation... what now?")
return false, nil
}
var params miner.SectorPreCommitInfo
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return false, xerrors.Errorf("unmarshal pre commit: %w", err)
}

sd, err := c.StateMarketStorageDeal(ctx, dealId, ts.Key())
if err != nil {
return false, xerrors.Errorf("failed to look up deal on chain: %w", err)
}
for _, did := range params.DealIDs {
if did == dealID {
sectorNumber = params.SectorNumber
sectorFound = true
return true, nil
}
}
return true, nil
case miner.Methods.ProveCommitSector:
if msg == nil {
log.Error("timed out waiting for deal activation... what now?")
return false, nil
}

if sd.State.SectorStartEpoch < 1 {
return false, xerrors.Errorf("deal wasn't active: deal=%d, parentState=%s, h=%d", dealId, ts.ParentState(), ts.Height())
}
sd, err := c.StateMarketStorageDeal(ctx, dealID, ts.Key())
if err != nil {
return false, xerrors.Errorf("failed to look up deal on chain: %w", err)
}

if sd.State.SectorStartEpoch < 1 {
return false, xerrors.Errorf("deal wasn't active: deal=%d, parentState=%s, h=%d", dealID, ts.ParentState(), ts.Height())
}

log.Infof("Storage deal %d activated at epoch %d", dealId, sd.State.SectorStartEpoch)
log.Infof("Storage deal %d activated at epoch %d", dealID, sd.State.SectorStartEpoch)

cb(nil)
cb(nil)

return false, nil
return false, nil
default:
return false, nil
}
}

revert := func(ctx context.Context, ts *types.TipSet) error {
Expand All @@ -264,29 +290,14 @@ func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider
return nil
}

var sectorNumber abi.SectorNumber
var sectorFound bool
matchEvent := func(msg *types.Message) (matched bool, err error) {
if msg.To != provider {
return false, nil
}

switch msg.Method {
case miner2.MethodsMiner.PreCommitSector:
var params miner.SectorPreCommitInfo
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return false, xerrors.Errorf("unmarshal pre commit: %w", err)
}

for _, did := range params.DealIDs {
if did == dealId {
sectorNumber = params.SectorNumber
sectorFound = true
return false, nil
}
}

return false, nil
case miner.Methods.PreCommitSector:
return !sectorFound, nil
case miner2.MethodsMiner.ProveCommitSector:
var params miner.ProveCommitSectorParams
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
Expand Down
59 changes: 59 additions & 0 deletions markets/storageadapter/getcurrentdealinfo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package storageadapter

import (
"bytes"
"context"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/exitcode"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
)

type getCurrentDealInfoAPI interface {
StateMarketStorageDeal(context.Context, abi.DealID, types.TipSetKey) (*api.MarketDeal, error)
StateSearchMsg(context.Context, cid.Cid) (*api.MsgLookup, error)
}

// GetCurrentDealInfo gets current information on a deal, and corrects the deal ID as needed
func GetCurrentDealInfo(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, dealID abi.DealID, publishCid *cid.Cid) (abi.DealID, *api.MarketDeal, error) {
marketDeal, dealErr := api.StateMarketStorageDeal(ctx, dealID, ts.Key())
if dealErr == nil {
return dealID, marketDeal, nil
}
if publishCid == nil {
return dealID, nil, dealErr
}
// attempt deal id correction
lookup, err := api.StateSearchMsg(ctx, *publishCid)
magik6k marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return dealID, nil, err
}

if lookup.Receipt.ExitCode != exitcode.Ok {
return dealID, nil, xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", *publishCid, lookup.Receipt.ExitCode)
}

var retval market.PublishStorageDealsReturn
if err := retval.UnmarshalCBOR(bytes.NewReader(lookup.Receipt.Return)); err != nil {
return dealID, nil, xerrors.Errorf("looking for publish deal message: unmarshaling message return: %w", err)
}

if len(retval.IDs) != 1 {
// market currently only ever sends messages with 1 deal
return dealID, nil, xerrors.Errorf("can't recover dealIDs from publish deal message with more than 1 deal")
}

if retval.IDs[0] == dealID {
// DealID did not change, so we are stuck with the original lookup error
return dealID, nil, dealErr
}

dealID = retval.IDs[0]
marketDeal, err = api.StateMarketStorageDeal(ctx, dealID, ts.Key())

return dealID, marketDeal, err
}
185 changes: 185 additions & 0 deletions markets/storageadapter/getcurrentdealinfo_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package storageadapter

import (
"bytes"
"errors"
"math/rand"
"testing"
"time"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/exitcode"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
test "github.com/filecoin-project/lotus/chain/events/state/mock"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
"golang.org/x/xerrors"
)

var notFoundError = errors.New("Could not find")

func TestGetCurrentDealInfo(t *testing.T) {
ctx := context.Background()
dummyCid, _ := cid.Parse("bafkqaaa")
startDealID := abi.DealID(rand.Uint64())
newDealID := abi.DealID(rand.Uint64())
twoValuesReturn := makePublishDealsReturnBytes(t, []abi.DealID{abi.DealID(rand.Uint64()), abi.DealID(rand.Uint64())})
sameValueReturn := makePublishDealsReturnBytes(t, []abi.DealID{startDealID})
newValueReturn := makePublishDealsReturnBytes(t, []abi.DealID{newDealID})
successDeal := &api.MarketDeal{
State: market.DealState{
SectorStartEpoch: 1,
LastUpdatedEpoch: 2,
},
}
testCases := map[string]struct {
searchMessageLookup *api.MsgLookup
searchMessageErr error
marketDeals map[abi.DealID]*api.MarketDeal
publishCid *cid.Cid
expectedDealID abi.DealID
expectedMarketDeal *api.MarketDeal
expectedError error
}{
"deal lookup succeeds": {
marketDeals: map[abi.DealID]*api.MarketDeal{
startDealID: successDeal,
},
expectedDealID: startDealID,
expectedMarketDeal: successDeal,
},
"publish CID = nil": {
expectedDealID: startDealID,
expectedError: notFoundError,
},
"search message fails": {
publishCid: &dummyCid,
searchMessageErr: errors.New("something went wrong"),
expectedDealID: startDealID,
expectedError: errors.New("something went wrong"),
},
"return code not ok": {
publishCid: &dummyCid,
searchMessageLookup: &api.MsgLookup{
Receipt: types.MessageReceipt{
ExitCode: exitcode.ErrIllegalState,
},
},
expectedDealID: startDealID,
expectedError: xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", dummyCid, exitcode.ErrIllegalState),
},
"unable to unmarshal params": {
publishCid: &dummyCid,
searchMessageLookup: &api.MsgLookup{
Receipt: types.MessageReceipt{
ExitCode: exitcode.Ok,
Return: []byte("applesauce"),
},
},
expectedDealID: startDealID,
expectedError: xerrors.Errorf("looking for publish deal message: unmarshaling message return: cbor input should be of type array"),
},
"more than one returned id": {
publishCid: &dummyCid,
searchMessageLookup: &api.MsgLookup{
Receipt: types.MessageReceipt{
ExitCode: exitcode.Ok,
Return: twoValuesReturn,
},
},
expectedDealID: startDealID,
expectedError: xerrors.Errorf("can't recover dealIDs from publish deal message with more than 1 deal"),
},
"deal ids still match": {
publishCid: &dummyCid,
searchMessageLookup: &api.MsgLookup{
Receipt: types.MessageReceipt{
ExitCode: exitcode.Ok,
Return: sameValueReturn,
},
},
expectedDealID: startDealID,
expectedError: notFoundError,
},
"new deal id success": {
publishCid: &dummyCid,
searchMessageLookup: &api.MsgLookup{
Receipt: types.MessageReceipt{
ExitCode: exitcode.Ok,
Return: newValueReturn,
},
},
marketDeals: map[abi.DealID]*api.MarketDeal{
newDealID: successDeal,
},
expectedDealID: newDealID,
expectedMarketDeal: successDeal,
},
"new deal id failure": {
publishCid: &dummyCid,
searchMessageLookup: &api.MsgLookup{
Receipt: types.MessageReceipt{
ExitCode: exitcode.Ok,
Return: newValueReturn,
},
},
expectedDealID: newDealID,
expectedError: notFoundError,
},
}
for testCase, data := range testCases {
t.Run(testCase, func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
ts, err := test.MockTipset(address.TestAddress, rand.Uint64())
require.NoError(t, err)
api := &mockGetCurrentDealInfoAPI{
SearchMessageLookup: data.searchMessageLookup,
SearchMessageErr: data.searchMessageErr,
MarketDeals: data.marketDeals,
}

dealID, marketDeal, err := GetCurrentDealInfo(ctx, ts, api, startDealID, data.publishCid)
require.Equal(t, data.expectedDealID, dealID)
require.Equal(t, data.expectedMarketDeal, marketDeal)
if data.expectedError == nil {
require.NoError(t, err)
} else {
require.EqualError(t, err, data.expectedError.Error())
}
})
}
}

type mockGetCurrentDealInfoAPI struct {
SearchMessageLookup *api.MsgLookup
SearchMessageErr error

MarketDeals map[abi.DealID]*api.MarketDeal
}

func (mapi *mockGetCurrentDealInfoAPI) StateMarketStorageDeal(ctx context.Context, dealID abi.DealID, ts types.TipSetKey) (*api.MarketDeal, error) {
deal, ok := mapi.MarketDeals[dealID]
if !ok {
return nil, notFoundError
}
return deal, nil
}

func (mapi *mockGetCurrentDealInfoAPI) StateSearchMsg(context.Context, cid.Cid) (*api.MsgLookup, error) {
return mapi.SearchMessageLookup, mapi.SearchMessageErr
}

func makePublishDealsReturnBytes(t *testing.T, dealIDs []abi.DealID) []byte {
buf := new(bytes.Buffer)
dealsReturn := market.PublishStorageDealsReturn{
IDs: dealIDs,
}
err := dealsReturn.MarshalCBOR(buf)
require.NoError(t, err)
return buf.Bytes()
}
Loading