Skip to content

Commit

Permalink
feat: add command to cancel data transfer
Browse files Browse the repository at this point in the history
  • Loading branch information
simlecode committed Mar 24, 2023
1 parent 012d2aa commit 4d4ed90
Showing 1 changed file with 112 additions and 0 deletions.
112 changes: 112 additions & 0 deletions cli/data_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
tm "github.com/buger/goterm"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/venus/venus-shared/types/market"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/urfave/cli/v2"
)
Expand All @@ -22,6 +24,7 @@ var DataTransfersCmd = &cli.Command{
transfersListCmd,
marketRestartTransfer,
marketCancelTransfer,
marketCancelRetrievalTransfer,
},
}

Expand Down Expand Up @@ -152,6 +155,115 @@ var marketCancelTransfer = &cli.Command{
},
}

var marketCancelRetrievalTransfer = &cli.Command{
Name: "batch-cancel",
Usage: "Batch force cancel data transfer",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "peer-id",
Usage: "client peer id",
},
&cli.StringFlag{
Name: "data-cid",
Usage: "data root cid",
},
&cli.BoolFlag{
Name: "initiator",
Usage: "specify only transfers where peer is/is not initiator",
Value: false,
},
&cli.DurationFlag{
Name: "cancel-timeout",
Usage: "time to wait for cancel to be sent to client",
Value: 5 * time.Second,
},
},
Action: func(cctx *cli.Context) error {
nodeApi, closer, err := NewMarketNode(cctx)
if err != nil {
return err
}
defer closer()

ctx := ReqContext(cctx)
params := market.RetrievalDealQueryParams{
DiscardFailedDeal: true,
}
initiator := cctx.Bool("initiator")

if cctx.IsSet("peer-id") {
params.Receiver = cctx.String("peer-id")
}
if cctx.IsSet("data-cid") {
params.PayloadCID = cctx.String("data-cid")
}
if len(params.Receiver) == 0 && len(params.PayloadCID) == 0 {
return fmt.Errorf("`peer-id` and `data-cid` must be set to one")
}

deals, err := nodeApi.MarketListRetrievalDeals(ctx, &params)
if err != nil {
return err
}
if len(deals) == 0 {
return fmt.Errorf("not found retrieval deal")
}

dealMap := make(map[datatransfer.TransferID]*market.ProviderDealState, len(deals))
for i, deal := range deals {
if deal.ChannelID == nil {
continue
}
if deal.Status == retrievalmarket.DealStatusRejected ||
deal.Status == retrievalmarket.DealStatusCompleted ||
deal.Status == retrievalmarket.DealStatusErrored ||
deal.Status == retrievalmarket.DealStatusCancelled {
continue
}
dealMap[deal.ChannelID.ID] = &deals[i]
}
if len(dealMap) == 0 {
return fmt.Errorf("no data transfer need cancel")
}

channels, err := nodeApi.MarketListDataTransfers(ctx)
if err != nil {
return err
}

pendingDeals := make([]*market.ProviderDealState, 0, len(channels))
otherPeers := make([]peer.ID, 0, len(channels))
for _, channel := range channels {
if channel.Status == datatransfer.Completed || channel.Status == datatransfer.Failed ||
channel.Status == datatransfer.Cancelled {
continue
}
if deal, ok := dealMap[channel.TransferID]; channel.IsInitiator == initiator && ok {
pendingDeals = append(pendingDeals, deal)
otherPeers = append(otherPeers, channel.OtherPeer)
}
}

fmt.Printf("find %d data transfer need to cancel\n", len(pendingDeals))

timeout := cctx.Duration("cancel-timeout")
for i, deal := range pendingDeals {
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

fmt.Printf("start cancel data transfer: %v\n", deal.ChannelID.ID)
err := nodeApi.MarketCancelDataTransfer(timeoutCtx, deal.ChannelID.ID, otherPeers[i], initiator)
if err != nil {
fmt.Printf("cancel %d failed: %v\n", deal.ChannelID.ID, err)
} else {
fmt.Printf("cancel %d success\n", deal.ChannelID.ID)
}
}

return nil
},
}

var transfersListCmd = &cli.Command{
Name: "list",
Usage: "List ongoing data transfers for this miner",
Expand Down

0 comments on commit 4d4ed90

Please sign in to comment.