Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error out deals that are not activated by proposed deal start epoch #5061

Merged
merged 5 commits into from
Dec 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/test/ccupgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func testCCUpgrade(t *testing.T, b APIBuilder, blocktime time.Duration, upgradeH
t.Fatal(err)
}

MakeDeal(t, ctx, 6, client, miner, false, false)
MakeDeal(t, ctx, 6, client, miner, false, false, 0)

// Validate upgrade

Expand Down
29 changes: 16 additions & 13 deletions api/test/deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"testing"
"time"

"github.com/filecoin-project/go-state-types/abi"

"github.com/stretchr/testify/require"

"github.com/ipfs/go-cid"
Expand All @@ -31,7 +33,7 @@ import (
ipld "github.com/ipfs/go-ipld-format"
)

func TestDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, carExport, fastRet bool) {
func TestDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, carExport, fastRet bool, startEpoch abi.ChainEpoch) {

ctx := context.Background()
n, sn := b(t, OneFull, OneMiner)
Expand Down Expand Up @@ -60,14 +62,14 @@ func TestDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, carExport
}
}()

MakeDeal(t, ctx, 6, client, miner, carExport, fastRet)
MakeDeal(t, ctx, 6, client, miner, carExport, fastRet, startEpoch)

atomic.AddInt64(&mine, -1)
fmt.Println("shutting down mining")
<-done
}

func TestDoubleDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration) {
func TestDoubleDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {

ctx := context.Background()
n, sn := b(t, OneFull, OneMiner)
Expand Down Expand Up @@ -97,15 +99,15 @@ func TestDoubleDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration) {
}
}()

MakeDeal(t, ctx, 6, client, miner, false, false)
MakeDeal(t, ctx, 7, client, miner, false, false)
MakeDeal(t, ctx, 6, client, miner, false, false, startEpoch)
MakeDeal(t, ctx, 7, client, miner, false, false, startEpoch)

atomic.AddInt64(&mine, -1)
fmt.Println("shutting down mining")
<-done
}

func MakeDeal(t *testing.T, ctx context.Context, rseed int, client api.FullNode, miner TestStorageNode, carExport, fastRet bool) {
func MakeDeal(t *testing.T, ctx context.Context, rseed int, client api.FullNode, miner TestStorageNode, carExport, fastRet bool, startEpoch abi.ChainEpoch) {
res, data, err := CreateClientFile(ctx, client, rseed)
if err != nil {
t.Fatal(err)
Expand All @@ -114,7 +116,7 @@ func MakeDeal(t *testing.T, ctx context.Context, rseed int, client api.FullNode,
fcid := res.Root
fmt.Println("FILE CID: ", fcid)

deal := startDeal(t, ctx, miner, client, fcid, fastRet)
deal := startDeal(t, ctx, miner, client, fcid, fastRet, startEpoch)

// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second)
Expand Down Expand Up @@ -149,7 +151,7 @@ func CreateClientFile(ctx context.Context, client api.FullNode, rseed int) (*api
return res, data, nil
}

func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration) {
func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {

ctx := context.Background()
n, sn := b(t, OneFull, OneMiner)
Expand Down Expand Up @@ -189,7 +191,7 @@ func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Durati

fmt.Println("FILE CID: ", fcid)

deal := startDeal(t, ctx, miner, client, fcid, true)
deal := startDeal(t, ctx, miner, client, fcid, true, startEpoch)

waitDealPublished(t, ctx, miner, deal)
fmt.Println("deal published, retrieving")
Expand All @@ -203,7 +205,7 @@ func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Durati
<-done
}

func TestSenondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration) {
func TestSecondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration) {

ctx := context.Background()
n, sn := b(t, OneFull, OneMiner)
Expand Down Expand Up @@ -252,13 +254,13 @@ func TestSenondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration
t.Fatal(err)
}

deal1 := startDeal(t, ctx, miner, client, fcid1, true)
deal1 := startDeal(t, ctx, miner, client, fcid1, true, 0)

// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second)
waitDealSealed(t, ctx, miner, client, deal1, true)

deal2 := startDeal(t, ctx, miner, client, fcid2, true)
deal2 := startDeal(t, ctx, miner, client, fcid2, true, 0)

time.Sleep(time.Second)
waitDealSealed(t, ctx, miner, client, deal2, false)
Expand All @@ -278,7 +280,7 @@ func TestSenondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration
<-done
}

func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client api.FullNode, fcid cid.Cid, fastRet bool) *cid.Cid {
func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client api.FullNode, fcid cid.Cid, fastRet bool, startEpoch abi.ChainEpoch) *cid.Cid {
maddr, err := miner.ActorAddress(ctx)
if err != nil {
t.Fatal(err)
Expand All @@ -296,6 +298,7 @@ func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client
Wallet: addr,
Miner: maddr,
EpochPrice: types.NewInt(1000000),
DealStartEpoch: startEpoch,
MinBlocksDuration: uint64(build.MinDealDuration),
FastRetrieval: fastRet,
})
Expand Down
2 changes: 1 addition & 1 deletion api/test/mining.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func TestDealMining(t *testing.T, b APIBuilder, blocktime time.Duration, carExpo
}
}()

