Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update unseal api / 更新 unseal 的接口 #314

Merged
merged 5 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 17 additions & 33 deletions dagstore/market_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,52 +85,36 @@ func (m *marketAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, err
for _, deal := range deals {
deal := deal

var isUnsealed bool
// Throttle this path to avoid flooding the storage subsystem.
err := m.throttle.Do(ctx, func(ctx context.Context) (err error) {
// todo ProofType can not be passed, SP processes itself?
isUnsealed, err = m.gatewayMarketClient.IsUnsealed(ctx, deal.Proposal.Provider, pieceCid,
deal.SectorNumber,
vSharedTypes.PaddedByteIndex(deal.Offset.Unpadded()),
deal.Proposal.PieceSize)

// send SectorsUnsealPiece task
wps, err := m.pieceStorageMgr.FindStorageForWrite(int64(deal.Proposal.PieceSize))
if err != nil {
return fmt.Errorf("failed to check if sector %d for deal %d was unsealed: %w", deal.SectorNumber, deal.DealID, err)
return fmt.Errorf("failed to find storage to write %s: %w", pieceCid, err)
}

if isUnsealed {
// send SectorsUnsealPiece task
wps, err := m.pieceStorageMgr.FindStorageForWrite(int64(deal.Proposal.PieceSize))
if err != nil {
return fmt.Errorf("failed to find storage to write %s: %w", pieceCid, err)
}

pieceTransfer, err := wps.GetPieceTransfer(ctx, pieceCid.String())
if err != nil {
return fmt.Errorf("get piece transfer for %s: %w", pieceCid, err)
}

return m.gatewayMarketClient.SectorsUnsealPiece(
ctx,
deal.Proposal.Provider,
pieceCid,
deal.SectorNumber,
vSharedTypes.PaddedByteIndex(deal.Offset.Unpadded()),
deal.Proposal.PieceSize,
pieceTransfer,
)
pieceTransfer, err := wps.GetPieceTransfer(ctx, pieceCid.String())
if err != nil {
return fmt.Errorf("get piece transfer for %s: %w", pieceCid, err)
}

return nil
return m.gatewayMarketClient.SectorsUnsealPiece(
ctx,
deal.Proposal.Provider,
pieceCid,
deal.SectorNumber,
vSharedTypes.PaddedByteIndex(deal.Offset.Unpadded()),
deal.Proposal.PieceSize,
pieceTransfer,
)
})

if err != nil {
log.Warnf("failed to check/retrieve unsealed sector: %s", err)
continue // move on to the next match.
}

if isUnsealed {
return true, nil
}
return true, nil
}

// we don't have an unsealed sector containing the piece
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/filecoin-project/go-statestore v0.2.0
github.com/filecoin-project/specs-actors/v2 v2.3.6
github.com/filecoin-project/specs-actors/v7 v7.0.1
github.com/filecoin-project/venus v1.10.2-0.20230329075658-98ec86740c07
github.com/filecoin-project/venus v1.10.2-0.20230330090548-2e3f39feceb1
github.com/filecoin-project/venus-auth v1.10.2-0.20230308100319-913815325d5e
github.com/filecoin-project/venus-messager v1.10.2-0.20230309071456-7cd8d49c6e9a
github.com/golang/mock v1.6.0
Expand All @@ -40,7 +40,7 @@ require (
github.com/howeyc/gopass v0.0.0-20210920133722-c8aef6fb66ef
github.com/ipfs-force-community/metrics v1.0.1-0.20220824061112-ac916bacf2ea
github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7
github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230320070449-17b514ccd356
github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230331030234-365136f176ef
github.com/ipfs/go-blockservice v0.5.0
github.com/ipfs/go-cid v0.3.2
github.com/ipfs/go-cidutil v0.1.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -468,8 +468,8 @@ github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893
github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893/go.mod h1:S7590oDimBvXMUtzWsBXoshu9HtYKwtXl47zAK9rcP8=
github.com/filecoin-project/test-vectors/schema v0.0.5/go.mod h1:iQ9QXLpYWL3m7warwvK1JC/pTri8mnfEmKygNDqqY6E=
github.com/filecoin-project/venus v1.2.4/go.mod h1:hJULXHGAnWuq5S5KRtPkwbT8DqgM9II7NwyNU7t59D0=
github.com/filecoin-project/venus v1.10.2-0.20230329075658-98ec86740c07 h1:o7olPXy9+ah3PfzkIASK2qTtPoeKSXBjeb7szJZ55vc=
github.com/filecoin-project/venus v1.10.2-0.20230329075658-98ec86740c07/go.mod h1:eCV4+qsHdDg7FXB8xLn5w/ay+Uu5pG3oAlPsB1nb6qU=
github.com/filecoin-project/venus v1.10.2-0.20230330090548-2e3f39feceb1 h1:wOzhaoLA4AmfHa2ynT/KMf5GM53SSN41X8n0T+alVNw=
github.com/filecoin-project/venus v1.10.2-0.20230330090548-2e3f39feceb1/go.mod h1:eCV4+qsHdDg7FXB8xLn5w/ay+Uu5pG3oAlPsB1nb6qU=
github.com/filecoin-project/venus-auth v1.3.2/go.mod h1:m5Jog2GYxztwP7w3m/iJdv/V1/bTcAVU9rm/CbhxRQU=
github.com/filecoin-project/venus-auth v1.10.2-0.20230308100319-913815325d5e h1:Bxpt1AzPeNxmUnFT2Y8rpabr9x0wIC0Q87DeRmjL2co=
github.com/filecoin-project/venus-auth v1.10.2-0.20230308100319-913815325d5e/go.mod h1:aBfIfNxQkdcY8Rk5wrQn9qRtJpH4RTDdc10Ac+ferzs=
Expand Down Expand Up @@ -834,8 +834,8 @@ github.com/ipfs-force-community/metrics v1.0.1-0.20220824061112-ac916bacf2ea/go.
github.com/ipfs-force-community/venus-common-utils v0.0.0-20210924063144-1d3a5b30de87/go.mod h1:RTVEOzM+hkpqmcEWpyLDkx1oGO5r9ZWCgYxG/CsXzJQ=
github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7 h1:v/1/INcqm3kHLauWQYB63MwWJRWGz+3WEuUPp0jzIl8=
github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7/go.mod h1:sSTUXgIu95tPHvgcYhdLuELmgPJWCP/pNMFtsrVtOyA=
github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230320070449-17b514ccd356 h1:j+EdBUhTFZgQBoC+AQuucDlIGpdvRvjWZmtn3GIuMsU=
github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230320070449-17b514ccd356/go.mod h1:J2VCU6ANymkl8MjjpHj+y2Wai7EGM1Htd6ImSD5Entw=
github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230331030234-365136f176ef h1:hDvFmxoriRcJS6YJxm04ME1Hjuv3eedoUxnuX/cWLPA=
github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230331030234-365136f176ef/go.mod h1:s9bSNvoFlOWH99ofrSPMn9+vcSEQZG7Cq1hiaPFA11M=
github.com/ipfs/bbloom v0.0.1/go.mod h1:oqo8CVWsJFMOZqTglBG4wydCE4IQA/G2/SEofB0rjUI=
github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=
Expand Down
18 changes: 5 additions & 13 deletions piecestorage/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package piecestorage

import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -91,22 +90,15 @@ func (f *fsPieceStorage) GetRedirectUrl(_ context.Context, _ string) (string, er
return "", ErrUnsupportRedirect
}

func (f *fsPieceStorage) GetPieceTransfer(_ context.Context, pieceCid string) (*market.Transfer, error) {
func (f *fsPieceStorage) GetPieceTransfer(_ context.Context, pieceCid string) (string, error) {
if f.fsCfg.ReadOnly {
return nil, fmt.Errorf("%s id readonly piece store", f.fsCfg.Name)
return "", fmt.Errorf("%s id readonly piece store", f.fsCfg.Name)
}

dstPath := path.Join(f.baseUrl, pieceCid)
transfer := market.FsTransfer{Path: dstPath}
params, err := json.Marshal(&transfer)
if err != nil {
return nil, fmt.Errorf("construct piece transfer: %w", err)
}
// url example: http://market/resource?resource-id=xxx&store=xxx
url := fmt.Sprintf("/resource?resource-id=%s&store=%s", pieceCid, f.fsCfg.Name)

return &market.Transfer{
Type: market.PiecesTransferFs,
Params: params,
}, nil
return url, nil
}

func (f *fsPieceStorage) Has(_ context.Context, resourceId string) (bool, error) {
Expand Down
4 changes: 2 additions & 2 deletions piecestorage/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ func (m *MemPieceStore) GetRedirectUrl(_ context.Context, resourceId string) (st
return "", ErrUnsupportRedirect
}

func (m *MemPieceStore) GetPieceTransfer(context.Context, string) (*market.Transfer, error) {
return &market.Transfer{}, nil
func (m *MemPieceStore) GetPieceTransfer(context.Context, string) (string, error) {
return "", nil
}

func (m *MemPieceStore) Validate(s string) error {
Expand Down
33 changes: 14 additions & 19 deletions piecestorage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package piecestorage

import (
"context"
"encoding/json"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -206,31 +205,27 @@ func (s *s3PieceStorage) GetRedirectUrl(ctx context.Context, resourceId string)
return req.Presign(time.Hour * 24)
}

func (s *s3PieceStorage) GetPieceTransfer(_ context.Context, pieceCid string) (*market.Transfer, error) {
if s.s3Cfg.ReadOnly {
return nil, fmt.Errorf("%s id readonly piece store", s.s3Cfg.Name)
func (s *s3PieceStorage) GetPutObjectUrl(ctx context.Context, resourceId string) (string, error) {
params := &s3.PutObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(s.subdirWrapper(resourceId)),
}

transfer := market.S3Transfer{
EndPoint: s.s3Cfg.EndPoint,
Bucket: s.s3Cfg.Bucket,
SubDir: s.s3Cfg.SubDir,

AccessKey: s.s3Cfg.AccessKey,
SecretKey: s.s3Cfg.SecretKey,
Token: s.s3Cfg.Token,
req, _ := s.s3Client.PutObjectRequest(params)
return req.Presign(time.Hour * 24)
}

Key: pieceCid,
func (s *s3PieceStorage) GetPieceTransfer(ctx context.Context, pieceCid string) (string, error) {
if s.s3Cfg.ReadOnly {
return "", fmt.Errorf("%s is readonly piece store", s.s3Cfg.Name)
}
params, err := json.Marshal(&transfer)

url, err := s.GetPutObjectUrl(ctx, pieceCid)
if err != nil {
return nil, fmt.Errorf("construct piece transfer: %w", err)
return "", err
}

return &market.Transfer{
Type: market.PiecesTransferS3,
Params: params,
}, nil
return url, nil
}

func (s *s3PieceStorage) GetStorageStatus() (market.StorageStatus, error) {
Expand Down
2 changes: 1 addition & 1 deletion piecestorage/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,5 @@ type IPieceStorage interface {
Has(context.Context, string) (bool, error)
Validate(string) error
GetStorageStatus() (market.StorageStatus, error)
GetPieceTransfer(context.Context, string) (*market.Transfer, error)
GetPieceTransfer(context.Context, string) (string, error)
}
64 changes: 63 additions & 1 deletion rpc/piece_storage_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,19 @@ func NewPieceStorageServer(pieceStorageMgr *piecestorage.PieceStorageManager) *P
}

func (p *PieceStorageServer) ServeHTTP(res http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodGet {
switch req.Method {
case http.MethodGet:
p.handleGet(res, req)
case http.MethodPut:
p.handlePut(res, req)
default:
// handle error
logErrorAndResonse(res, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
return
}
}

func (p *PieceStorageServer) handleGet(res http.ResponseWriter, req *http.Request) {
resourceID := req.URL.Query().Get("resource-id")
if len(resourceID) == 0 {
logErrorAndResonse(res, "resource is empty", http.StatusBadRequest)
Expand Down Expand Up @@ -79,6 +87,60 @@ func (p *PieceStorageServer) ServeHTTP(res http.ResponseWriter, req *http.Reques
_, _ = io.Copy(res, r)
}

// handlePut save resource to piece storage
// url example: http://market/resource?resource-id=xxx&store=xxx or http://market/resource?resource-id=xxx&size=xxx
func (p *PieceStorageServer) handlePut(res http.ResponseWriter, req *http.Request) {
ctx := req.Context()
resourceID := req.URL.Query().Get("resource-id")
if len(resourceID) == 0 {
logErrorAndResonse(res, "resource is empty", http.StatusBadRequest)
return
}

if req.Body == nil {
logErrorAndResonse(res, "body is empty", http.StatusBadRequest)
return
}

if !req.URL.Query().Has("store") && !req.URL.Query().Has("size") {
logErrorAndResonse(res, "both store and size is empty", http.StatusBadRequest)
return
}

var store piecestorage.IPieceStorage
if req.URL.Query().Has("store") {
storeName := req.URL.Query().Get("store")

var err error
store, err = p.pieceStorageMgr.GetPieceStorageByName(storeName)
if err != nil {
logErrorAndResonse(res, fmt.Sprintf("fail to get store %s: %s", storeName, err), http.StatusInternalServerError)
return
}
}
if store == nil && req.URL.Query().Has("size") {
sizeStr := req.URL.Query().Get("size")
size, err := strconv.ParseInt(sizeStr, 10, 64)
if err != nil {
logErrorAndResonse(res, fmt.Sprintf("size %s is invalid", sizeStr), http.StatusBadRequest)
return
}
store, err = p.pieceStorageMgr.FindStorageForWrite(size)
if err != nil {
logErrorAndResonse(res, fmt.Sprintf("fail to find store for write: %s", err), http.StatusInternalServerError)
return
}
}

_, err := store.SaveTo(ctx, resourceID, req.Body)
simlecode marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
logErrorAndResonse(res, fmt.Sprintf("fail to save resource %s to store %s: %s", resourceID, store.GetName(), err), http.StatusInternalServerError)
return
}

res.WriteHeader(http.StatusOK)
}

func logErrorAndResonse(res http.ResponseWriter, err string, code int) {
resourceLog.Errorf("resource request fail Code: %d, Message: %s", code, err)
http.Error(res, err, code)
Expand Down