-
Notifications
You must be signed in to change notification settings - Fork 76
Piece Store #554
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Piece Store #554
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,295 @@ | ||
| package piecestore | ||
|
|
||
| import ( | ||
| "bufio" | ||
| "fmt" | ||
| "github.com/filecoin-project/go-state-types/abi" | ||
| "github.com/google/uuid" | ||
| "github.com/hashicorp/go-multierror" | ||
| "github.com/ipfs/go-cid" | ||
| bstore "github.com/ipfs/go-ipfs-blockstore" | ||
| "github.com/ipld/go-car/util" | ||
| "github.com/ipld/go-car/v2" | ||
| carv2 "github.com/ipld/go-car/v2" | ||
| "github.com/ipld/go-car/v2/blockstore" | ||
| carindex "github.com/ipld/go-car/v2/index" | ||
| mh "github.com/multiformats/go-multihash" | ||
| "io" | ||
| ) | ||
|
|
||
| type SectionReader interface { | ||
| io.Reader | ||
| io.ReaderAt | ||
| io.Seeker | ||
| } | ||
|
|
||
| type Sealer interface { | ||
| // GetReader returns a reader over a piece. If there is no unsealed copy, returns ErrSealed. | ||
| GetReader(id abi.SectorNumber, offset abi.PaddedPieceSize, length abi.PaddedPieceSize) (SectionReader, error) | ||
| } | ||
|
|
||
| // CarIndexes manages indexes of CAR files that map from multihash => offset | ||
| type CarIndexes interface { | ||
| Add(pieceCid cid.Cid, itidx carindex.IterableIndex) error | ||
| IsIndexed(pieceCid cid.Cid) bool | ||
| Index(pieceCid cid.Cid) (carindex.IterableIndex, error) | ||
| GetOffset(pieceCid cid.Cid, hash mh.Multihash) (uint64, error) | ||
| Delete(pieceCid cid.Cid) error | ||
| } | ||
|
|
||
| // MHToPieceIndex is used to get the pieces that contain a multihash. | ||
| // It has an index that maps from multihash => piece cids | ||
| type MHToPieceIndex interface { | ||
| Add(pieceCid cid.Cid, itidx carindex.IterableIndex) error | ||
| IsIndexed(pieceCid cid.Cid) bool | ||
| PiecesContaining(m mh.Multihash) ([]cid.Cid, error) | ||
| Delete(pieceCid cid.Cid) error | ||
| } | ||
|
|
||
| // DealStore keeps track of deals for each piece | ||
| type DealStore interface { | ||
| Add(pieceCid cid.Cid, dealInfo DealInfo) error | ||
| List(pieceCid cid.Cid) ([]DealInfo, error) | ||
| Delete(pieceCid cid.Cid, dealUuid uuid.UUID) (bool, error) | ||
| } | ||
|
|
||
| type PieceStore struct { | ||
| dealStore DealStore | ||
| carIndex CarIndexes | ||
| mhToPieceIndex MHToPieceIndex | ||
| sealer Sealer | ||
| } | ||
|
|
||
| // DealInfo is information about a single deal for a given piece | ||
| // PieceOffset | ||
| // v | ||
| // Sector [..........................] | ||
| // Piece ......[ ]...... | ||
| // CAR ......[ ]............ | ||
| type DealInfo struct { | ||
| DealUuid uuid.UUID | ||
| SectorID abi.SectorNumber | ||
| PieceOffset abi.PaddedPieceSize | ||
| PieceLength abi.PaddedPieceSize | ||
| // The size of the CAR file without zero-padding. | ||
| // This value may be zero if the size is unknown. | ||
| CarLength uint64 | ||
| } | ||
|
|
||
| // Get the list of deals (and the sector the data is in) for a particular piece | ||
| func (ps *PieceStore) GetPieceDeals(pieceCid cid.Cid) ([]DealInfo, error) { | ||
| deals, err := ps.dealStore.List(pieceCid) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("listing deals for piece %s: %w", pieceCid, err) | ||
| } | ||
|
|
||
| return deals, nil | ||
| } | ||
|
|
||
| func (ps *PieceStore) AddDealForPiece(pieceCid cid.Cid, dealInfo DealInfo) error { | ||
| // TODO: pass dealInfo to addIndexForPiece | ||
|
|
||
| // Perform indexing of piece | ||
| if err := ps.addIndexForPiece(pieceCid); err != nil { | ||
| return fmt.Errorf("adding index for piece %s: %w", pieceCid, err) | ||
| } | ||
|
|
||
| // Add deal to list of deals for this piece | ||
| if err := ps.dealStore.Add(pieceCid, dealInfo); err != nil { | ||
| return fmt.Errorf("saving deal %s to store: %w", dealInfo.DealUuid, err) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func (ps *PieceStore) addIndexForPiece(pieceCid cid.Cid) error { | ||
| // Check if the indexes have already been added | ||
| if ps.carIndex.IsIndexed(pieceCid) && ps.mhToPieceIndex.IsIndexed(pieceCid) { | ||
| return nil | ||
| } | ||
|
|
||
| // Get a reader over the piece data | ||
| reader, err := ps.GetPieceReader(pieceCid) | ||
| if err != nil { | ||
| return fmt.Errorf("getting piece reader for piece %s: %w", pieceCid, err) | ||
| } | ||
|
|
||
| // Get an index from the CAR file - works for both CARv1 and CARv2 | ||
| idx, err := car.ReadOrGenerateIndex(reader, car.ZeroLengthSectionAsEOF(true), car.StoreIdentityCIDs(true)) | ||
| if err != nil { | ||
| return fmt.Errorf("generating index for piece %s: %w", pieceCid, err) | ||
| } | ||
|
|
||
| itidx, ok := idx.(carindex.IterableIndex) | ||
| if !ok { | ||
| return fmt.Errorf("index is not iterable for piece %s", pieceCid) | ||
| } | ||
|
|
||
| if !ps.carIndex.IsIndexed(pieceCid) { | ||
| // Add mh => offset index to store | ||
| if err := ps.carIndex.Add(pieceCid, itidx); err != nil { | ||
| return fmt.Errorf("adding CAR index for piece %s: %w", pieceCid, err) | ||
| } | ||
| } | ||
|
|
||
| if !ps.mhToPieceIndex.IsIndexed(pieceCid) { | ||
| // Add mh => piece index to store | ||
| if err := ps.mhToPieceIndex.Add(pieceCid, itidx); err != nil { | ||
| return fmt.Errorf("adding cid index for piece %s: %w", pieceCid, err) | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func (ps *PieceStore) DeleteDealForPiece(pieceCid cid.Cid, dealUuid uuid.UUID) error { | ||
| // Delete deal from list of deals for this piece | ||
| wasLast, err := ps.dealStore.Delete(pieceCid, dealUuid) | ||
| if err != nil { | ||
| return fmt.Errorf("deleting deal %s from store: %w", dealUuid, err) | ||
| } | ||
|
|
||
| if !wasLast { | ||
| return nil | ||
| } | ||
|
|
||
| // Remove piece indexes | ||
| if err := ps.deleteIndexForPiece(pieceCid); err != nil { | ||
| return fmt.Errorf("deleting index for piece %s: %w", pieceCid, err) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func (ps *PieceStore) deleteIndexForPiece(pieceCid cid.Cid) interface{} { | ||
| // TODO: Maybe mark for GC instead of deleting immediately | ||
|
|
||
| // Delete mh => offset index from store | ||
| err := ps.carIndex.Delete(pieceCid) | ||
| if err != nil { | ||
| err = fmt.Errorf("deleting CAR index for piece %s: %w", pieceCid, err) | ||
| } | ||
|
|
||
| // Delete mh => piece index from store | ||
| if mherr := ps.mhToPieceIndex.Delete(pieceCid); mherr != nil { | ||
| err = multierror.Append(fmt.Errorf("deleting cid index for piece %s: %w", pieceCid, mherr)) | ||
| } | ||
| return err | ||
| } | ||
|
|
||
| // Used internally, and also by HTTP retrieval | ||
| func (ps *PieceStore) GetPieceReader(pieceCid cid.Cid) (SectionReader, error) { | ||
| // Get all deals containing this piece | ||
| deals, err := ps.GetPieceDeals(pieceCid) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("getting piece deals: %w", err) | ||
| } | ||
|
|
||
| if len(deals) == 0 { | ||
| return nil, fmt.Errorf("no deals found for piece cid %s: %w", pieceCid, err) | ||
| } | ||
|
|
||
| // For each deal, try to read an unsealed copy of the data from the sector | ||
| // it is stored in | ||
| var merr error | ||
| for i, dl := range deals { | ||
| reader, err := ps.sealer.GetReader(dl.SectorID, dl.PieceOffset, dl.PieceLength) | ||
| if err != nil { | ||
| // TODO: log error | ||
| if i < 3 { | ||
| merr = multierror.Append(merr, err) | ||
| } | ||
| continue | ||
| } | ||
|
|
||
| return reader, nil | ||
| } | ||
|
|
||
| return nil, merr | ||
| } | ||
|
|
||
| // Get all pieces that contain a multihash (used when retrieving by payload CID) | ||
| func (ps *PieceStore) PiecesContainingMultihash(m mh.Multihash) ([]cid.Cid, error) { | ||
| return ps.mhToPieceIndex.PiecesContaining(m) | ||
| } | ||
|
|
||
| func (ps *PieceStore) GetIterableIndex(pieceCid cid.Cid) (carindex.IterableIndex, error) { | ||
| return ps.carIndex.Index(pieceCid) | ||
| } | ||
|
|
||
| // Get a block (used by Bitswap retrieval) | ||
| func (ps *PieceStore) GetBlock(c cid.Cid) ([]byte, error) { | ||
| // TODO: use caching to make this efficient for repeated Gets against the same piece | ||
|
|
||
| // Get the pieces that contain the cid | ||
| pieces, err := ps.PiecesContainingMultihash(c.Hash()) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("getting pieces containing cid %s: %w", c, err) | ||
| } | ||
| if len(pieces) == 0 { | ||
| return nil, fmt.Errorf("no pieces with cid %s found", c) | ||
| } | ||
|
|
||
| // Get a reader over one of the pieces and extract the block data | ||
| var merr error | ||
| for i, pieceCid := range pieces { | ||
| data, err := func() ([]byte, error) { | ||
| // Get a reader over the piece data | ||
| reader, err := ps.GetPieceReader(pieceCid) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("getting piece reader: %w", err) | ||
| } | ||
|
|
||
| // Get the offset of the block within the piece (CAR file) | ||
| offset, err := ps.carIndex.GetOffset(pieceCid, c.Hash()) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("getting offset for cid %s in piece %s: %w", c, pieceCid, err) | ||
| } | ||
|
|
||
| // Seek to the block offset | ||
| _, err = reader.Seek(int64(offset), io.SeekStart) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("seeking to offset %d in piece reader: %w", int64(offset), err) | ||
| } | ||
|
|
||
| // Read the block data | ||
| _, data, err := util.ReadNode(bufio.NewReader(reader)) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("reading data for block %s from reader for piece %s: %w", c, pieceCid, err) | ||
| } | ||
| return data, nil | ||
| }() | ||
| if err != nil { | ||
| if i < 3 { | ||
| merr = multierror.Append(merr, err) | ||
| } | ||
| continue | ||
| } | ||
| return data, nil | ||
| } | ||
|
|
||
| return nil, merr | ||
| } | ||
|
|
||
| // Get a blockstore over a piece (used by Graphsync retrieval) | ||
| func (ps *PieceStore) GetBlockstore(pieceCid cid.Cid) (bstore.Blockstore, error) { | ||
| // Get a reader over the piece | ||
| reader, err := ps.GetPieceReader(pieceCid) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("getting piece reader for piece %s: %w", pieceCid, err) | ||
| } | ||
|
|
||
| // Get an index for the piece | ||
| idx, err := ps.GetIterableIndex(pieceCid) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("getting index for piece %s: %w", pieceCid, err) | ||
| } | ||
|
|
||
| // Create a blockstore from the index and the piece reader | ||
| bs, err := blockstore.NewReadOnly(reader, idx, carv2.ZeroLengthSectionAsEOF(true)) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("creating blockstore for piece %s: %w", pieceCid, err) | ||
| } | ||
|
|
||
| return bs, nil | ||
| } | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.