deal := startDeal(t, ctx, provider, client, fcid, false)
deal := startDeal(t, ctx, provider, client, fcid, false, 0)

// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second)
Expand Down
6 changes: 5 additions & 1 deletion chain/stmgr/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,11 @@ func GetStorageDeal(ctx context.Context, sm *StateManager, dealID abi.DealID, ts
if err != nil {
return nil, err
} else if !found {
return nil, xerrors.Errorf("deal %d not found", dealID)
return nil, xerrors.Errorf(
"deal %d not found "+
"- deal may not have completed sealing before deal proposal "+
"start epoch, or deal may have been slashed",
dealID)
}

states, err := state.States()
Expand Down
5 changes: 3 additions & 2 deletions cli/test/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@ func RunClientTest(t *testing.T, cmds []*lcli.Command, clientNode test.TestNode)
require.Regexp(t, regexp.MustCompile("Ask:"), out)

// Create a deal (non-interactive)
// client deal <cid> <miner addr> 1000000attofil <duration>
// client deal --start-epoch=<start epoch> <cid> <miner addr> 1000000attofil <duration>
res, _, err := test.CreateClientFile(ctx, clientNode, 1)
require.NoError(t, err)
startEpoch := fmt.Sprintf("--start-epoch=%d", 2<<12)
dataCid := res.Root
price := "1000000attofil"
duration := fmt.Sprintf("%d", build.MinDealDuration)
out = clientCLI.RunCmd("client", "deal", dataCid.String(), minerAddr.String(), price, duration)
out = clientCLI.RunCmd("client", "deal", startEpoch, dataCid.String(), minerAddr.String(), price, duration)
fmt.Println("client deal", out)

// Create a deal (interactive)
Expand Down
6 changes: 5 additions & 1 deletion cmd/lotus-gateway/endtoend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,11 @@ func TestDealFlow(t *testing.T) {
nodes := startNodesWithFunds(ctx, t, blocktime, maxLookbackCap, maxStateWaitLookbackLimit)
defer nodes.closer()

test.MakeDeal(t, ctx, 6, nodes.lite, nodes.miner, false, false)
// For these tests where the block time is artificially short, just use
// a deal start epoch that is guaranteed to be far enough in the future
// so that the deal starts sealing in time
dealStartEpoch := abi.ChainEpoch(2 << 12)
test.MakeDeal(t, ctx, 6, nodes.lite, nodes.miner, false, false, dealStartEpoch)
}

func TestCLIDealFlow(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-storage-miner/allinfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestMinerAllInfo(t *testing.T) {
return n, sn
}

test.TestDealFlow(t, bp, time.Second, false, false)
test.TestDealFlow(t, bp, time.Second, false, false, 0)

t.Run("post-info-all", run)
}
57 changes: 47 additions & 10 deletions markets/storageadapter/ondealsectorcommitted.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package storageadapter
import (
"bytes"
"context"
"sync"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storagemarket"
Expand All @@ -20,7 +21,15 @@ type sectorCommittedEventsAPI interface {
Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error
}

func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorPreCommittedCallback) error {
func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid, callback storagemarket.DealSectorPreCommittedCallback) error {
// Ensure callback is only called once
var once sync.Once
cb := func(sectorNumber abi.SectorNumber, isActive bool, err error) {
once.Do(func() {
callback(sectorNumber, isActive, err)
})
}

// First check if the deal is already active, and if so, bail out
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid)
Expand All @@ -47,6 +56,10 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
return matched, nil
}

