From 91d1400c72d39f608d21cf06830a3b98d4f91d1b Mon Sep 17 00:00:00 2001 From: LinZexiao <55120714+LinZexiao@users.noreply.github.com> Date: Thu, 19 Oct 2023 15:46:05 +0800 Subject: [PATCH] Merge pull request #471 from ipfs-force-community/feat/import-data-by-uuid feat: import data by uuid --- api/impl/venus_market.go | 21 +++++------- cli/storage-deals.go | 51 ++++++++++++++++++++--------- go.mod | 2 +- go.sum | 4 +-- storageprovider/storage_provider.go | 31 ++++++++++++------ 5 files changed, 69 insertions(+), 40 deletions(-) diff --git a/api/impl/venus_market.go b/api/impl/venus_market.go index f0759188..c2922585 100644 --- a/api/impl/venus_market.go +++ b/api/impl/venus_market.go @@ -1110,8 +1110,8 @@ func (m *MarketNodeImpl) UpdateDealStatus(ctx context.Context, miner address.Add return m.DealAssigner.UpdateDealStatus(ctx, miner, dealId, pieceStatus, dealStatus) } -func (m *MarketNodeImpl) DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string, skipCommP bool) error { - deal, err := m.Repo.StorageDealRepo().GetDeal(ctx, dealPropCid) +func (m *MarketNodeImpl) DealsImportData(ctx context.Context, ref types.ImportDataRef, skipCommP bool) error { + deal, _, err := storageprovider.GetDealByDataRef(ctx, m.Repo.StorageDealRepo(), &ref) if err != nil { return err } @@ -1119,12 +1119,8 @@ func (m *MarketNodeImpl) DealsImportData(ctx context.Context, dealPropCid cid.Ci return err } - ref := &types.ImportDataRef{ - ProposalCID: dealPropCid, - File: file, - } res, err := m.DealsBatchImportData(ctx, types.ImportDataRefs{ - Refs: []*types.ImportDataRef{ref}, + Refs: []*types.ImportDataRef{&ref}, SkipCommP: skipCommP, }) if err != nil { @@ -1300,19 +1296,20 @@ func (m *MarketNodeImpl) DealsBatchImportData(ctx context.Context, refs types.Im refLen := len(refs.Refs) results := make([]*types.ImportDataResult, 0, refLen) validRefs := make([]*types.ImportDataRef, 0, refLen) + for _, ref := range refs.Refs { - deal, err := m.Repo.StorageDealRepo().GetDeal(ctx, ref.ProposalCID) + deal, target, err := storageprovider.GetDealByDataRef(ctx, m.Repo.StorageDealRepo(), ref) if err != nil { results = append(results, &types.ImportDataResult{ - ProposalCID: ref.ProposalCID, - Message: err.Error(), + Target: target, + Message: err.Error(), }) continue } if err := jwtclient.CheckPermissionByMiner(ctx, m.AuthClient, deal.Proposal.Provider); err != nil { results = append(results, &types.ImportDataResult{ - ProposalCID: ref.ProposalCID, - Message: err.Error(), + Target: target, + Message: err.Error(), }) continue } diff --git a/cli/storage-deals.go b/cli/storage-deals.go index dfcba652..b378f184 100644 --- a/cli/storage-deals.go +++ b/cli/storage-deals.go @@ -13,6 +13,7 @@ import ( tm "github.com/buger/goterm" "github.com/docker/go-units" + "github.com/google/uuid" "github.com/ipfs/go-cid" "github.com/urfave/cli/v2" @@ -79,12 +80,19 @@ var dealsImportDataCmd = &cli.Command{ return fmt.Errorf("must specify proposal CID and file path") } - propCid, err := cid.Decode(cctx.Args().Get(0)) - if err != nil { - return err + ref := market.ImportDataRef{ + File: cctx.Args().Get(1), } - fpath := cctx.Args().Get(1) + propCid, err := cid.Decode(cctx.Args().Get(0)) + if err == nil { + ref.ProposalCID = propCid + } else { + ref.UUID, err = uuid.Parse(cctx.Args().Get(0)) + if err != nil { + return err + } + } var skipCommP bool if cctx.IsSet("skip-commp") { @@ -94,7 +102,12 @@ var dealsImportDataCmd = &cli.Command{ skipCommP = true } - return api.DealsImportData(ctx, propCid, fpath, skipCommP) + if err := api.DealsImportData(ctx, ref, skipCommP); err != nil { + return err + } + fmt.Println("import data success") + + return nil }, } @@ -161,16 +174,24 @@ bass-xxx,baefaxxx if len(arr) != 2 { continue } + + ref := &market.ImportDataRef{ + File: arr[1], + } + if len(arr[1]) != 0 && len(carDir) != 0 { + ref.File = filepath.Join(carDir, ref.File) + } + proposalCID, err := cid.Parse(arr[0]) - if err == nil && len(arr[1]) != 0 { - ref := &market.ImportDataRef{ - ProposalCID: proposalCID, - File: arr[1], - } - if len(carDir) != 0 { - ref.File = filepath.Join(carDir, ref.File) - } + if err == nil { + ref.ProposalCID = proposalCID + refs = append(refs, ref) + continue + } + id, err := uuid.Parse(arr[0]) + if err == nil { + ref.UUID = id refs = append(refs, ref) } } @@ -192,9 +213,9 @@ bass-xxx,baefaxxx for _, r := range res { if len(r.Message) == 0 { - fmt.Printf("import data success: %s\n", r.ProposalCID) + fmt.Printf("import data success: %s\n", r.Target) } else { - fmt.Printf("import data failed, deal: %s, error: %s\n", r.ProposalCID, r.Message) + fmt.Printf("import data failed, deal: %s, error: %s\n", r.Target, r.Message) } } diff --git a/go.mod b/go.mod index 208cc156..2e1a9613 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/filecoin-project/go-statestore v0.2.0 github.com/filecoin-project/specs-actors/v2 v2.3.6 github.com/filecoin-project/specs-actors/v7 v7.0.1 - github.com/filecoin-project/venus v1.14.0-rc3 + github.com/filecoin-project/venus v1.14.0-rc3.0.20231019071557-12e011e709ac github.com/golang/mock v1.6.0 github.com/google/uuid v1.3.0 github.com/gorilla/mux v1.8.0 diff --git a/go.sum b/go.sum index e468640a..a5eb2ec4 100644 --- a/go.sum +++ b/go.sum @@ -379,8 +379,8 @@ github.com/filecoin-project/specs-actors/v7 v7.0.1 h1:w72xCxijK7xs1qzmJiw+WYJaVt github.com/filecoin-project/specs-actors/v7 v7.0.1/go.mod h1:tPLEYXoXhcpyLh69Ccq91SOuLXsPWjHiY27CzawjUEk= github.com/filecoin-project/specs-actors/v8 v8.0.1 h1:4u0tIRJeT5G7F05lwLRIsDnsrN+bJ5Ixj6h49Q7uE2Y= github.com/filecoin-project/specs-storage v0.4.1 h1:yvLEaLZj8f+uByhNC4mFOtCUyL2wQku+NGBp6hjTe9M= -github.com/filecoin-project/venus v1.14.0-rc3 h1:1Q825vaN+1ge8oUlZybwlBI+wZqtIF+lAinNAtMjVqM= -github.com/filecoin-project/venus v1.14.0-rc3/go.mod h1:bLEVCNZ6W3KIzM+If8fUveFHlz7LJcdPVsSHT1MJkMc= +github.com/filecoin-project/venus v1.14.0-rc3.0.20231019071557-12e011e709ac h1:WfkaY28lHUlu/NiQdd2f6fZ9xnVD86clvGMENysDc9M= +github.com/filecoin-project/venus v1.14.0-rc3.0.20231019071557-12e011e709ac/go.mod h1:zw20NIwK28DR4lywXNV5jtJsUEhV6haniEMQpJDfqNU= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ= github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag= diff --git a/storageprovider/storage_provider.go b/storageprovider/storage_provider.go index 40ca4a70..f7c8738f 100644 --- a/storageprovider/storage_provider.go +++ b/storageprovider/storage_provider.go @@ -283,27 +283,38 @@ func (p *StorageProviderImpl) Stop() error { return p.net.StopHandlingRequests() } +func GetDealByDataRef(ctx context.Context, r repo.StorageDealRepo, ref *types.ImportDataRef) (*types.MinerDeal, string, error) { + deal, err := r.GetDeal(ctx, ref.ProposalCID) + if err == nil { + return deal, ref.ProposalCID.String(), nil + } + deal, err = r.GetDealByUUID(ctx, ref.UUID) + return deal, ref.UUID.String(), err +} + // ImportDataForDeals manually batch imports data for offline storage deals func (p *StorageProviderImpl) ImportDataForDeals(ctx context.Context, refs []*types.ImportDataRef, skipCommP bool) ([]*types.ImportDataResult, error) { // TODO: be able to check if we have enough disk space results := make([]*types.ImportDataResult, 0, len(refs)) minerDeals := make(map[address.Address][]*types.MinerDeal) + targets := make(map[cid.Cid]string, len(refs)) for _, ref := range refs { - d, err := p.dealStore.GetDeal(ctx, ref.ProposalCID) + d, target, err := GetDealByDataRef(ctx, p.dealStore, ref) if err != nil { results = append(results, &types.ImportDataResult{ - ProposalCID: ref.ProposalCID, - Message: fmt.Errorf("failed getting deal: %v", err).Error(), + Target: target, + Message: fmt.Errorf("failed getting deal: %v", err).Error(), }) continue } if err := p.importDataForDeal(ctx, d, ref, skipCommP); err != nil { results = append(results, &types.ImportDataResult{ - ProposalCID: ref.ProposalCID, - Message: err.Error(), + Target: target, + Message: err.Error(), }) continue } + targets[d.ProposalCid] = target minerDeals[d.Proposal.Provider] = append(minerDeals[d.Proposal.Provider], d) } @@ -313,8 +324,8 @@ func (p *StorageProviderImpl) ImportDataForDeals(ctx context.Context, refs []*ty log.Errorf("batch reserver funds for %s failed: %v", provider, err) for _, deal := range deals { results = append(results, &types.ImportDataResult{ - ProposalCID: deal.ProposalCid, - Message: err.Error(), + Target: targets[deal.ProposalCid], + Message: err.Error(), }) } continue @@ -323,13 +334,13 @@ func (p *StorageProviderImpl) ImportDataForDeals(ctx context.Context, refs []*ty for _, deal := range deals { if err := res[deal.ProposalCid]; err != nil { results = append(results, &types.ImportDataResult{ - ProposalCID: deal.ProposalCid, - Message: err.Error(), + Target: targets[deal.ProposalCid], + Message: err.Error(), }) continue } results = append(results, &types.ImportDataResult{ - ProposalCID: deal.ProposalCid, + Target: targets[deal.ProposalCid], }) go func(deal *types.MinerDeal) {