Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions api/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import (
type Market interface {
// MethodGroup: Market

MarketDummyDeal(context.Context, smtypes.DealParams) (*ProviderDealRejectionInfo, error) //perm:admin
Deal(ctx context.Context, dealUuid uuid.UUID) (*smtypes.ProviderDealState, error) //perm:admin
IndexerAnnounceAllDeals(ctx context.Context) error //perm:admin
MarketDummyDeal(context.Context, smtypes.DealParams) (*ProviderDealRejectionInfo, error) //perm:admin
Deal(ctx context.Context, dealUuid uuid.UUID) (*smtypes.ProviderDealState, error) //perm:admin
IndexerAnnounceAllDeals(ctx context.Context) error //perm:admin
MakeOfflineDealWithData(dealUuid uuid.UUID, filePath string) (*ProviderDealRejectionInfo, error) //perm:admin
}

// ProviderDealRejectionInfo is the information sent by the Storage Provider
Expand Down
13 changes: 13 additions & 0 deletions api/proxy_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

59 changes: 59 additions & 0 deletions cmd/boost/deals.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package main

import (
"fmt"

bcli "github.com/filecoin-project/boost/cli"
"github.com/google/uuid"
"github.com/urfave/cli/v2"
)

var dealCmd = &cli.Command{
Name: "deal",
Usage: "Manage Boost deals",
Subcommands: []*cli.Command{
makeOfflineDealWithData,
},
}

var makeOfflineDealWithData = &cli.Command{
Name: "offline-deal",
Usage: "Make offline deal with data",
Flags: []cli.Flag{&cli.StringFlag{
Name: "deal-uuid",
Usage: "uuid of the offline deal",
Required: true,
},

&cli.StringFlag{
Name: "filepath",
Usage: "path of the file containing the offline deal data",
Required: true,
},
},

Action: func(cctx *cli.Context) error {
napi, closer, err := bcli.GetBoostAPI(cctx)
if err != nil {
return err
}
defer closer()

filePath := cctx.String("filepath")
id := cctx.String("deal-uuid")

dealUuid, err := uuid.Parse(id)
if err != nil {
return fmt.Errorf("failed to parse deal uuid")
}
rej, err := napi.MakeOfflineDealWithData(dealUuid, filePath)
if err != nil {
return fmt.Errorf("failed to execute offline deal: %w", err)
}
if rej != nil && rej.Reason != "" {
return fmt.Errorf("offline deal %s rejected: %s", dealUuid, rej.Reason)
}
fmt.Println("\n offline deal is being executed")
return nil
},
}
1 change: 1 addition & 0 deletions cmd/boost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func main() {
dataTransfersCmd,
retrievalDealsCmd,
indexProvCmd,
dealCmd,
},
}
app.Setup()
Expand Down
1 change: 1 addition & 0 deletions db/create_main_db.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ CREATE TABLE IF NOT EXISTS Deals (
DealProposalSignature BLOB,
PieceCID TEXT,
PieceSize INT,
IsOffline BOOL,
VerifiedDeal BOOL,
ClientAddress TEXT,
ProviderAddress TEXT,
Expand Down
1 change: 1 addition & 0 deletions db/deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func newDealAccessor(db *sql.DB, deal *types.ProviderDealState) *dealAccessor {
"PieceCID": &cidFieldDef{f: &deal.ClientDealProposal.Proposal.PieceCID},
"PieceSize": &fieldDef{f: &deal.ClientDealProposal.Proposal.PieceSize},
"VerifiedDeal": &fieldDef{f: &deal.ClientDealProposal.Proposal.VerifiedDeal},
"IsOffline": &fieldDef{f: &deal.IsOffline},
"ClientAddress": &addrFieldDef{f: &deal.ClientDealProposal.Proposal.Client},
"ProviderAddress": &addrFieldDef{f: &deal.ClientDealProposal.Proposal.Provider},
"Label": &fieldDef{f: &deal.ClientDealProposal.Proposal.Label},
Expand Down
1 change: 1 addition & 0 deletions db/deals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,5 @@ func TestDealsDB(t *testing.T) {
deal.CreatedAt = time.Time{}
storedDeal.CreatedAt = time.Time{}
req.Equal(deal, *storedDeal)
req.True(deal.IsOffline)
}
1 change: 1 addition & 0 deletions db/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func GenerateDeals() ([]types.ProviderDealState, error) {
deal := types.ProviderDealState{
DealUuid: uuid.New(),
CreatedAt: time.Now(),
IsOffline: true,
ClientDealProposal: market.ClientDealProposal{
Proposal: market.DealProposal{
PieceCID: testutil.GenerateCid(),
Expand Down
17 changes: 14 additions & 3 deletions itests/dummydeal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ func TestDummydeal(t *testing.T) {
// Create a new dummy deal
dealUuid := uuid.New()

res, err := f.makeDummyDeal(dealUuid, carFilepath, rootCid, server.URL+"/"+filepath.Base(carFilepath))
res, err := f.makeDummyDeal(dealUuid, carFilepath, rootCid, server.URL+"/"+filepath.Base(carFilepath), false)
require.NoError(t, err)
require.True(t, res.Accepted)
log.Debugw("got response from MarketDummyDeal", "res", spew.Sdump(res))

time.Sleep(2 * time.Second)

failingDealUuid := uuid.New()
res2, err2 := f.makeDummyDeal(failingDealUuid, failingCarFilepath, failingRootCid, server.URL+"/"+filepath.Base(failingCarFilepath))
res2, err2 := f.makeDummyDeal(failingDealUuid, failingCarFilepath, failingRootCid, server.URL+"/"+filepath.Base(failingCarFilepath), false)
require.NoError(t, err2)
require.Equal(t, "cannot accept piece of size 2254421, on top of already allocated 2254421 bytes, because it would exceed max staging area size 4000000", res2.Reason)
log.Debugw("got response from MarketDummyDeal for failing deal", "res2", spew.Sdump(res2))
Expand All @@ -55,12 +55,23 @@ func TestDummydeal(t *testing.T) {
time.Sleep(100 * time.Millisecond)

passingDealUuid := uuid.New()
res2, err2 = f.makeDummyDeal(passingDealUuid, failingCarFilepath, failingRootCid, server.URL+"/"+filepath.Base(failingCarFilepath))
res2, err2 = f.makeDummyDeal(passingDealUuid, failingCarFilepath, failingRootCid, server.URL+"/"+filepath.Base(failingCarFilepath), false)
require.NoError(t, err2)
require.True(t, res2.Accepted)
log.Debugw("got response from MarketDummyDeal", "res2", spew.Sdump(res2))

// Wait for the deal to be added to a sector
err = f.waitForDealAddedToSector(passingDealUuid)
require.NoError(t, err)

// make an offline deal
offlineDealUuid := uuid.New()
res, err = f.makeDummyDeal(offlineDealUuid, carFilepath, rootCid, server.URL+"/"+filepath.Base(carFilepath), true)
require.NoError(t, err)
require.True(t, res.Accepted)
res, err = f.boost.MakeOfflineDealWithData(offlineDealUuid, carFilepath)
require.NoError(t, err)
require.True(t, res.Accepted)
err = f.waitForDealAddedToSector(offlineDealUuid)
require.NoError(t, err)
}
3 changes: 2 additions & 1 deletion itests/test_framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func (f *testFramework) waitForDealAddedToSector(dealUuid uuid.UUID) error {
}
}

func (f *testFramework) makeDummyDeal(dealUuid uuid.UUID, carFilepath string, rootCid cid.Cid, url string) (*api.ProviderDealRejectionInfo, error) {
func (f *testFramework) makeDummyDeal(dealUuid uuid.UUID, carFilepath string, rootCid cid.Cid, url string, isOffline bool) (*api.ProviderDealRejectionInfo, error) {
cidAndSize, err := storagemarket.GenerateCommP(carFilepath)
if err != nil {
return nil, err
Expand Down Expand Up @@ -477,6 +477,7 @@ func (f *testFramework) makeDummyDeal(dealUuid uuid.UUID, carFilepath string, ro
dealParams := types.DealParams{
DealUUID: dealUuid,
ClientDealProposal: *signedProposal,
IsOffline: isOffline,
DealDataRoot: rootCid,
Transfer: types.Transfer{
Type: "http",
Expand Down
5 changes: 5 additions & 0 deletions node/impl/boost.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,8 @@ func (sm *BoostAPI) Deal(ctx context.Context, dealUuid uuid.UUID) (*types.Provid
func (sm *BoostAPI) IndexerAnnounceAllDeals(ctx context.Context) error {
return sm.IndexProvider.IndexerAnnounceAllDeals(ctx)
}

func (sm *BoostAPI) MakeOfflineDealWithData(dealUuid uuid.UUID, filePath string) (*api.ProviderDealRejectionInfo, error) {
res, _, err := sm.StorageProvider.MakeOfflineDealWithData(dealUuid, filePath)
return res, err
}
64 changes: 41 additions & 23 deletions storagemarket/deal_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,31 +112,39 @@ func (p *Provider) execDealUptoAddPiece(ctx context.Context, pub event.Emitter,

p.dealLogger.Infow(deal.DealUuid, "deal execution in progress")

// Transfer Data
if deal.Checkpoint < dealcheckpoints.Transferred {
if err := p.transferAndVerify(dh.transferCtx, pub, deal); err != nil {
dh.transferCancelled(nil)
// if the transfer failed because of context cancellation and the context was not
// cancelled because of the user explicitly cancelling the transfer, this is a recoverable error.
if xerrors.Is(err, context.Canceled) && !dh.TransferCancelledByUser() {
// Transfer Data step will be executed only if it's NOT an offline deal
if !deal.IsOffline {
if deal.Checkpoint < dealcheckpoints.Transferred {
if err := p.transferAndVerify(dh.transferCtx, pub, deal); err != nil {
dh.transferCancelled(nil)
// if the transfer failed because of context cancellation and the context was not
// cancelled because of the user explicitly cancelling the transfer, this is a recoverable error.
if xerrors.Is(err, context.Canceled) && !dh.TransferCancelledByUser() {
return &dealMakingError{
recoverable: true,
err: fmt.Errorf("data transfer failed with a recoverable error after %d bytes with error: %w", deal.NBytesReceived, err),
uiMsg: fmt.Sprintf("data transfer paused after transferring %d bytes because Boost is shutting down", deal.NBytesReceived),
}
}
return &dealMakingError{
recoverable: true,
err: fmt.Errorf("data transfer failed with a recoverable error after %d bytes with error: %w", deal.NBytesReceived, err),
uiMsg: fmt.Sprintf("data transfer paused after transferring %d bytes because Boost is shutting down", deal.NBytesReceived),
err: fmt.Errorf("execDeal failed data transfer: %w", err),
}
}
return &dealMakingError{
err: fmt.Errorf("execDeal failed data transfer: %w", err),
}
}

p.dealLogger.Infow(deal.DealUuid, "deal data transfer finished successfully")
p.dealLogger.Infow(deal.DealUuid, "deal data transfer finished successfully")
} else {
p.dealLogger.Infow(deal.DealUuid, "deal data transfer has already been completed")
}
// transfer can no longer be cancelled
dh.transferCancelled(errors.New("transfer already complete"))
p.dealLogger.Infow(deal.DealUuid, "deal data-transfer can no longer be cancelled")
} else {
p.dealLogger.Infow(deal.DealUuid, "deal data transfer has already been completed")
// verify CommP matches for an offline deal
if err := p.verifyCommP(deal); err != nil {
return &dealMakingError{err: fmt.Errorf("error when matching commP for imported data for offline deal: %w", err)}
}
p.dealLogger.Infow(deal.DealUuid, "commp matched successfully for imported data for offline deal")
}
// transfer can no longer be cancelled
dh.transferCancelled(errors.New("transfer already complete"))
p.dealLogger.Infow(deal.DealUuid, "deal data-transfer can no longer be cancelled")

// Publish
if deal.Checkpoint <= dealcheckpoints.Published {
Expand Down Expand Up @@ -252,6 +260,15 @@ func (p *Provider) transferAndVerify(ctx context.Context, pub event.Emitter, dea
time.Since(st).String())

// Verify CommP matches
if err := p.verifyCommP(deal); err != nil {
return fmt.Errorf("failed to verify CommP: %w", err)
}

p.dealLogger.Infow(deal.DealUuid, "commP matched successfully: deal-data verified")
return p.updateCheckpoint(pub, deal, dealcheckpoints.Transferred)
}

func (p *Provider) verifyCommP(deal *types.ProviderDealState) error {
pieceCid, err := GeneratePieceCommitment(deal.InboundFilePath, deal.ClientDealProposal.Proposal.PieceSize)
if err != nil {
return fmt.Errorf("failed to generate CommP: %w", err)
Expand All @@ -262,8 +279,7 @@ func (p *Provider) transferAndVerify(ctx context.Context, pub event.Emitter, dea
return fmt.Errorf("commP mismatch, expected=%s, actual=%s", clientPieceCid, pieceCid)
}

p.dealLogger.Infow(deal.DealUuid, "commP matched successfully: deal-data verified")
return p.updateCheckpoint(pub, deal, dealcheckpoints.Transferred)
return nil
}

func (p *Provider) waitForTransferFinish(ctx context.Context, handler transport.Handler, pub event.Emitter, deal *types.ProviderDealState) error {
Expand Down Expand Up @@ -552,8 +568,10 @@ func (p *Provider) cleanupDealLogged(deal *types.ProviderDealState) {
}

func (p *Provider) cleanupDeal(deal *types.ProviderDealState) {
// remove the temp file created for inbound deal data
_ = os.Remove(deal.InboundFilePath)
// remove the temp file created for inbound deal data if it is not an offline deal
if !deal.IsOffline {
_ = os.Remove(deal.InboundFilePath)
}

// close and clean up the deal handler
dh := p.getDealHandler(deal.DealUuid)
Expand Down
Loading