// The deal must be accepted by the deal proposal start epoch, so timeout
// if the chain reaches that epoch
timeoutEpoch := proposal.StartEpoch + 1

// Check if the message params included the deal ID we're looking for.
called := func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) {
defer func() {
Expand All @@ -55,9 +68,11 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
}
}()

// Check if waiting for pre-commit timed out
// If the deal hasn't been activated by the proposed start epoch, the
// deal will timeout (when msg == nil it means the timeout epoch was reached)
if msg == nil {
return false, xerrors.Errorf("timed out waiting for deal %d pre-commit", dealID)
err = xerrors.Errorf("deal %d was not activated by proposed deal start epoch %d", dealID, proposal.StartEpoch)
return false, err
}

// Extract the message parameters
Expand Down Expand Up @@ -92,14 +107,22 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
return nil
}

if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), events.NoTimeout, matchEvent); err != nil {
if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil {
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
return xerrors.Errorf("failed to set up called handler: %w", err)
}

return nil
}

func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, sectorNumber abi.SectorNumber, proposal market.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorCommittedCallback) error {
func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, sectorNumber abi.SectorNumber, proposal market.DealProposal, publishCid *cid.Cid, callback storagemarket.DealSectorCommittedCallback) error {
// Ensure callback is only called once
var once sync.Once
cb := func(err error) {
once.Do(func() {
callback(err)
})
}

// First check if the deal is already active, and if so, bail out
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid)
Expand Down Expand Up @@ -134,16 +157,22 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event
return params.SectorNumber == sectorNumber, nil
}

// The deal must be accepted by the deal proposal start epoch, so timeout
// if the chain reaches that epoch
timeoutEpoch := proposal.StartEpoch + 1

called := func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) {
defer func() {
if err != nil {
cb(xerrors.Errorf("handling applied event: %w", err))
}
}()

// Check if waiting for prove-commit timed out
// If the deal hasn't been activated by the proposed start epoch, the
// deal will timeout (when msg == nil it means the timeout epoch was reached)
if msg == nil {
return false, xerrors.Errorf("timed out waiting for deal activation for deal %d", dealID)
err := xerrors.Errorf("deal %d was not activated by proposed deal start epoch %d", dealID, proposal.StartEpoch)
return false, err
}

// Get the deal info
Expand All @@ -170,7 +199,7 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event
return nil
}

if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), events.NoTimeout, matchEvent); err != nil {
if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil {
return xerrors.Errorf("failed to set up called handler: %w", err)
}

Expand All @@ -185,6 +214,14 @@ func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts
}

// Sector with deal is already active
isActive := sd.State.SectorStartEpoch > 0
return isActive, nil
if sd.State.SectorStartEpoch > 0 {
return true, nil
}

// Sector was slashed
if sd.State.SlashEpoch > 0 {
return false, xerrors.Errorf("deal %d was slashed at epoch %d", dealID, sd.State.SlashEpoch)
}

return false, nil
}
Loading