Skip to content

Commit

Permalink
feat: monitor sectors unsealed state (filecoin-project#1191)
Browse files Browse the repository at this point in the history
* feat: monitor sectors unsealed state

* feat: announce deals to indexer as newly unsealed or no longer unsealed

* feat: test for unsealed state manager

* feat: update metadata if seal state changes, call NotifyRemove if sector is removed

* feat: better logging

* feat: check sector seal state once per hour

* feat: act on all deals in a sector

* feat: announce sealing state changes for legacy deals also

* fix: gfm type

* chore: fix merge miss

* feat: redeclare storage when unsealed state is checked (filecoin-project#1377)

* feat: redeclare storage when unsealed state is checked

This is currently needed as lotus doesnt automatically update when the fs changes

* chore: fix linting

* feat: make unsealedstatemanager configurable

*Update default duration to 12hours from 1hour

*Allow users to disabl redeclaration of storage

chore: make gen

* refactor: move storage list to 1hour interval

---------

Co-authored-by: Jacob Heun <jacob.heun@gmail.com>
Co-authored-by: Jacob Heun <jacobheun@gmail.com>
  • Loading branch information
3 people authored Apr 17, 2023
1 parent f6282d9 commit 6b246d2
Show file tree
Hide file tree
Showing 15 changed files with 1,105 additions and 43 deletions.
2 changes: 2 additions & 0 deletions cmd/boostd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func before(cctx *cli.Context) error {
_ = logging.SetLogLevel("cfg", "INFO")
_ = logging.SetLogLevel("boost-storage-deal", "INFO")
_ = logging.SetLogLevel("index-provider-wrapper", "INFO")
_ = logging.SetLogLevel("unsmgr", "INFO")

if cliutil.IsVeryVerbose {
_ = logging.SetLogLevel("boostd", "DEBUG")
Expand All @@ -78,6 +79,7 @@ func before(cctx *cli.Context) error {
_ = logging.SetLogLevel("boost-migrator", "DEBUG")
_ = logging.SetLogLevel("dagstore", "DEBUG")
_ = logging.SetLogLevel("migrator", "DEBUG")
_ = logging.SetLogLevel("unsmgr", "DEBUG")
}

return nil
Expand Down
11 changes: 11 additions & 0 deletions db/deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/filecoin-project/boost/db/fielddef"
"github.com/filecoin-project/boost/storagemarket/types"
"github.com/filecoin-project/boost/storagemarket/types/dealcheckpoints"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/builtin/v9/market"
"github.com/google/uuid"
"github.com/graph-gophers/graphql-go"
Expand Down Expand Up @@ -237,6 +239,15 @@ func (d *DealsDB) BySignedProposalCID(ctx context.Context, proposalCid cid.Cid)
return d.scanRow(row)
}

func (d *DealsDB) BySectorID(ctx context.Context, sectorID abi.SectorID) ([]*types.ProviderDealState, error) {
addr, err := address.NewIDAddress(uint64(sectorID.Miner))
if err != nil {
return nil, fmt.Errorf("creating address from ID %d: %w", sectorID.Miner, err)
}

return d.list(ctx, 0, 0, "ProviderAddress=? AND SectorID=?", addr.String(), sectorID.Number)
}

func (d *DealsDB) Count(ctx context.Context, query string, filter *FilterOptions) (int, error) {
whereArgs := []interface{}{}
where := "SELECT count(*) FROM Deals"
Expand Down
6 changes: 3 additions & 3 deletions db/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func GenerateDeals() ([]types.ProviderDealState, error) {
}

func GenerateNDeals(count int) ([]types.ProviderDealState, error) {
provAddr, err := address.NewActorAddress([]byte("f1523"))
provAddr, err := address.NewIDAddress(1523)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -93,8 +93,8 @@ func GenerateNDeals(count int) ([]types.ProviderDealState, error) {
Checkpoint: dealcheckpoints.Accepted,
Retry: types.DealRetryAuto,
Err: dealErr,
FastRetrieval: false,
AnnounceToIPNI: false,
FastRetrieval: true,
AnnounceToIPNI: true,
}

deals = append(deals, deal)
Expand Down
17 changes: 17 additions & 0 deletions db/migrations/20230217114757_create_sealing_status.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- +goose Up
-- +goose StatementBegin
CREATE TABLE IF NOT EXISTS SectorState (
MinerID INT,
SectorID INT,
UpdatedAt DateTime,
SealState TEXT
);

CREATE INDEX IF NOT EXISTS index_sector_state_sector_id on SectorState(SectorID);
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
DROP INDEX IF EXISTS index_sector_state_sector_id;
DROP TABLE IF EXISTS SectorState;
-- +goose StatementEnd
8 changes: 5 additions & 3 deletions db/migrations_tests/deals_announce_to_ipni_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,23 @@ func TestDealAnnounceToIPNI(t *testing.T) {

// Insert the deal into the DB
deal := deals[0]
deal.AnnounceToIPNI = false

_, err = sqldb.Exec(`INSERT INTO Deals ("ID", "CreatedAt", "DealProposalSignature", "PieceCID", "PieceSize",
"VerifiedDeal", "IsOffline", "ClientAddress", "ProviderAddress","Label", "StartEpoch", "EndEpoch",
"StoragePricePerEpoch", "ProviderCollateral", "ClientCollateral", "ClientPeerID", "DealDataRoot",
"InboundFilePath", "TransferType", "TransferParams", "TransferSize", "ChainDealID", "PublishCID",
"SectorID", "Offset", "Length", "Checkpoint", "CheckpointAt", "Error", "Retry", "SignedProposalCID",
"FastRetrieval")
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`,
"FastRetrieval","AnnounceToIPNI")
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`,
deal.DealUuid, deal.CreatedAt, []byte("test"), deal.ClientDealProposal.Proposal.PieceCID.String(),
deal.ClientDealProposal.Proposal.PieceSize, deal.ClientDealProposal.Proposal.VerifiedDeal, deal.IsOffline,
deal.ClientDealProposal.Proposal.Client.String(), deal.ClientDealProposal.Proposal.Provider.String(), "test",
deal.ClientDealProposal.Proposal.StartEpoch, deal.ClientDealProposal.Proposal.EndEpoch, deal.ClientDealProposal.Proposal.StoragePricePerEpoch.Uint64(),
deal.ClientDealProposal.Proposal.ProviderCollateral.Int64(), deal.ClientDealProposal.Proposal.ClientCollateral.Uint64(), deal.ClientPeerID.String(),
deal.DealDataRoot.String(), deal.InboundFilePath, deal.Transfer.Type, deal.Transfer.Params, deal.Transfer.Size, deal.ChainDealID,
deal.PublishCID.String(), deal.SectorID, deal.Offset, deal.Length, deal.Checkpoint.String(), deal.CheckpointAt, deal.Err, deal.Retry, []byte("test"),
deal.FastRetrieval)
deal.FastRetrieval, deal.AnnounceToIPNI)

req.NoError(err)

Expand Down
60 changes: 60 additions & 0 deletions db/sectors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package db

import (
"context"
"database/sql"
"github.com/filecoin-project/go-state-types/abi"
"time"
)

type SealState string

const SealStateSealed SealState = "Sealed"
const SealStateUnsealed SealState = "Unsealed"
const SealStateRemoved SealState = "Removed"

type SectorState struct {
SectorID abi.SectorID
UpdatedAt time.Time
SealState SealState
}

type SectorStateDB struct {
db *sql.DB
}

func NewSectorStateDB(db *sql.DB) *SectorStateDB {
return &SectorStateDB{db}
}

func (sdb *SectorStateDB) List(ctx context.Context) ([]SectorState, error) {
qry := "SELECT MinerID, SectorID, UpdatedAt, SealState FROM SectorState"
rows, err := sdb.db.QueryContext(ctx, qry)
if err != nil {
return nil, err
}
defer rows.Close()

states := make([]SectorState, 0, 16)
for rows.Next() {
var state SectorState
err := rows.Scan(&state.SectorID.Miner, &state.SectorID.Number, &state.UpdatedAt, &state.SealState)

if err != nil {
return nil, err
}
states = append(states, state)
}
if err := rows.Err(); err != nil {
return nil, err
}

return states, nil
}

func (sdb *SectorStateDB) Update(ctx context.Context, sectorID abi.SectorID, SealState SealState) error {
now := time.Now()
qry := "REPLACE INTO SectorState (MinerID, SectorID, UpdatedAt, SealState) VALUES (?, ?, ?, ?)"
_, err := sdb.db.ExecContext(ctx, qry, sectorID.Miner, sectorID.Number, now, SealState)
return err
}
Loading

0 comments on commit 6b246d2

Please sign in to comment.