Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 56 additions & 35 deletions piecedirectory/piecedirectory.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ func (ps *PieceDirectory) addIndexForPiece(ctx context.Context, pieceCid cid.Cid
return nil
}

// BuildIndexForPiece builds indexes for a given piece CID. The piece must contain a valid deal
// corresponding to an unsealed sector for this method to work. It will try to build index
// using all available deals and will exit as soon as it succeeds for one of the deals
func (ps *PieceDirectory) BuildIndexForPiece(ctx context.Context, pieceCid cid.Cid) error {
ctx, span := tracing.Tracer.Start(ctx, "pm.build_index_for_piece")
defer span.End()
Expand All @@ -251,12 +254,18 @@ func (ps *PieceDirectory) BuildIndexForPiece(ctx context.Context, pieceCid cid.C
return fmt.Errorf("getting piece deals: no deals found for piece")
}

err = ps.addIndexForPieceThrottled(ctx, pieceCid, dls[0])
if err != nil {
return fmt.Errorf("adding index for piece deal %d: %w", dls[0].ChainDealID, err)
var merr error

// Iterate over all available deals in case first deal does not have an unsealed sector
for _, dl := range dls {
err = ps.addIndexForPieceThrottled(ctx, pieceCid, dl)
if err == nil {
return nil
}
merr = multierror.Append(merr, fmt.Errorf("adding index for piece deal %d: %w", dl.ChainDealID, err))
}

return nil
return merr
}

func (ps *PieceDirectory) RemoveDealForPiece(ctx context.Context, pieceCid cid.Cid, dealUuid string) error {
Expand Down Expand Up @@ -427,44 +436,56 @@ func (ps *PieceDirectory) BlockstoreGetSize(ctx context.Context, c cid.Cid) (int
return 0, format.ErrNotFound{Cid: c}
}

// Get the size of the block from the first piece (should be the same for
// any piece)
offsetSize, err := ps.GetOffsetSize(ctx, pieces[0], c.Hash())
if err != nil {
return 0, fmt.Errorf("getting size of cid %s in piece %s: %w", c, pieces[0], err)
}
var merr error

if offsetSize.Size > 0 {
return int(offsetSize.Size), nil
}
// Iterate over all pieces in case the sector containing the first piece with the Block
// is not unsealed
for _, p := range pieces {
// Get the size of the block from the piece (should be the same for
// all pieces)
offsetSize, err := ps.GetOffsetSize(ctx, p, c.Hash())
if err != nil {
merr = multierror.Append(merr, fmt.Errorf("getting size of cid %s in piece %s: %w", c, p, err))
continue
}

// Indexes imported from the DAG store do not have block size information
// (they only have offset information). Check if the block size is zero
// because the index is incomplete.
isComplete, err := ps.store.IsCompleteIndex(ctx, pieces[0])
if err != nil {
return 0, fmt.Errorf("getting index complete status for piece %s: %w", pieces[0], err)
}
if offsetSize.Size > 0 {
return int(offsetSize.Size), nil
}

if isComplete {
// The deal index is complete, so it must be a zero-sized block.
// A zero-sized block is unusual, but possible.
return int(offsetSize.Size), nil
}
// Indexes imported from the DAG store do not have block size information
// (they only have offset information). Check if the block size is zero
// because the index is incomplete.
isComplete, err := ps.store.IsCompleteIndex(ctx, p)
if err != nil {
merr = multierror.Append(merr, fmt.Errorf("getting index complete status for piece %s: %w", p, err))
continue
}

// The index is incomplete, so re-build the index on the fly
err = ps.BuildIndexForPiece(ctx, pieces[0])
if err != nil {
return 0, fmt.Errorf("re-building index for piece %s: %w", pieces[0], err)
}
if isComplete {
// The deal index is complete, so it must be a zero-sized block.
// A zero-sized block is unusual, but possible.
return int(offsetSize.Size), nil
}

// Now get the size again
offsetSize, err = ps.GetOffsetSize(ctx, pieces[0], c.Hash())
if err != nil {
return 0, fmt.Errorf("getting size of cid %s in piece %s: %w", c, pieces[0], err)
// The index is incomplete, so re-build the index on the fly
err = ps.BuildIndexForPiece(ctx, p)
if err != nil {
merr = multierror.Append(merr, fmt.Errorf("re-building index for piece %s: %w", p, err))
continue
}

// Now get the size again
offsetSize, err = ps.GetOffsetSize(ctx, p, c.Hash())
if err != nil {
merr = multierror.Append(merr, fmt.Errorf("getting size of cid %s in piece %s: %w", c, p, err))
continue
}

return int(offsetSize.Size), nil
}

return int(offsetSize.Size), nil
return 0, merr
}

func (ps *PieceDirectory) BlockstoreHas(ctx context.Context, c cid.Cid) (bool, error) {
Expand Down
83 changes: 83 additions & 0 deletions piecedirectory/piecedirectory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import (
"testing"
"time"

pdTypes "github.com/filecoin-project/boost/piecedirectory/types"
mock_piecedirectory "github.com/filecoin-project/boost/piecedirectory/types/mocks"
"github.com/filecoin-project/boostd-data/client"
"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/boostd-data/svc"
"github.com/filecoin-project/boostd-data/svc/types"
"github.com/filecoin-project/go-state-types/abi"
"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -66,6 +68,10 @@ func testPieceDirectory(ctx context.Context, t *testing.T, bdsvc *svc.Service) {
t.Run("flagging pieces", func(t *testing.T) {
testFlaggingPieces(ctx, t, cl)
})

t.Run("reIndexing pieces from multiple sectors", func(t *testing.T) {
testReIndexMultiSector(ctx, t, cl)
})
}

func testPieceDirectoryNotFound(ctx context.Context, t *testing.T, cl *client.Store) {
Expand Down Expand Up @@ -329,3 +335,80 @@ func testFlaggingPieces(ctx context.Context, t *testing.T, cl *client.Store) {
require.NoError(t, err)
require.Equal(t, 0, len(pcids))
}

// Verify that BuildIndexForPiece iterates over all deals return error if none of the deals (sectors)
// can be used to read the piece. We are testing 2 conditions here:
// 1. No eligible piece is found for both deals - error is expected
// 2. 1 eligible piece is found - no error is expected
func testReIndexMultiSector(ctx context.Context, t *testing.T, cl *client.Store) {
ctrl := gomock.NewController(t)
pr := mock_piecedirectory.NewMockPieceReader(ctrl)
pm := NewPieceDirectory(cl, pr, 1)
pm.Start(ctx)

// Create a random CAR file
carFilePath := CreateCarFile(t)
carFile, err := os.Open(carFilePath)
require.NoError(t, err)
defer carFile.Close()

carReader, err := car.OpenReader(carFilePath)
require.NoError(t, err)
defer carReader.Close()
carv1Reader, err := carReader.DataReader()
require.NoError(t, err)

// Return error first 3 time as during the first attempt we want to surface errors from
// failed BuildIndexForPiece operation for both deals. 3rd time to return error for first deal
// in the second run where we want the method to succeed eventually.
pr.EXPECT().GetReader(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("piece error")).Times(3)
pr.EXPECT().GetReader(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
func(_ context.Context, _ abi.SectorNumber, _ abi.PaddedPieceSize, _ abi.PaddedPieceSize) (pdTypes.SectionReader, error) {
_, err := carv1Reader.Seek(0, io.SeekStart)
return MockSectionReader{carv1Reader}, err
})

pieceCid := CalculateCommp(t, carv1Reader).PieceCID

// Add deal info for the piece - it doesn't matter what it is, the piece
// just needs to have 2 deals. One with no available pieceReader (simulating no unsealed sector)
// and other one with correct pieceReader
d1 := model.DealInfo{
DealUuid: uuid.New().String(),
ChainDealID: 1,
SectorID: 2,
PieceOffset: 0,
PieceLength: 0,
}

d2 := model.DealInfo{
DealUuid: uuid.New().String(),
ChainDealID: 2,
SectorID: 3,
PieceOffset: 0,
PieceLength: 0,
}

err = cl.AddDealForPiece(ctx, pieceCid, d1)
require.NoError(t, err)

err = cl.AddDealForPiece(ctx, pieceCid, d2)
require.NoError(t, err)

b, err := cl.IsIndexed(ctx, pieceCid)
require.NoError(t, err)
require.False(t, b)

// Expect error as GetReader() mock will return error for both deals
err = pm.BuildIndexForPiece(ctx, pieceCid)
require.ErrorContains(t, err, "piece error")

// No error is expected as GetReader() mock will return error for first deal
// but correct reader for the second deal
err = pm.BuildIndexForPiece(ctx, pieceCid)
require.NoError(t, err)

b, err = cl.IsIndexed(ctx, pieceCid)
require.NoError(t, err)
require.True(t, b)
}