Skip to content

Commit

Permalink
Merge pull request #471 from ipfs-force-community/feat/import-data-by…
Browse files Browse the repository at this point in the history
…-uuid

feat: import data by uuid
  • Loading branch information
LinZexiao authored and simlecode committed Oct 31, 2023
1 parent 6173e84 commit 91d1400
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 40 deletions.
21 changes: 9 additions & 12 deletions api/impl/venus_market.go
Original file line number Diff line number Diff line change
Expand Up @@ -1110,21 +1110,17 @@ func (m *MarketNodeImpl) UpdateDealStatus(ctx context.Context, miner address.Add
return m.DealAssigner.UpdateDealStatus(ctx, miner, dealId, pieceStatus, dealStatus)
}

func (m *MarketNodeImpl) DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string, skipCommP bool) error {
deal, err := m.Repo.StorageDealRepo().GetDeal(ctx, dealPropCid)
func (m *MarketNodeImpl) DealsImportData(ctx context.Context, ref types.ImportDataRef, skipCommP bool) error {
deal, _, err := storageprovider.GetDealByDataRef(ctx, m.Repo.StorageDealRepo(), &ref)
if err != nil {
return err
}
if err := jwtclient.CheckPermissionByMiner(ctx, m.AuthClient, deal.Proposal.Provider); err != nil {
return err
}

ref := &types.ImportDataRef{
ProposalCID: dealPropCid,
File: file,
}
res, err := m.DealsBatchImportData(ctx, types.ImportDataRefs{
Refs: []*types.ImportDataRef{ref},
Refs: []*types.ImportDataRef{&ref},
SkipCommP: skipCommP,
})
if err != nil {
Expand Down Expand Up @@ -1300,19 +1296,20 @@ func (m *MarketNodeImpl) DealsBatchImportData(ctx context.Context, refs types.Im
refLen := len(refs.Refs)
results := make([]*types.ImportDataResult, 0, refLen)
validRefs := make([]*types.ImportDataRef, 0, refLen)

for _, ref := range refs.Refs {
deal, err := m.Repo.StorageDealRepo().GetDeal(ctx, ref.ProposalCID)
deal, target, err := storageprovider.GetDealByDataRef(ctx, m.Repo.StorageDealRepo(), ref)
if err != nil {
results = append(results, &types.ImportDataResult{
ProposalCID: ref.ProposalCID,
Message: err.Error(),
Target: target,
Message: err.Error(),
})
continue
}
if err := jwtclient.CheckPermissionByMiner(ctx, m.AuthClient, deal.Proposal.Provider); err != nil {
results = append(results, &types.ImportDataResult{
ProposalCID: ref.ProposalCID,
Message: err.Error(),
Target: target,
Message: err.Error(),
})
continue
}
Expand Down
51 changes: 36 additions & 15 deletions cli/storage-deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

tm "github.com/buger/goterm"
"github.com/docker/go-units"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
"github.com/urfave/cli/v2"

Expand Down Expand Up @@ -79,12 +80,19 @@ var dealsImportDataCmd = &cli.Command{
return fmt.Errorf("must specify proposal CID and file path")
}

propCid, err := cid.Decode(cctx.Args().Get(0))
if err != nil {
return err
ref := market.ImportDataRef{
File: cctx.Args().Get(1),
}

fpath := cctx.Args().Get(1)
propCid, err := cid.Decode(cctx.Args().Get(0))
if err == nil {
ref.ProposalCID = propCid
} else {
ref.UUID, err = uuid.Parse(cctx.Args().Get(0))
if err != nil {
return err
}
}

var skipCommP bool
if cctx.IsSet("skip-commp") {
Expand All @@ -94,7 +102,12 @@ var dealsImportDataCmd = &cli.Command{
skipCommP = true
}

return api.DealsImportData(ctx, propCid, fpath, skipCommP)
if err := api.DealsImportData(ctx, ref, skipCommP); err != nil {
return err
}
fmt.Println("import data success")

return nil
},
}

Expand Down Expand Up @@ -161,16 +174,24 @@ bass-xxx,baefaxxx
if len(arr) != 2 {
continue
}

ref := &market.ImportDataRef{
File: arr[1],
}
if len(arr[1]) != 0 && len(carDir) != 0 {
ref.File = filepath.Join(carDir, ref.File)
}

proposalCID, err := cid.Parse(arr[0])
if err == nil && len(arr[1]) != 0 {
ref := &market.ImportDataRef{
ProposalCID: proposalCID,
File: arr[1],
}
if len(carDir) != 0 {
ref.File = filepath.Join(carDir, ref.File)
}
if err == nil {
ref.ProposalCID = proposalCID
refs = append(refs, ref)
continue
}

id, err := uuid.Parse(arr[0])
if err == nil {
ref.UUID = id
refs = append(refs, ref)
}
}
Expand All @@ -192,9 +213,9 @@ bass-xxx,baefaxxx

for _, r := range res {
if len(r.Message) == 0 {
fmt.Printf("import data success: %s\n", r.ProposalCID)
fmt.Printf("import data success: %s\n", r.Target)
} else {
fmt.Printf("import data failed, deal: %s, error: %s\n", r.ProposalCID, r.Message)
fmt.Printf("import data failed, deal: %s, error: %s\n", r.Target, r.Message)
}
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/filecoin-project/go-statestore v0.2.0
github.com/filecoin-project/specs-actors/v2 v2.3.6
github.com/filecoin-project/specs-actors/v7 v7.0.1
github.com/filecoin-project/venus v1.14.0-rc3
github.com/filecoin-project/venus v1.14.0-rc3.0.20231019071557-12e011e709ac
github.com/golang/mock v1.6.0
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,8 @@ github.com/filecoin-project/specs-actors/v7 v7.0.1 h1:w72xCxijK7xs1qzmJiw+WYJaVt
github.com/filecoin-project/specs-actors/v7 v7.0.1/go.mod h1:tPLEYXoXhcpyLh69Ccq91SOuLXsPWjHiY27CzawjUEk=
github.com/filecoin-project/specs-actors/v8 v8.0.1 h1:4u0tIRJeT5G7F05lwLRIsDnsrN+bJ5Ixj6h49Q7uE2Y=
github.com/filecoin-project/specs-storage v0.4.1 h1:yvLEaLZj8f+uByhNC4mFOtCUyL2wQku+NGBp6hjTe9M=
github.com/filecoin-project/venus v1.14.0-rc3 h1:1Q825vaN+1ge8oUlZybwlBI+wZqtIF+lAinNAtMjVqM=
github.com/filecoin-project/venus v1.14.0-rc3/go.mod h1:bLEVCNZ6W3KIzM+If8fUveFHlz7LJcdPVsSHT1MJkMc=
github.com/filecoin-project/venus v1.14.0-rc3.0.20231019071557-12e011e709ac h1:WfkaY28lHUlu/NiQdd2f6fZ9xnVD86clvGMENysDc9M=
github.com/filecoin-project/venus v1.14.0-rc3.0.20231019071557-12e011e709ac/go.mod h1:zw20NIwK28DR4lywXNV5jtJsUEhV6haniEMQpJDfqNU=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ=
github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag=
Expand Down
31 changes: 21 additions & 10 deletions storageprovider/storage_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,27 +283,38 @@ func (p *StorageProviderImpl) Stop() error {
return p.net.StopHandlingRequests()
}

func GetDealByDataRef(ctx context.Context, r repo.StorageDealRepo, ref *types.ImportDataRef) (*types.MinerDeal, string, error) {
deal, err := r.GetDeal(ctx, ref.ProposalCID)
if err == nil {
return deal, ref.ProposalCID.String(), nil
}
deal, err = r.GetDealByUUID(ctx, ref.UUID)
return deal, ref.UUID.String(), err
}

// ImportDataForDeals manually batch imports data for offline storage deals
func (p *StorageProviderImpl) ImportDataForDeals(ctx context.Context, refs []*types.ImportDataRef, skipCommP bool) ([]*types.ImportDataResult, error) {
// TODO: be able to check if we have enough disk space
results := make([]*types.ImportDataResult, 0, len(refs))
minerDeals := make(map[address.Address][]*types.MinerDeal)
targets := make(map[cid.Cid]string, len(refs))
for _, ref := range refs {
d, err := p.dealStore.GetDeal(ctx, ref.ProposalCID)
d, target, err := GetDealByDataRef(ctx, p.dealStore, ref)
if err != nil {
results = append(results, &types.ImportDataResult{
ProposalCID: ref.ProposalCID,
Message: fmt.Errorf("failed getting deal: %v", err).Error(),
Target: target,
Message: fmt.Errorf("failed getting deal: %v", err).Error(),
})
continue
}
if err := p.importDataForDeal(ctx, d, ref, skipCommP); err != nil {
results = append(results, &types.ImportDataResult{
ProposalCID: ref.ProposalCID,
Message: err.Error(),
Target: target,
Message: err.Error(),
})
continue
}
targets[d.ProposalCid] = target
minerDeals[d.Proposal.Provider] = append(minerDeals[d.Proposal.Provider], d)
}

Expand All @@ -313,8 +324,8 @@ func (p *StorageProviderImpl) ImportDataForDeals(ctx context.Context, refs []*ty
log.Errorf("batch reserver funds for %s failed: %v", provider, err)
for _, deal := range deals {
results = append(results, &types.ImportDataResult{
ProposalCID: deal.ProposalCid,
Message: err.Error(),
Target: targets[deal.ProposalCid],
Message: err.Error(),
})
}
continue
Expand All @@ -323,13 +334,13 @@ func (p *StorageProviderImpl) ImportDataForDeals(ctx context.Context, refs []*ty
for _, deal := range deals {
if err := res[deal.ProposalCid]; err != nil {
results = append(results, &types.ImportDataResult{
ProposalCID: deal.ProposalCid,
Message: err.Error(),
Target: targets[deal.ProposalCid],
Message: err.Error(),
})
continue
}
results = append(results, &types.ImportDataResult{
ProposalCID: deal.ProposalCid,
Target: targets[deal.ProposalCid],
})

go func(deal *types.MinerDeal) {
Expand Down

0 comments on commit 91d1400

Please sign in to comment.