Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: iterate all deals to index piece #1549

Merged
merged 3 commits into from
Jul 3, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 50 additions & 35 deletions piecedirectory/piecedirectory.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,12 +251,17 @@ func (ps *PieceDirectory) BuildIndexForPiece(ctx context.Context, pieceCid cid.C
return fmt.Errorf("getting piece deals: no deals found for piece")
}

err = ps.addIndexForPieceThrottled(ctx, pieceCid, dls[0])
if err != nil {
return fmt.Errorf("adding index for piece deal %d: %w", dls[0].ChainDealID, err)
var merr error

for _, dl := range dls {
LexLuthr marked this conversation as resolved.
Show resolved Hide resolved
err = ps.addIndexForPieceThrottled(ctx, pieceCid, dl)
if err == nil {
return nil
}
merr = multierror.Append(merr, fmt.Errorf("adding index for piece deal %d: %w", dl.ChainDealID, err))
}

return nil
return merr
}

func (ps *PieceDirectory) RemoveDealForPiece(ctx context.Context, pieceCid cid.Cid, dealUuid string) error {
Expand Down Expand Up @@ -427,44 +432,54 @@ func (ps *PieceDirectory) BlockstoreGetSize(ctx context.Context, c cid.Cid) (int
return 0, format.ErrNotFound{Cid: c}
}

// Get the size of the block from the first piece (should be the same for
// any piece)
offsetSize, err := ps.GetOffsetSize(ctx, pieces[0], c.Hash())
if err != nil {
return 0, fmt.Errorf("getting size of cid %s in piece %s: %w", c, pieces[0], err)
}
var merr error

if offsetSize.Size > 0 {
return int(offsetSize.Size), nil
}
for _, p := range pieces {
LexLuthr marked this conversation as resolved.
Show resolved Hide resolved
// Get the size of the block from the first piece (should be the same for
// any piece)
LexLuthr marked this conversation as resolved.
Show resolved Hide resolved
offsetSize, err := ps.GetOffsetSize(ctx, p, c.Hash())
if err != nil {
merr = multierror.Append(merr, fmt.Errorf("getting size of cid %s in piece %s: %w", c, p, err))
continue
}

// Indexes imported from the DAG store do not have block size information
// (they only have offset information). Check if the block size is zero
// because the index is incomplete.
isComplete, err := ps.store.IsCompleteIndex(ctx, pieces[0])
if err != nil {
return 0, fmt.Errorf("getting index complete status for piece %s: %w", pieces[0], err)
}
if offsetSize.Size > 0 {
return int(offsetSize.Size), nil
}

if isComplete {
// The deal index is complete, so it must be a zero-sized block.
// A zero-sized block is unusual, but possible.
return int(offsetSize.Size), nil
}
// Indexes imported from the DAG store do not have block size information
// (they only have offset information). Check if the block size is zero
// because the index is incomplete.
isComplete, err := ps.store.IsCompleteIndex(ctx, p)
if err != nil {
merr = multierror.Append(merr, fmt.Errorf("getting index complete status for piece %s: %w", p, err))
continue
}

// The index is incomplete, so re-build the index on the fly
err = ps.BuildIndexForPiece(ctx, pieces[0])
if err != nil {
return 0, fmt.Errorf("re-building index for piece %s: %w", pieces[0], err)
}
if isComplete {
// The deal index is complete, so it must be a zero-sized block.
// A zero-sized block is unusual, but possible.
return int(offsetSize.Size), nil
}

// Now get the size again
offsetSize, err = ps.GetOffsetSize(ctx, pieces[0], c.Hash())
if err != nil {
return 0, fmt.Errorf("getting size of cid %s in piece %s: %w", c, pieces[0], err)
// The index is incomplete, so re-build the index on the fly
err = ps.BuildIndexForPiece(ctx, p)
if err != nil {
merr = multierror.Append(merr, fmt.Errorf("re-building index for piece %s: %w", p, err))
continue
}

// Now get the size again
offsetSize, err = ps.GetOffsetSize(ctx, p, c.Hash())
if err != nil {
merr = multierror.Append(merr, fmt.Errorf("getting size of cid %s in piece %s: %w", c, p, err))
continue
}

return int(offsetSize.Size), nil
}

return int(offsetSize.Size), nil
return 0, merr
}

func (ps *PieceDirectory) BlockstoreHas(ctx context.Context, c cid.Cid) (bool, error) {
Expand Down
64 changes: 64 additions & 0 deletions piecedirectory/piecedirectory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import (
"testing"
"time"

pdTypes "github.com/filecoin-project/boost/piecedirectory/types"
mock_piecedirectory "github.com/filecoin-project/boost/piecedirectory/types/mocks"
"github.com/filecoin-project/boostd-data/client"
"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/boostd-data/svc"
"github.com/filecoin-project/boostd-data/svc/types"
"github.com/filecoin-project/go-state-types/abi"
"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -66,6 +68,10 @@ func testPieceDirectory(ctx context.Context, t *testing.T, bdsvc *svc.Service) {
t.Run("flagging pieces", func(t *testing.T) {
testFlaggingPieces(ctx, t, cl)
})

t.Run("reIndexing pieces from multiple sectors", func(t *testing.T) {
testReIndexMultiSector(ctx, t, cl)
})
}

func testPieceDirectoryNotFound(ctx context.Context, t *testing.T, cl *client.Store) {
Expand Down Expand Up @@ -329,3 +335,61 @@ func testFlaggingPieces(ctx context.Context, t *testing.T, cl *client.Store) {
require.NoError(t, err)
require.Equal(t, 0, len(pcids))
}

func testReIndexMultiSector(ctx context.Context, t *testing.T, cl *client.Store) {
ctrl := gomock.NewController(t)
pr := mock_piecedirectory.NewMockPieceReader(ctrl)
pm := NewPieceDirectory(cl, pr, 1)
pm.Start(ctx)

// Create a random CAR file
carFilePath := CreateCarFile(t)
carFile, err := os.Open(carFilePath)
require.NoError(t, err)
defer carFile.Close()

carReader, err := car.OpenReader(carFilePath)
require.NoError(t, err)
defer carReader.Close()
carv1Reader, err := carReader.DataReader()
require.NoError(t, err)

pr.EXPECT().GetReader(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("piece error")).Times(3)
pr.EXPECT().GetReader(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
func(_ context.Context, _ abi.SectorNumber, _ abi.PaddedPieceSize, _ abi.PaddedPieceSize) (pdTypes.SectionReader, error) {
_, err := carv1Reader.Seek(0, io.SeekStart)
return MockSectionReader{carv1Reader}, err
})

pieceCid := CalculateCommp(t, carv1Reader).PieceCID
LexLuthr marked this conversation as resolved.
Show resolved Hide resolved

// Add deal info for the piece - it doesn't matter what it is, the piece
// just needs to have at least one deal associated with it
LexLuthr marked this conversation as resolved.
Show resolved Hide resolved
d1 := model.DealInfo{
DealUuid: uuid.New().String(),
ChainDealID: 1,
SectorID: 2,
PieceOffset: 0,
PieceLength: 0,
}

d2 := model.DealInfo{
DealUuid: uuid.New().String(),
ChainDealID: 2,
SectorID: 3,
PieceOffset: 0,
PieceLength: 0,
}

err = cl.AddDealForPiece(ctx, pieceCid, d1)
require.NoError(t, err)

err = cl.AddDealForPiece(ctx, pieceCid, d2)
require.NoError(t, err)

err = pm.BuildIndexForPiece(ctx, pieceCid)
require.ErrorContains(t, err, "piece error")

err = pm.BuildIndexForPiece(ctx, pieceCid)
require.NoError(t, err)
LexLuthr marked this conversation as resolved.
Show resolved Hide resolved
}