diff --git a/dagstore/market_api.go b/dagstore/market_api.go index 91cacd84..fa2e4106 100644 --- a/dagstore/market_api.go +++ b/dagstore/market_api.go @@ -85,52 +85,36 @@ func (m *marketAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, err for _, deal := range deals { deal := deal - var isUnsealed bool // Throttle this path to avoid flooding the storage subsystem. err := m.throttle.Do(ctx, func(ctx context.Context) (err error) { - // todo ProofType can not be passed, SP processes itself? - isUnsealed, err = m.gatewayMarketClient.IsUnsealed(ctx, deal.Proposal.Provider, pieceCid, - deal.SectorNumber, - vSharedTypes.PaddedByteIndex(deal.Offset.Unpadded()), - deal.Proposal.PieceSize) + + // send SectorsUnsealPiece task + wps, err := m.pieceStorageMgr.FindStorageForWrite(int64(deal.Proposal.PieceSize)) if err != nil { - return fmt.Errorf("failed to check if sector %d for deal %d was unsealed: %w", deal.SectorNumber, deal.DealID, err) + return fmt.Errorf("failed to find storage to write %s: %w", pieceCid, err) } - if isUnsealed { - // send SectorsUnsealPiece task - wps, err := m.pieceStorageMgr.FindStorageForWrite(int64(deal.Proposal.PieceSize)) - if err != nil { - return fmt.Errorf("failed to find storage to write %s: %w", pieceCid, err) - } - - pieceTransfer, err := wps.GetPieceTransfer(ctx, pieceCid.String()) - if err != nil { - return fmt.Errorf("get piece transfer for %s: %w", pieceCid, err) - } - - return m.gatewayMarketClient.SectorsUnsealPiece( - ctx, - deal.Proposal.Provider, - pieceCid, - deal.SectorNumber, - vSharedTypes.PaddedByteIndex(deal.Offset.Unpadded()), - deal.Proposal.PieceSize, - pieceTransfer, - ) + pieceTransfer, err := wps.GetPieceTransfer(ctx, pieceCid.String()) + if err != nil { + return fmt.Errorf("get piece transfer for %s: %w", pieceCid, err) } - return nil + return m.gatewayMarketClient.SectorsUnsealPiece( + ctx, + deal.Proposal.Provider, + pieceCid, + deal.SectorNumber, + vSharedTypes.PaddedByteIndex(deal.Offset.Unpadded()), + deal.Proposal.PieceSize, + pieceTransfer, + ) }) if err != nil { log.Warnf("failed to check/retrieve unsealed sector: %s", err) continue // move on to the next match. } - - if isUnsealed { - return true, nil - } + return true, nil } // we don't have an unsealed sector containing the piece diff --git a/go.mod b/go.mod index 5af0c9cc..3d6755b4 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/filecoin-project/go-statestore v0.2.0 github.com/filecoin-project/specs-actors/v2 v2.3.6 github.com/filecoin-project/specs-actors/v7 v7.0.1 - github.com/filecoin-project/venus v1.10.2-0.20230329075658-98ec86740c07 + github.com/filecoin-project/venus v1.10.2-0.20230330090548-2e3f39feceb1 github.com/filecoin-project/venus-auth v1.10.2-0.20230308100319-913815325d5e github.com/filecoin-project/venus-messager v1.10.2-0.20230309071456-7cd8d49c6e9a github.com/golang/mock v1.6.0 @@ -40,7 +40,7 @@ require ( github.com/howeyc/gopass v0.0.0-20210920133722-c8aef6fb66ef github.com/ipfs-force-community/metrics v1.0.1-0.20220824061112-ac916bacf2ea github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7 - github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230320070449-17b514ccd356 + github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230331030234-365136f176ef github.com/ipfs/go-blockservice v0.5.0 github.com/ipfs/go-cid v0.3.2 github.com/ipfs/go-cidutil v0.1.0 diff --git a/go.sum b/go.sum index 2a791564..f07969d1 100644 --- a/go.sum +++ b/go.sum @@ -468,8 +468,8 @@ github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893 github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893/go.mod h1:S7590oDimBvXMUtzWsBXoshu9HtYKwtXl47zAK9rcP8= github.com/filecoin-project/test-vectors/schema v0.0.5/go.mod h1:iQ9QXLpYWL3m7warwvK1JC/pTri8mnfEmKygNDqqY6E= github.com/filecoin-project/venus v1.2.4/go.mod h1:hJULXHGAnWuq5S5KRtPkwbT8DqgM9II7NwyNU7t59D0= -github.com/filecoin-project/venus v1.10.2-0.20230329075658-98ec86740c07 h1:o7olPXy9+ah3PfzkIASK2qTtPoeKSXBjeb7szJZ55vc= -github.com/filecoin-project/venus v1.10.2-0.20230329075658-98ec86740c07/go.mod h1:eCV4+qsHdDg7FXB8xLn5w/ay+Uu5pG3oAlPsB1nb6qU= +github.com/filecoin-project/venus v1.10.2-0.20230330090548-2e3f39feceb1 h1:wOzhaoLA4AmfHa2ynT/KMf5GM53SSN41X8n0T+alVNw= +github.com/filecoin-project/venus v1.10.2-0.20230330090548-2e3f39feceb1/go.mod h1:eCV4+qsHdDg7FXB8xLn5w/ay+Uu5pG3oAlPsB1nb6qU= github.com/filecoin-project/venus-auth v1.3.2/go.mod h1:m5Jog2GYxztwP7w3m/iJdv/V1/bTcAVU9rm/CbhxRQU= github.com/filecoin-project/venus-auth v1.10.2-0.20230308100319-913815325d5e h1:Bxpt1AzPeNxmUnFT2Y8rpabr9x0wIC0Q87DeRmjL2co= github.com/filecoin-project/venus-auth v1.10.2-0.20230308100319-913815325d5e/go.mod h1:aBfIfNxQkdcY8Rk5wrQn9qRtJpH4RTDdc10Ac+ferzs= @@ -834,8 +834,8 @@ github.com/ipfs-force-community/metrics v1.0.1-0.20220824061112-ac916bacf2ea/go. github.com/ipfs-force-community/venus-common-utils v0.0.0-20210924063144-1d3a5b30de87/go.mod h1:RTVEOzM+hkpqmcEWpyLDkx1oGO5r9ZWCgYxG/CsXzJQ= github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7 h1:v/1/INcqm3kHLauWQYB63MwWJRWGz+3WEuUPp0jzIl8= github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7/go.mod h1:sSTUXgIu95tPHvgcYhdLuELmgPJWCP/pNMFtsrVtOyA= -github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230320070449-17b514ccd356 h1:j+EdBUhTFZgQBoC+AQuucDlIGpdvRvjWZmtn3GIuMsU= -github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230320070449-17b514ccd356/go.mod h1:J2VCU6ANymkl8MjjpHj+y2Wai7EGM1Htd6ImSD5Entw= +github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230331030234-365136f176ef h1:hDvFmxoriRcJS6YJxm04ME1Hjuv3eedoUxnuX/cWLPA= +github.com/ipfs-force-community/venus-gateway v1.10.2-0.20230331030234-365136f176ef/go.mod h1:s9bSNvoFlOWH99ofrSPMn9+vcSEQZG7Cq1hiaPFA11M= github.com/ipfs/bbloom v0.0.1/go.mod h1:oqo8CVWsJFMOZqTglBG4wydCE4IQA/G2/SEofB0rjUI= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= diff --git a/piecestorage/filestore.go b/piecestorage/filestore.go index acc49f75..cff96862 100644 --- a/piecestorage/filestore.go +++ b/piecestorage/filestore.go @@ -2,7 +2,6 @@ package piecestorage import ( "context" - "encoding/json" "fmt" "io" "io/ioutil" @@ -91,22 +90,15 @@ func (f *fsPieceStorage) GetRedirectUrl(_ context.Context, _ string) (string, er return "", ErrUnsupportRedirect } -func (f *fsPieceStorage) GetPieceTransfer(_ context.Context, pieceCid string) (*market.Transfer, error) { +func (f *fsPieceStorage) GetPieceTransfer(_ context.Context, pieceCid string) (string, error) { if f.fsCfg.ReadOnly { - return nil, fmt.Errorf("%s id readonly piece store", f.fsCfg.Name) + return "", fmt.Errorf("%s id readonly piece store", f.fsCfg.Name) } - dstPath := path.Join(f.baseUrl, pieceCid) - transfer := market.FsTransfer{Path: dstPath} - params, err := json.Marshal(&transfer) - if err != nil { - return nil, fmt.Errorf("construct piece transfer: %w", err) - } + // url example: http://market/resource?resource-id=xxx&store=xxx + url := fmt.Sprintf("/resource?resource-id=%s&store=%s", pieceCid, f.fsCfg.Name) - return &market.Transfer{ - Type: market.PiecesTransferFs, - Params: params, - }, nil + return url, nil } func (f *fsPieceStorage) Has(_ context.Context, resourceId string) (bool, error) { diff --git a/piecestorage/memstore.go b/piecestorage/memstore.go index db022735..b4ae6c01 100644 --- a/piecestorage/memstore.go +++ b/piecestorage/memstore.go @@ -109,8 +109,8 @@ func (m *MemPieceStore) GetRedirectUrl(_ context.Context, resourceId string) (st return "", ErrUnsupportRedirect } -func (m *MemPieceStore) GetPieceTransfer(context.Context, string) (*market.Transfer, error) { - return &market.Transfer{}, nil +func (m *MemPieceStore) GetPieceTransfer(context.Context, string) (string, error) { + return "", nil } func (m *MemPieceStore) Validate(s string) error { diff --git a/piecestorage/s3.go b/piecestorage/s3.go index 9f7cdd82..45feb31e 100644 --- a/piecestorage/s3.go +++ b/piecestorage/s3.go @@ -2,7 +2,6 @@ package piecestorage import ( "context" - "encoding/json" "fmt" "io" "math" @@ -206,31 +205,27 @@ func (s *s3PieceStorage) GetRedirectUrl(ctx context.Context, resourceId string) return req.Presign(time.Hour * 24) } -func (s *s3PieceStorage) GetPieceTransfer(_ context.Context, pieceCid string) (*market.Transfer, error) { - if s.s3Cfg.ReadOnly { - return nil, fmt.Errorf("%s id readonly piece store", s.s3Cfg.Name) +func (s *s3PieceStorage) GetPutObjectUrl(ctx context.Context, resourceId string) (string, error) { + params := &s3.PutObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(s.subdirWrapper(resourceId)), } - transfer := market.S3Transfer{ - EndPoint: s.s3Cfg.EndPoint, - Bucket: s.s3Cfg.Bucket, - SubDir: s.s3Cfg.SubDir, - - AccessKey: s.s3Cfg.AccessKey, - SecretKey: s.s3Cfg.SecretKey, - Token: s.s3Cfg.Token, + req, _ := s.s3Client.PutObjectRequest(params) + return req.Presign(time.Hour * 24) +} - Key: pieceCid, +func (s *s3PieceStorage) GetPieceTransfer(ctx context.Context, pieceCid string) (string, error) { + if s.s3Cfg.ReadOnly { + return "", fmt.Errorf("%s is readonly piece store", s.s3Cfg.Name) } - params, err := json.Marshal(&transfer) + + url, err := s.GetPutObjectUrl(ctx, pieceCid) if err != nil { - return nil, fmt.Errorf("construct piece transfer: %w", err) + return "", err } - return &market.Transfer{ - Type: market.PiecesTransferS3, - Params: params, - }, nil + return url, nil } func (s *s3PieceStorage) GetStorageStatus() (market.StorageStatus, error) { diff --git a/piecestorage/type.go b/piecestorage/type.go index e95bfba4..10551865 100644 --- a/piecestorage/type.go +++ b/piecestorage/type.go @@ -38,5 +38,5 @@ type IPieceStorage interface { Has(context.Context, string) (bool, error) Validate(string) error GetStorageStatus() (market.StorageStatus, error) - GetPieceTransfer(context.Context, string) (*market.Transfer, error) + GetPieceTransfer(context.Context, string) (string, error) } diff --git a/rpc/piece_storage_server.go b/rpc/piece_storage_server.go index f2a8889f..6a9eb32c 100644 --- a/rpc/piece_storage_server.go +++ b/rpc/piece_storage_server.go @@ -23,11 +23,19 @@ func NewPieceStorageServer(pieceStorageMgr *piecestorage.PieceStorageManager) *P } func (p *PieceStorageServer) ServeHTTP(res http.ResponseWriter, req *http.Request) { - if req.Method != http.MethodGet { + switch req.Method { + case http.MethodGet: + p.handleGet(res, req) + case http.MethodPut: + p.handlePut(res, req) + default: + // handle error logErrorAndResonse(res, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) return } +} +func (p *PieceStorageServer) handleGet(res http.ResponseWriter, req *http.Request) { resourceID := req.URL.Query().Get("resource-id") if len(resourceID) == 0 { logErrorAndResonse(res, "resource is empty", http.StatusBadRequest) @@ -79,6 +87,60 @@ func (p *PieceStorageServer) ServeHTTP(res http.ResponseWriter, req *http.Reques _, _ = io.Copy(res, r) } +// handlePut save resource to piece storage +// url example: http://market/resource?resource-id=xxx&store=xxx or http://market/resource?resource-id=xxx&size=xxx +func (p *PieceStorageServer) handlePut(res http.ResponseWriter, req *http.Request) { + ctx := req.Context() + resourceID := req.URL.Query().Get("resource-id") + if len(resourceID) == 0 { + logErrorAndResonse(res, "resource is empty", http.StatusBadRequest) + return + } + + if req.Body == nil { + logErrorAndResonse(res, "body is empty", http.StatusBadRequest) + return + } + + if !req.URL.Query().Has("store") && !req.URL.Query().Has("size") { + logErrorAndResonse(res, "both store and size is empty", http.StatusBadRequest) + return + } + + var store piecestorage.IPieceStorage + if req.URL.Query().Has("store") { + storeName := req.URL.Query().Get("store") + + var err error + store, err = p.pieceStorageMgr.GetPieceStorageByName(storeName) + if err != nil { + logErrorAndResonse(res, fmt.Sprintf("fail to get store %s: %s", storeName, err), http.StatusInternalServerError) + return + } + } + if store == nil && req.URL.Query().Has("size") { + sizeStr := req.URL.Query().Get("size") + size, err := strconv.ParseInt(sizeStr, 10, 64) + if err != nil { + logErrorAndResonse(res, fmt.Sprintf("size %s is invalid", sizeStr), http.StatusBadRequest) + return + } + store, err = p.pieceStorageMgr.FindStorageForWrite(size) + if err != nil { + logErrorAndResonse(res, fmt.Sprintf("fail to find store for write: %s", err), http.StatusInternalServerError) + return + } + } + + _, err := store.SaveTo(ctx, resourceID, req.Body) + if err != nil { + logErrorAndResonse(res, fmt.Sprintf("fail to save resource %s to store %s: %s", resourceID, store.GetName(), err), http.StatusInternalServerError) + return + } + + res.WriteHeader(http.StatusOK) +} + func logErrorAndResonse(res http.ResponseWriter, err string, code int) { resourceLog.Errorf("resource request fail Code: %d, Message: %s", code, err) http.Error(res, err, code)