Merged
5 changes: 5 additions & 0 deletions extern/boostd-data/client/client.go
@@ -40,6 +40,7 @@ type Store struct {
UnflagPiece func(ctx context.Context, pieceCid cid.Cid, maddr address.Address) error
FlaggedPiecesList func(ctx context.Context, filter *types.FlaggedPiecesListFilter, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error)
FlaggedPiecesCount func(ctx context.Context, filter *types.FlaggedPiecesListFilter) (int, error)
UntrackPiece func(ctx context.Context, pieceCid cid.Cid, maddr address.Address) error
}
closer jsonrpc.ClientCloser
dialOpts []jsonrpc.Option
@@ -197,3 +198,7 @@ func (s *Store) FlaggedPiecesList(ctx context.Context, filter *types.FlaggedPiec
func (s *Store) FlaggedPiecesCount(ctx context.Context, filter *types.FlaggedPiecesListFilter) (int, error) {
return s.client.FlaggedPiecesCount(ctx, filter)
}

func (s *Store) UntrackPiece(ctx context.Context, pieceCid cid.Cid, maddr address.Address) error {
return s.client.UntrackPiece(ctx, pieceCid, maddr)
}
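The new client method mirrors the other piece-doctor RPCs: it simply forwards the call to the service over JSON-RPC. Below is a minimal caller-side sketch, assuming a *client.Store that has already been dialed and the github.com/filecoin-project/boostd-data module path used by the extern packages; it is illustrative only, not part of this change.

package example

import (
	"context"
	"fmt"

	"github.com/filecoin-project/boostd-data/client"
	"github.com/filecoin-project/go-address"
	"github.com/ipfs/go-cid"
)

// untrackPiece asks the local index directory service to stop tracking a piece
// for the given miner, e.g. after the doctor finds the piece no longer exists.
func untrackPiece(ctx context.Context, store *client.Store, pieceCid cid.Cid, maddr address.Address) error {
	if err := store.UntrackPiece(ctx, pieceCid, maddr); err != nil {
		return fmt.Errorf("untracking piece %s for miner %s: %w", pieceCid, maddr, err)
	}
	return nil
}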
23 changes: 23 additions & 0 deletions extern/boostd-data/ldb/service.go
@@ -829,3 +829,26 @@ func (s *Store) RemoveIndexes(ctx context.Context, pieceCid cid.Cid) error {

return err
}

func (s *Store) UntrackPiece(ctx context.Context, pieceCid cid.Cid, maddr address.Address) error {
log.Debugw("handle.untack-piece")

ctx, span := tracing.Tracer.Start(ctx, "store.untrack_pieces")
defer span.End()

ctx, _ = tag.New(ctx, tag.Upsert(metrics.Endpoint, "ldb.untrack_pieces"))
stop := metrics.Timer(ctx, metrics.APIRequestDuration)
defer stop()

defer func(now time.Time) {
log.Debugw("handled.untack-piece", "took", time.Since(now).String())
}(time.Now())

stats.Record(s.ctx, metrics.FailureUntrackPieceCount.M(1))

// LevelDB does not have a separate PieceTracker table.
// All pieces to be checked are picked from the main table,
// so there is no need to delete anything.
return nil

}
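Because the LevelDB backend keeps no PieceTracker table, its UntrackPiece is a deliberate no-op that returns nil. A small sketch of the interface-level contract callers can rely on regardless of backend; the pieceUntracker interface and helper below are illustrative and do not exist in the codebase.

package example

import (
	"context"

	"github.com/filecoin-project/go-address"
	"github.com/ipfs/go-cid"
)

// pieceUntracker captures the one method the piece doctor needs. Both the
// LevelDB and YugabyteDB stores satisfy it; the LevelDB implementation simply
// returns nil because it has nothing to delete.
type pieceUntracker interface {
	UntrackPiece(ctx context.Context, pieceCid cid.Cid, maddr address.Address) error
}

// stopTracking treats a nil error the same way for both backends: either the
// PieceTracker row was deleted, or there was no row to delete in the first place.
func stopTracking(ctx context.Context, s pieceUntracker, pieceCid cid.Cid, maddr address.Address) error {
	return s.UntrackPiece(ctx, pieceCid, maddr)
}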
6 changes: 6 additions & 0 deletions extern/boostd-data/metrics/metrics.go
@@ -51,6 +51,7 @@ var (
SuccessUnflagPieceCount = stats.Int64("success_unflag_piece_count", "Counter of unflag piece success", stats.UnitDimensionless)
SuccessFlaggedPiecesListCount = stats.Int64("success_flagged_pieces_list_count", "Counter of flagged pieces list success", stats.UnitDimensionless)
SuccessFlaggedPiecesCountCount = stats.Int64("success_flagged_pieces_count_count", "Counter of flagged pieces count success", stats.UnitDimensionless)
SuccessUntrackPieceCount = stats.Int64("success_untrack_piece", "Counter of untrack piece success", stats.UnitDimensionless)
FailureAddDealForPieceCount = stats.Int64("failure_add_deal_for_piece_count", "Counter of add deal failure", stats.UnitDimensionless)
FailureAddIndexCount = stats.Int64("failure_add_index_count", "Counter of add index failure", stats.UnitDimensionless)
FailureIsIndexedCount = stats.Int64("failure_is_indexed_count", "Counter of is indexed failure", stats.UnitDimensionless)
@@ -72,6 +73,7 @@ var (
FailureUnflagPieceCount = stats.Int64("failure_unflag_piece_count", "Counter of unflag piece failure", stats.UnitDimensionless)
FailureFlaggedPiecesListCount = stats.Int64("failure_flagged_pieces_list_count", "Counter of flagged pieces list failure", stats.UnitDimensionless)
FailureFlaggedPiecesCountCount = stats.Int64("failure_flagged_pieces_count_count", "Counter of flagged pieces count failure", stats.UnitDimensionless)
FailureUntrackPieceCount = stats.Int64("failure_untrack_piece", "Counter of untrack piece failure", stats.UnitDimensionless)
)

var (
@@ -110,6 +112,7 @@ var (
SuccessUnflagPieceCountView = &view.View{Measure: SuccessUnflagPieceCount, Aggregation: view.Count()}
SuccessFlaggedPiecesListCountView = &view.View{Measure: SuccessFlaggedPiecesListCount, Aggregation: view.Count()}
SuccessFlaggedPiecesCountCountView = &view.View{Measure: SuccessFlaggedPiecesCountCount, Aggregation: view.Count()}
SuccessUntrackPieceCountView = &view.View{Measure: SuccessUntrackPieceCount, Aggregation: view.Count()}
FailureAddDealForPieceCountView = &view.View{Measure: FailureAddDealForPieceCount, Aggregation: view.Count()}
FailureAddIndexCountView = &view.View{Measure: FailureAddIndexCount, Aggregation: view.Count()}
FailureIsIndexedCountView = &view.View{Measure: FailureIsIndexedCount, Aggregation: view.Count()}
@@ -131,6 +134,7 @@ var (
FailureUnflagPieceCountView = &view.View{Measure: FailureUnflagPieceCount, Aggregation: view.Count()}
FailureFlaggedPiecesListCountView = &view.View{Measure: FailureFlaggedPiecesListCount, Aggregation: view.Count()}
FailureFlaggedPiecesCountCountView = &view.View{Measure: FailureFlaggedPiecesCountCount, Aggregation: view.Count()}
FailureUntrackPieceCountView = &view.View{Measure: FailureUntrackPieceCount, Aggregation: view.Count()}
)

// DefaultViews is an array of OpenCensus views for metric gathering purposes
@@ -159,6 +163,7 @@ var DefaultViews = func() []*view.View {
SuccessUnflagPieceCountView,
SuccessFlaggedPiecesListCountView,
SuccessFlaggedPiecesCountCountView,
SuccessUntrackPieceCountView,
FailureAddDealForPieceCountView,
FailureAddIndexCountView,
FailureIsIndexedCountView,
@@ -180,6 +185,7 @@ var DefaultViews = func() []*view.View {
FailureUnflagPieceCountView,
FailureFlaggedPiecesListCountView,
FailureFlaggedPiecesCountCountView,
FailureUntrackPieceCountView,
}
//views = append(views, blockstore.DefaultViews...)
views = append(views, rpcmetrics.DefaultViews...)
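The two new counters follow the existing success/failure pattern and are exported through OpenCensus views. A sketch of registering just the new views and recording one of the counters; any exporter wiring (Prometheus, etc.) is assumed to exist elsewhere in the process.

package example

import (
	"context"
	"log"

	"github.com/filecoin-project/boostd-data/metrics"
	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
)

func registerUntrackMetrics(ctx context.Context) {
	// Register the new views so the counters show up in whatever OpenCensus
	// exporter the process has configured.
	err := view.Register(metrics.SuccessUntrackPieceCountView, metrics.FailureUntrackPieceCountView)
	if err != nil {
		log.Fatalf("registering untrack-piece views: %v", err)
	}

	// Bump the success counter once, e.g. after a successful UntrackPiece call.
	stats.Record(ctx, metrics.SuccessUntrackPieceCount.M(1))
}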
27 changes: 27 additions & 0 deletions extern/boostd-data/yugabyte/piecedoctor.go
@@ -527,3 +527,30 @@ func (s *Store) FlaggedPiecesCount(ctx context.Context, filter *types.FlaggedPie
failureMetrics = false
return count, nil
}

func (s *Store) UntrackPiece(ctx context.Context, pieceCid cid.Cid, maddr address.Address) error {
ctx, span := tracing.Tracer.Start(ctx, "store.untrack_piece")
span.SetAttributes(attribute.String("pieceCid", pieceCid.String()))
defer span.End()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Endpoint, "yb.untrack_piece"))
stop := metrics.Timer(ctx, metrics.APIRequestDuration)
defer stop()
failureMetrics := true
defer func() {
if failureMetrics {
stats.Record(s.ctx, metrics.FailureUntrackPieceCount.M(1))
} else {
stats.Record(s.ctx, metrics.SuccessUntrackPieceCount.M(1))
}
}()

qry := `DELETE FROM PieceTracker WHERE MinerAddr = $1 AND PieceCid = $2`
_, err := s.db.Exec(ctx, qry, maddr.String(), pieceCid.String())
if err != nil {
return fmt.Errorf("untracking piece %s %s: %w", maddr, pieceCid, err)
}

failureMetrics = false
return nil

}
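The failureMetrics flag above implements a record-failure-unless-marked-successful pattern that several store methods share. The helper below is a generic sketch of that pattern; withOutcomeCounters does not exist in the codebase and is only meant to make the control flow explicit.

package example

import (
	"context"

	"go.opencensus.io/stats"
)

// withOutcomeCounters records the failure counter on any early return and the
// success counter only if the wrapped operation completed without error,
// mirroring the failureMetrics flag used in the yugabyte store methods.
func withOutcomeCounters(ctx context.Context, success, failure *stats.Int64Measure, op func() error) error {
	failed := true
	defer func() {
		if failed {
			stats.Record(ctx, failure.M(1))
		} else {
			stats.Record(ctx, success.M(1))
		}
	}()

	if err := op(); err != nil {
		return err
	}
	failed = false
	return nil
}

UntrackPiece could then wrap its DELETE in withOutcomeCounters(ctx, metrics.SuccessUntrackPieceCount, metrics.FailureUntrackPieceCount, ...) instead of managing the flag inline.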
14 changes: 6 additions & 8 deletions extern/boostd-data/yugabyte/service.go
@@ -894,7 +894,7 @@ func (s *Store) RemoveIndexes(ctx context.Context, pieceCid cid.Cid) error {
return fmt.Errorf("removing indexes for piece %s: getting recs: %w", pieceCid, err)
}

// Delete from multihash -> piece cids index
// Delete from multihash -> piece cids index and PieceBlockOffsetSize
var eg errgroup.Group
for i := 0; i < s.settings.PayloadPiecesParallelism; i++ {
eg.Go(func() error {
@@ -914,6 +914,11 @@
if err != nil {
return fmt.Errorf("deleting from PayloadToPieces: %w", err)
}
q = `DELETE FROM PieceBlockOffsetSize WHERE PayloadMultihash = ? AND PieceCid = ?`
err = s.session.Query(q, multihashBytes, pieceCid.Bytes()).Exec()
if err != nil {
return fmt.Errorf("deleting from PieceBlockOffsetSize: %w", err)
}
}
}

@@ -925,13 +930,6 @@ func (s *Store) RemoveIndexes(ctx context.Context, pieceCid cid.Cid) error {
return err
}

// Delete from piece offsets index
qry := `DELETE FROM PieceBlockOffsetSize WHERE PieceCid = ?`
err = s.session.Query(qry, pieceCid.Bytes()).WithContext(ctx).Exec()
if err != nil {
return fmt.Errorf("removing indexes for piece %s: deleting offset / size info: %w", pieceCid, err)
}

failureMetrics = false
return nil
}
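The change above folds the PieceBlockOffsetSize delete into the same worker loop that already removes PayloadToPieces rows, so both tables are cleaned per (PayloadMultihash, PieceCid) pair rather than by PieceCid alone. The sketch below shows that fan-out shape in isolation; the record type and the channel feeding the workers are invented for illustration, while the query and table names follow the diff.

package example

import (
	"context"

	"github.com/gocql/gocql"
	"golang.org/x/sync/errgroup"
)

// offsetRec is an illustrative stand-in for the per-multihash records that
// RemoveIndexes iterates over.
type offsetRec struct {
	PayloadMultihash []byte
	PieceCid         []byte
}

// deleteOffsetRows runs a fixed number of workers, each draining the channel
// and issuing a per-row delete, mirroring the errgroup fan-out used in
// RemoveIndexes above.
func deleteOffsetRows(ctx context.Context, session *gocql.Session, recs <-chan offsetRec, parallelism int) error {
	var eg errgroup.Group
	for i := 0; i < parallelism; i++ {
		eg.Go(func() error {
			for rec := range recs {
				q := `DELETE FROM PieceBlockOffsetSize WHERE PayloadMultihash = ? AND PieceCid = ?`
				if err := session.Query(q, rec.PayloadMultihash, rec.PieceCid).WithContext(ctx).Exec(); err != nil {
					return err
				}
			}
			return nil
		})
	}
	return eg.Wait()
}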
58 changes: 55 additions & 3 deletions indexprovider/wrapper.go
@@ -10,8 +10,15 @@ import (

"github.com/filecoin-project/boost/lib/legacy"
"github.com/filecoin-project/boost/storagemarket/types/legacytypes"
"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-statemachine/fsm"
"github.com/filecoin-project/lotus/api/v1api"
"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/chain/actors/adt"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
chainTypes "github.com/filecoin-project/lotus/chain/types"
"github.com/google/uuid"
cbor "github.com/ipfs/go-ipld-cbor"
"go.uber.org/fx"

"github.com/ipfs/go-datastore"
@@ -48,6 +55,8 @@ var log = logging.Logger("index-provider-wrapper")
type Wrapper struct {
enabled bool

full v1api.FullNode
miner address.Address
cfg *config.Boost
dealsDB *db.DealsDB
legacyProv legacy.LegacyDealManager
@@ -64,15 +73,15 @@ type Wrapper struct {
stop context.CancelFunc
}

func NewWrapper(cfg *config.Boost) func(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, directDealsDB *db.DirectDealsDB, dealsDB *db.DealsDB,
func NewWrapper(provAddr address.Address, cfg *config.Boost) func(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, directDealsDB *db.DirectDealsDB, dealsDB *db.DealsDB,
ssDB *db.SectorStateDB, legacyProv legacy.LegacyDealManager, prov provider.Interface,
piecedirectory *piecedirectory.PieceDirectory, ssm *sectorstatemgr.SectorStateMgr, meshCreator idxprov.MeshCreator, storageService lotus_modules.MinerStorageService) (*Wrapper, error) {
piecedirectory *piecedirectory.PieceDirectory, ssm *sectorstatemgr.SectorStateMgr, meshCreator idxprov.MeshCreator, storageService lotus_modules.MinerStorageService, full v1api.FullNode) (*Wrapper, error) {

return func(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, directDealsDB *db.DirectDealsDB, dealsDB *db.DealsDB,
ssDB *db.SectorStateDB, legacyProv legacy.LegacyDealManager, prov provider.Interface,
piecedirectory *piecedirectory.PieceDirectory,
ssm *sectorstatemgr.SectorStateMgr,
meshCreator idxprov.MeshCreator, storageService lotus_modules.MinerStorageService) (*Wrapper, error) {
meshCreator idxprov.MeshCreator, storageService lotus_modules.MinerStorageService, full v1api.FullNode) (*Wrapper, error) {

_, isDisabled := prov.(*DisabledIndexProvider)

@@ -95,6 +104,8 @@ func NewWrapper(cfg *config.Boost) func(lc fx.Lifecycle, h host.Host, r repo.Loc
bitswapEnabled: bitswapEnabled,
httpEnabled: httpEnabled,
ssm: ssm,
full: full,
miner: provAddr,
}
return w, nil
}
@@ -445,6 +456,29 @@ func (w *Wrapper) IndexerAnnounceAllDeals(ctx context.Context) error {
return errors.New("cannot announce all deals: index provider is disabled")
}

mActor, err := w.full.StateGetActor(ctx, w.miner, chainTypes.EmptyTSK)
if err != nil {
return fmt.Errorf("getting actor for the miner %s: %w", w.miner, err)
}

store := adt.WrapStore(ctx, cbor.NewCborStore(blockstore.NewAPIBlockstore(w.full)))
mas, err := miner.Load(store, mActor)
if err != nil {
return fmt.Errorf("loading miner actor state %s: %w", w.miner, err)
}
liveSectors, err := miner.AllPartSectors(mas, miner.Partition.LiveSectors)
if err != nil {
return fmt.Errorf("getting live sector sets for miner %s: %w", w.miner, err)
}
unProvenSectors, err := miner.AllPartSectors(mas, miner.Partition.UnprovenSectors)
if err != nil {
return fmt.Errorf("getting unproven sector sets for miner %s: %w", w.miner, err)
}
activeSectors, err := bitfield.MergeBitFields(liveSectors, unProvenSectors)
if err != nil {
return fmt.Errorf("merging bitfields to generate all sealed sectors on miner %s: %w", w.miner, err)
}

log.Info("announcing all legacy deals to Indexer")

legacyDeals, err := w.legacyProv.ListDeals()
@@ -476,6 +510,15 @@ func (w *Wrapper) IndexerAnnounceAllDeals(ctx context.Context) error {
continue
}

present, err := activeSectors.IsSet(uint64(d.SectorNumber))
if err != nil {
return fmt.Errorf("checking if bitfield is set: %w", err)
}

if !present {
continue
}

adCid, lerr := w.AnnounceLegcayDealToIndexer(ctx, d.ProposalCid)
if lerr != nil {
merr = multierror.Append(merr, lerr)
@@ -508,6 +551,15 @@ func (w *Wrapper) IndexerAnnounceAllDeals(ctx context.Context) error {
continue
}

present, err := activeSectors.IsSet(uint64(d.SectorID))
if err != nil {
return fmt.Errorf("checking if bitfield is set: %w", err)
}

if !present {
continue
}

if _, err := w.AnnounceBoostDeal(ctx, d); err != nil {
// don't log already advertised errors as errors - just skip them
if !errors.Is(err, provider.ErrAlreadyAdvertised) {
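IndexerAnnounceAllDeals now skips deals whose sector is neither live nor unproven on chain, using the merged bitfield as a membership set. A minimal sketch of just the go-bitfield calls involved, with made-up sector numbers:

package example

import (
	"fmt"

	"github.com/filecoin-project/go-bitfield"
)

func activeSectorCheck() error {
	// Two sector sets, as returned per partition from the miner actor state.
	live := bitfield.NewFromSet([]uint64{1, 2, 3})
	unproven := bitfield.NewFromSet([]uint64{7})

	// Merge them into a single "announceable" set.
	active, err := bitfield.MergeBitFields(live, unproven)
	if err != nil {
		return fmt.Errorf("merging bitfields: %w", err)
	}

	// Membership test for a deal's sector number.
	present, err := active.IsSet(7)
	if err != nil {
		return fmt.Errorf("checking bitfield: %w", err)
	}
	fmt.Println("sector 7 announceable:", present) // true
	return nil
}

In the wrapper itself the two sets come from miner.AllPartSectors over the loaded miner actor state, as shown in the hunk above.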
8 changes: 7 additions & 1 deletion lib/pdcleaner/pdcleaner.go
@@ -102,7 +102,9 @@ func (p *pdcleaner) clean() {
// Run at start up
log.Infof("Starting LID clean up")
serr := p.CleanOnce(p.ctx)
log.Errorf("Failed to cleanup LID: %s", serr)
if serr != nil {
log.Errorf("Failed to cleanup LID: %s", serr)
}
log.Debugf("Finished cleaning up LID")

// Create a ticker with an hour tick
@@ -185,6 +187,7 @@ func (p *pdcleaner) CleanOnce(ctx context.Context) error {
}
return fmt.Errorf("cleaning up boost deal %s for piece %s: %s", deal.DealUuid.String(), deal.ClientDealProposal.Proposal.PieceCID.String(), err.Error())
}
log.Infof("removed deal for %s and deal ID %s", deal.ClientDealProposal.Proposal.PieceCID.String(), deal.DealUuid.String())
}
return nil
})
@@ -212,6 +215,7 @@ func (p *pdcleaner) CleanOnce(ctx context.Context) error {
}
return fmt.Errorf("cleaning up legacy deal %s for piece %s: %s", deal.ProposalCid.String(), deal.ClientDealProposal.Proposal.PieceCID.String(), err.Error())
}
log.Infof("removed legacy deal for %s and deal ID %s", deal.ClientDealProposal.Proposal.PieceCID.String(), deal.ProposalCid.String())
}
return nil
})
@@ -240,6 +244,7 @@ func (p *pdcleaner) CleanOnce(ctx context.Context) error {
}
return fmt.Errorf("cleaning up direct deal %s for piece %s: %s", deal.ID.String(), deal.PieceCID, err.Error())
}
log.Infof("removed direct deal for %s and deal ID %s", deal.PieceCID.String(), deal.ID.String())
}
return nil
})
@@ -299,6 +304,7 @@ func (p *pdcleaner) CleanOnce(ctx context.Context) error {
}
log.Errorf("cleaning up dangling deal %s for piece %s: %s", deal.DealUuid, piece, err.Error())
}
log.Infof("removed dangling deal for %s and deal ID %s", piece.String(), deal.DealUuid)
}
}
return nil
2 changes: 1 addition & 1 deletion node/builder.go
@@ -472,7 +472,7 @@ func ConfigBoost(cfg *config.Boost) Option {
Override(new(sealingpipeline.API), From(new(lotus_modules.MinerStorageService))),

Override(new(*sectorstatemgr.SectorStateMgr), sectorstatemgr.NewSectorStateMgr(cfg)),
Override(new(*indexprovider.Wrapper), indexprovider.NewWrapper(cfg)),
Override(new(*indexprovider.Wrapper), indexprovider.NewWrapper(walletMiner, cfg)),
Override(new(storedask.StoredAsk), storedask.NewStoredAsk(cfg)),

Override(new(legacy.LegacyDealManager), modules.NewLegacyDealsManager),
15 changes: 14 additions & 1 deletion piecedirectory/doctor.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math/rand"
"strings"
"time"

"github.com/filecoin-project/boost/db"
@@ -13,11 +14,11 @@ import (
"github.com/filecoin-project/boost/sectorstatemgr"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
verifregtypes "github.com/filecoin-project/go-state-types/builtin/v9/verifreg"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
verifregtypes "github.com/filecoin-project/go-state-types/builtin/v9/verifreg"
)

var doclog = logging.Logger("piecedoc")
@@ -122,6 +123,18 @@ func (d *Doctor) checkPiece(ctx context.Context, pieceCid cid.Cid, lu *sectorsta
// Check if piece belongs to an active sector
md, err := d.store.GetPieceMetadata(ctx, pieceCid)
if err != nil {
// If the piece is not found, it should be unflagged and removed from future tracking
if strings.Contains(err.Error(), "not found") {
serr := d.store.UnflagPiece(ctx, pieceCid, d.maddr)
if serr != nil {
return fmt.Errorf("failed to unflag the missing piece %s: %w", pieceCid.String(), serr)
}
serr = d.store.UntrackPiece(ctx, pieceCid, d.maddr)
if serr != nil {
return fmt.Errorf("failed to delete piece from tracker table %s: %w", pieceCid.String(), serr)
}
return nil
}
return fmt.Errorf("failed to get piece %s from local index directory: %w", pieceCid, err)
}

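The not-found check matches on the error string because the store interface does not expose a typed not-found error common to both backends. The sketch below shows the same unflag-then-untrack sequence written against a hypothetical sentinel error, should one be added later; ErrPieceNotFound and the pieceStore interface do not exist in the codebase.

package example

import (
	"context"
	"errors"
	"fmt"

	"github.com/filecoin-project/go-address"
	"github.com/ipfs/go-cid"
)

// ErrPieceNotFound is a hypothetical sentinel; the store in this diff does not
// export one, which is why checkPiece falls back to strings.Contains.
var ErrPieceNotFound = errors.New("piece not found")

type pieceStore interface {
	UnflagPiece(ctx context.Context, pieceCid cid.Cid, maddr address.Address) error
	UntrackPiece(ctx context.Context, pieceCid cid.Cid, maddr address.Address) error
}

// dropMissingPiece runs the unflag-then-untrack sequence when the metadata
// lookup failed with a not-found error, and passes any other error through.
func dropMissingPiece(ctx context.Context, store pieceStore, pieceCid cid.Cid, maddr address.Address, getErr error) error {
	if !errors.Is(getErr, ErrPieceNotFound) {
		return getErr
	}
	if err := store.UnflagPiece(ctx, pieceCid, maddr); err != nil {
		return fmt.Errorf("unflagging missing piece %s: %w", pieceCid, err)
	}
	if err := store.UntrackPiece(ctx, pieceCid, maddr); err != nil {
		return fmt.Errorf("untracking missing piece %s: %w", pieceCid, err)
	}
	return nil
}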