Skip to content

Commit

Permalink
Merge pull request #517 from ipfs-force-community/feat/auto-import-deal
Browse files Browse the repository at this point in the history
feat: auto import direct deal
  • Loading branch information
LinZexiao authored Apr 1, 2024
2 parents 6b56ea6 + 409c9e9 commit fff8a9b
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 67 deletions.
19 changes: 10 additions & 9 deletions cli/direct-deal.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ var getDirectDealCmd = &cli.Command{

data := []kv{
{"Creation", time.Unix(int64(deal.CreatedAt), 0).Format(time.RFC3339)},
{"ID", deal.ID},
{"PieceCID", deal.PieceCID},
{"PieceSize", deal.PieceSize},
{"Client", deal.Client},
Expand Down Expand Up @@ -270,11 +271,11 @@ var importDirectDealCmd = &cli.Command{

allocationID := cliCtx.Uint64("allocation-id")

startEpoch, err := getStartEpoch(cliCtx, fapi)
startEpoch, err := GetStartEpoch(cliCtx, fapi)
if err != nil {
return err
}
endEpoch, err := checkAndGetEndEpoch(cliCtx.Context, fapi, client, allocationID, startEpoch)
endEpoch, err := CheckAndGetEndEpoch(cliCtx.Context, fapi, client, allocationID, startEpoch)
if err != nil {
return err
}
Expand Down Expand Up @@ -306,14 +307,14 @@ var importDirectDealCmd = &cli.Command{
},
}

func getStartEpoch(cliCtx *cli.Context, fapi v1api.FullNode) (abi.ChainEpoch, error) {
func GetStartEpoch(cliCtx *cli.Context, fapi v1api.FullNode) (abi.ChainEpoch, error) {
startEpoch := abi.ChainEpoch(cliCtx.Int("start-epoch"))
if startEpoch == 0 {
head, err := fapi.ChainHead(cliCtx.Context)
if err != nil {
return 0, err
}
startEpoch = head.Height() + builtin.EpochsInDay*2
startEpoch = head.Height() + builtin.EpochsInDay*8
}

return startEpoch, nil
Expand Down Expand Up @@ -349,7 +350,7 @@ var importDirectDealsCmd = &cli.Command{
},
&cli.IntFlag{
Name: "start-epoch",
Usage: "start epoch by when the deal should be proved by provider on-chain (default: 2 days from now)",
Usage: "start epoch by when the deal should be proved by provider on-chain (default: 8 days from now)",
},
},
Action: func(cliCtx *cli.Context) error {
Expand Down Expand Up @@ -386,7 +387,7 @@ var importDirectDealsCmd = &cli.Command{
return "", fmt.Errorf("car %s file not found", pieceCID.String())
}

startEpoch, err := getStartEpoch(cliCtx, fapi)
startEpoch, err := GetStartEpoch(cliCtx, fapi)
if err != nil {
return err
}
Expand All @@ -411,7 +412,7 @@ var importDirectDealsCmd = &cli.Command{
return fmt.Errorf("invalid client: %w", err)
}

endEpoch, err := checkAndGetEndEpoch(ctx, fapi, client, allocationID, startEpoch)
endEpoch, err := CheckAndGetEndEpoch(ctx, fapi, client, allocationID, startEpoch)
if err != nil {
return err
}
Expand Down Expand Up @@ -441,7 +442,7 @@ var importDirectDealsCmd = &cli.Command{
return fmt.Errorf("failed to load allocations: %w", err)
}
for _, a := range allocations {
endEpoch, err := checkAndGetEndEpoch(ctx, fapi, a.Client, a.AllocationID, startEpoch)
endEpoch, err := CheckAndGetEndEpoch(ctx, fapi, a.Client, a.AllocationID, startEpoch)
if err != nil {
return err
}
Expand Down Expand Up @@ -483,7 +484,7 @@ var importDirectDealsCmd = &cli.Command{
},
}

func checkAndGetEndEpoch(ctx context.Context,
func CheckAndGetEndEpoch(ctx context.Context,
fapi v1api.FullNode,
client address.Address,
allocationID uint64,
Expand Down
201 changes: 155 additions & 46 deletions cmd/droplet-client/direct-deal.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@ import (
verifregst "github.com/filecoin-project/go-state-types/builtin/v9/verifreg"
"github.com/filecoin-project/venus/venus-shared/actors"
"github.com/filecoin-project/venus/venus-shared/actors/builtin/datacap"
"github.com/filecoin-project/venus/venus-shared/api"
v1 "github.com/filecoin-project/venus/venus-shared/api/chain/v1"
marketapi "github.com/filecoin-project/venus/venus-shared/api/market/v1"
"github.com/filecoin-project/venus/venus-shared/types"
types2 "github.com/filecoin-project/venus/venus-shared/types/market"
"github.com/google/uuid"
cli2 "github.com/ipfs-force-community/droplet/v2/cli"
"github.com/ipfs-force-community/droplet/v2/cli/tablewriter"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -74,8 +78,9 @@ var directDealAllocate = &cli.Command{
termMaxFlag,
expirationFlag,
&cli.StringSliceFlag{
Name: "piece-info",
Usage: "data pieceInfo[s] to create the allocation. The format must be pieceCid1=pieceSize1 pieceCid2=pieceSize2",
Name: "piece-info",
Usage: "data pieceInfo[s] to create the allocation. The format must be pieceCid1=pieceSize1 pieceCid2=pieceSize2",
Hidden: true,
},
&cli.BoolFlag{
Name: "quiet",
Expand All @@ -91,6 +96,18 @@ var directDealAllocate = &cli.Command{
Usage: "Output allocation information to a file.",
Value: "allocation.csv",
},
&cli.StringFlag{
Name: "droplet-url",
Usage: "Url of the droplet service",
},
&cli.StringFlag{
Name: "droplet-token",
Usage: "Token of the droplet service",
},
&cli.IntFlag{
Name: "start-epoch",
Usage: "start epoch by when the deal should be proved by provider on-chain (default: 8 days from now)",
},
},
Action: func(cctx *cli.Context) error {
if cctx.IsSet("piece-info") && cctx.IsSet("manifest") {
Expand Down Expand Up @@ -250,7 +267,28 @@ var directDealAllocate = &cli.Command{
return fmt.Errorf("failed to execute the message with error: %s", res.Receipt.ExitCode.Error())
}

return showAllocations(ctx, fapi, walletAddr, oldAllocations, cctx.Bool("json"), cctx.Bool("quiet"), cctx.String("output-allocation-to-file"), pieceInfos)
newAllocations, err := findNewAllocations(ctx, fapi, walletAddr, oldAllocations)
if err != nil {
return fmt.Errorf("failed to find new allocations: %w", err)
}

if err := writeAllocationsToFile(cctx.String("output-allocation-to-file"), newAllocations, pieceInfos); err != nil {
fmt.Println("failed to write allocations to file: ", err)
}

if err := showAllocations(newAllocations, cctx.Bool("json"), cctx.Bool("quiet")); err != nil {
fmt.Println("failed to show allocations: ", err)
}

if cctx.IsSet("droplet-url") {
fmt.Println("importing deal to droplet")
if err := autoImportDealToDroplet(cctx, newAllocations, pieceInfos); err != nil {
return fmt.Errorf("failed to import deal to droplet: %w", err)
}
fmt.Println("successfully imported deal to droplet")
}

return nil
},
}

Expand Down Expand Up @@ -346,18 +384,66 @@ func getAllocationParams(cctx *cli.Context, currentHeight abi.ChainEpoch) (*allo
return &params, nil
}

func findNewAllocations(ctx context.Context, fapi v1.FullNode, walletAddr address.Address, oldAllocations map[types.AllocationId]types.Allocation) (map[types.AllocationId]types.Allocation, error) {
allAllocations, err := fapi.StateGetAllocations(ctx, walletAddr, types.EmptyTSK)
if err != nil {
return nil, fmt.Errorf("failed to get allocations: %w", err)
}

newAllocations := make(map[types.AllocationId]types.Allocation, len(allAllocations)-len(oldAllocations))
for k, v := range allAllocations {
if _, ok := oldAllocations[k]; !ok {
newAllocations[k] = v
}
}

return newAllocations, nil
}

type partAllocationInfo struct {
AllocationID types.AllocationId
PieceCID cid.Cid
Client address.Address
}

func showAllocations(ctx context.Context, fapi v1.FullNode, walletAddr address.Address, oldAllocations map[types.AllocationId]types.Allocation, useJSON bool, quite bool, allocationFile string, pieceInfos []*pieceInfo) error {
newAllocations, err := fapi.StateGetAllocations(ctx, walletAddr, types.EmptyTSK)
if err != nil {
return fmt.Errorf("failed to get allocations: %w", err)
func writeAllocationsToFile(allocationFile string, allocations map[types.AllocationId]types.Allocation, pieceInfo []*pieceInfo) error {
payloadSizes := make(map[cid.Cid]uint64)
for _, info := range pieceInfo {
payloadSizes[info.pieceCID] = info.payloadSize
}

infos := make([]partAllocationInfo, 0, len(allocations))
for id, v := range allocations {
clientAddr, _ := address.NewIDAddress(uint64(v.Client))
infos = append(infos, partAllocationInfo{
AllocationID: id,
PieceCID: v.Data,
Client: clientAddr,
})
}

sort.Slice(infos, func(i, j int) bool {
return infos[i].AllocationID < infos[j].AllocationID
})

buf := &bytes.Buffer{}
w := csv.NewWriter(buf)
if err := w.Write([]string{"AllocationID", "PieceCID", "Client", "PayloadSize"}); err != nil {
return err
}
for _, info := range infos {
if err := w.Write([]string{fmt.Sprintf("%d", info.AllocationID), info.PieceCID.String(), info.Client.String(), fmt.Sprintf("%d", payloadSizes[info.PieceCID])}); err != nil {
return err
}
}
w.Flush()

fmt.Println("writing allocations to:", allocationFile)

return os.WriteFile(allocationFile, buf.Bytes(), 0644)
}

func showAllocations(allocations map[types.AllocationId]types.Allocation, useJSON bool, quite bool) error {
// Map Keys. Corresponds to the standard tablewriter output
allocationID := "AllocationID"
client := "Client"
Expand All @@ -381,34 +467,20 @@ func showAllocations(ctx context.Context, fapi v1.FullNode, walletAddr address.A
}

var allocs []map[string]interface{}
var partAllocationInfos []partAllocationInfo
for key, val := range newAllocations {
_, ok := oldAllocations[key]
if !ok {
clientAddr, _ := address.NewIDAddress(uint64(val.Client))
providerAddr, _ := address.NewIDAddress(uint64(val.Provider))
alloc := map[string]interface{}{
allocationID: key,
client: clientAddr,
provider: providerAddr,
pieceCid: val.Data,
pieceSize: val.Size,
tMin: val.TermMin,
tMax: val.TermMax,
expr: val.Expiration,
}
allocs = append(allocs, alloc)

partAllocationInfos = append(partAllocationInfos, partAllocationInfo{
AllocationID: key,
PieceCID: val.Data,
Client: walletAddr,
})
for key, val := range allocations {
clientAddr, _ := address.NewIDAddress(uint64(val.Client))
providerAddr, _ := address.NewIDAddress(uint64(val.Provider))
alloc := map[string]interface{}{
allocationID: key,
client: clientAddr,
provider: providerAddr,
pieceCid: val.Data,
pieceSize: val.Size,
tMin: val.TermMin,
tMax: val.TermMax,
expr: val.Expiration,
}
}

if err := outputAllocationToFile(allocationFile, partAllocationInfos, pieceInfos); err != nil {
fmt.Println("output allocation to file error: ", err)
allocs = append(allocs, alloc)
}

if quite {
Expand Down Expand Up @@ -445,26 +517,63 @@ func showAllocations(ctx context.Context, fapi v1.FullNode, walletAddr address.A
return tw.Flush(os.Stdout)
}

func outputAllocationToFile(allocationFile string, infos []partAllocationInfo, pieceInfo []*pieceInfo) error {
func autoImportDealToDroplet(cliCtx *cli.Context, allocations map[types.AllocationId]types.Allocation, pieceInfos []*pieceInfo) error {
ctx := cliCtx.Context
dropletURL := cliCtx.String("droplet-url")
dropletToken := cliCtx.String("droplet-token")

apiInfo := api.NewAPIInfo(dropletURL, dropletToken)
addr, err := apiInfo.DialArgs("v0")
if err != nil {
return err
}

mapi, close, err := marketapi.NewIMarketRPC(ctx, addr, apiInfo.AuthHeader())
if err != nil {
return err
}
defer close()

fapi, fclose, err := cli2.NewFullNode(cliCtx, cli2.OldClientRepoPath)
if err != nil {
return err
}
defer fclose()

params := types2.DirectDealParams{
SkipCommP: true,
SkipGenerateIndex: true,
NoCopyCarFile: true,
DealParams: make([]types2.DirectDealParam, 0, len(allocations)),
}

payloadSizes := make(map[cid.Cid]uint64)
for _, info := range pieceInfo {
for _, info := range pieceInfos {
payloadSizes[info.pieceCID] = info.payloadSize
}
sort.Slice(infos, func(i, j int) bool {
return infos[i].AllocationID < infos[j].AllocationID
})

buf := &bytes.Buffer{}
w := csv.NewWriter(buf)
if err := w.Write([]string{"AllocationID", "PieceCID", "Client", "PayloadSize"}); err != nil {
startEpoch, err := cli2.GetStartEpoch(cliCtx, fapi)
if err != nil {
return err
}
for _, info := range infos {
if err := w.Write([]string{fmt.Sprintf("%d", info.AllocationID), info.PieceCID.String(), info.Client.String(), fmt.Sprintf("%d", payloadSizes[info.PieceCID])}); err != nil {

for id, alloc := range allocations {
clientAddr, _ := address.NewIDAddress(uint64(alloc.Client))
endEpoch, err := cli2.CheckAndGetEndEpoch(ctx, fapi, clientAddr, uint64(id), startEpoch)
if err != nil {
return err
}

params.DealParams = append(params.DealParams, types2.DirectDealParam{
DealUUID: uuid.New(),
AllocationID: uint64(id),
PayloadSize: payloadSizes[alloc.Data],
Client: clientAddr,
PieceCID: alloc.Data,
StartEpoch: startEpoch,
EndEpoch: endEpoch,
})
}
w.Flush()

return os.WriteFile(allocationFile, buf.Bytes(), 0644)
return mapi.ImportDirectDeal(ctx, &params)
}
Loading

0 comments on commit fff8a9b

Please sign in to comment.