Skip to content

Commit

Permalink
feat: download limit per host
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Sep 8, 2022
1 parent e0e4bda commit ba6895a
Show file tree
Hide file tree
Showing 13 changed files with 375 additions and 64 deletions.
2 changes: 1 addition & 1 deletion db/deals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestDealsDBSearch(t *testing.T) {

start := time.Now()
db := NewDealsDB(sqldb)
deals, err := generateDeals(5)
deals, err := GenerateNDeals(5)
req.NoError(err)
t.Logf("generated %d deals in %s", len(deals), time.Since(start))

Expand Down
6 changes: 3 additions & 3 deletions db/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import (
var clientAddrs = []uint64{01312, 42134, 01322, 43242, 04212}

func GenerateDeals() ([]types.ProviderDealState, error) {
return generateDeals(len(clientAddrs))
return GenerateNDeals(len(clientAddrs))
}

func generateDeals(count int) ([]types.ProviderDealState, error) {
func GenerateNDeals(count int) ([]types.ProviderDealState, error) {
provAddr, err := address.NewActorAddress([]byte("f1523"))
if err != nil {
return nil, err
Expand Down Expand Up @@ -81,7 +81,7 @@ func generateDeals(count int) ([]types.ProviderDealState, error) {
InboundFilePath: fmt.Sprintf("/data/staging/inbound/file-%d.car", rand.Intn(10000)),
Transfer: types.Transfer{
Type: "http",
Params: []byte(fmt.Sprintf("{url:'http://files.org/file%d.car'}", rand.Intn(1000))),
Params: []byte(fmt.Sprintf(`{"url":"http://files.org/file%d.car"}`, rand.Intn(1000))),
Size: uint64(rand.Intn(10000)),
},
ChainDealID: abi.DealID(rand.Intn(10000)),
Expand Down
10 changes: 10 additions & 0 deletions db/migrations/20220908122510_storagetagged_add_host.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- +goose Up
-- +goose StatementBegin
ALTER TABLE StorageTagged
ADD TransferHost TEXT;
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
SELECT 'down SQL query';
-- +goose StatementEnd
58 changes: 58 additions & 0 deletions db/migrations/20220908122516_storagetagged_set_host.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package migrations

import (
"database/sql"
"fmt"
"github.com/filecoin-project/boost/storagemarket/types"
"github.com/pressly/goose/v3"
)

func init() {
goose.AddMigration(UpSetStorageTaggedTransferHost, DownSetStorageTaggedTransferHost)
}

func UpSetStorageTaggedTransferHost(tx *sql.Tx) error {
errPrefix := "setting StorageTagged.TransferHost: "
qry := "SELECT Deals.ID, Deals.TransferType, Deals.TransferParams " +
"FROM Deals INNER JOIN StorageTagged " +
"ON Deals.ID = StorageTagged.DealUUID"
rows, err := tx.Query(qry)
if err != nil {
return fmt.Errorf(errPrefix+"getting deals from DB: %w", err)
}
defer rows.Close()

for rows.Next() {
var id string
var xferType string
var params []byte
err := rows.Scan(&id, &xferType, &params)
if err != nil {
return fmt.Errorf(errPrefix+"scanning Deals row: %w", err)
}

dealErrPrefix := fmt.Sprintf(errPrefix+"deal %s: ", id)

xfer := types.Transfer{
Type: xferType,
Params: params,
}
host, err := xfer.Host()
if err != nil {
log.Warnw(dealErrPrefix+"ignoring - couldn't parse transfer params %s: '%s': %s", xferType, params, err)
continue
}

_, err = tx.Exec("UPDATE StorageTagged SET TransferHost = ? WHERE DealUUID = ?", host, id)
if err != nil {
return fmt.Errorf(dealErrPrefix+"saving TransferHost to DB: %w", err)
}
}
return rows.Err()
}

func DownSetStorageTaggedTransferHost(tx *sql.Tx) error {
// This code is executed when the migration is rolled back.
// Do nothing because sqlite doesn't support removing a column.
return nil
}
70 changes: 70 additions & 0 deletions db/migrations_tests/storagetagged_set_host_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package migrations_tests

import (
"context"
"fmt"
"github.com/filecoin-project/boost/db"
"github.com/filecoin-project/boost/db/migrations"
"github.com/filecoin-project/boost/storagemarket/types"
"github.com/pressly/goose/v3"
"github.com/stretchr/testify/require"
"testing"
)

func TestStorageTaggedSetHost(t *testing.T) {
req := require.New(t)
ctx := context.Background()

sqldb := db.CreateTestTmpDB(t)
req.NoError(db.CreateAllBoostTables(ctx, sqldb, sqldb))

// Run migrations up to the one that adds the TransferHost field to StorageTagged
goose.SetBaseFS(migrations.EmbedMigrations)
req.NoError(goose.SetDialect("sqlite3"))
req.NoError(goose.UpTo(sqldb, ".", 20220908122510))

// Generate 2 deals
dealsDB := db.NewDealsDB(sqldb)
deals, err := db.GenerateNDeals(2)
req.NoError(err)

// Set the transfer params such that each deal has a different host
getHost := func(i int) string {
return fmt.Sprintf("files.org:%d", 1000+i)
}
for i, deal := range deals {
deal.Transfer = types.Transfer{
Type: "http",
Params: []byte(fmt.Sprintf(`{"url":"http://%s/file.car"}`, getHost(i))),
Size: uint64(1024),
}
err = dealsDB.Insert(ctx, &deal)
req.NoError(err)
}

// Simulate tagging a deal
taggedStorageDB := db.NewStorageDB(sqldb)
err = taggedStorageDB.Tag(ctx, deals[0].DealUuid, 1024, "")
req.NoError(err)

// Run the migration that reads the deal transfer params and sets
// StorageTagged.TransferHost
req.NoError(goose.UpByOne(sqldb, "."))

// Check that after migrating up, the host is set correctly
rows, err := sqldb.QueryContext(ctx, "SELECT TransferHost FROM StorageTagged")
req.NoError(err)
defer rows.Close() //nolint:errcheck

rowIdx := 0
for ; rows.Next(); rowIdx++ {
var host string
err := rows.Scan(&host)
req.NoError(err)
req.Equal(getHost(0), host)
}

// Even though there are two deals in DB, there is only one deal that is
// tagged, so there should only be one row
req.Equal(1, rowIdx)
}
24 changes: 19 additions & 5 deletions db/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ func NewStorageDB(db *sql.DB) *StorageDB {
return &StorageDB{db: db}
}

func (s *StorageDB) Tag(ctx context.Context, dealUuid uuid.UUID, size uint64) error {
qry := "INSERT INTO StorageTagged (DealUUID, CreatedAt, TransferSize) "
qry += "VALUES (?, ?, ?)"
values := []interface{}{dealUuid, time.Now(), fmt.Sprintf("%d", size)}
func (s *StorageDB) Tag(ctx context.Context, dealUuid uuid.UUID, size uint64, host string) error {
qry := "INSERT INTO StorageTagged (DealUUID, CreatedAt, TransferSize, TransferHost) "
qry += "VALUES (?, ?, ?, ?)"
values := []interface{}{dealUuid, time.Now(), fmt.Sprintf("%d", size), host}
_, err := s.db.ExecContext(ctx, qry, values...)
return err
}
Expand Down Expand Up @@ -112,8 +112,22 @@ func (s *StorageDB) Logs(ctx context.Context) ([]StorageLog, error) {
return storageLogs, nil
}

func (s *StorageDB) TotalTaggedForHost(ctx context.Context, host string) (uint64, error) {
return s.totalTagged(ctx, host)
}

func (s *StorageDB) TotalTagged(ctx context.Context) (uint64, error) {
rows, err := s.db.QueryContext(ctx, "SELECT TransferSize FROM StorageTagged")
return s.totalTagged(ctx, "")
}

func (s *StorageDB) totalTagged(ctx context.Context, host string) (uint64, error) {
qry := "SELECT TransferSize FROM StorageTagged"
var args []interface{}
if host != "" {
qry += " WHERE TransferHost = ?"
args = append(args, host)
}
rows, err := s.db.QueryContext(ctx, qry, args...)
if err != nil {
return 0, fmt.Errorf("getting total tagged: %w", err)
}
Expand Down
15 changes: 12 additions & 3 deletions db/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"errors"
"testing"

"github.com/filecoin-project/boost/db/migrations"
"github.com/google/uuid"

"github.com/stretchr/testify/require"
)

Expand All @@ -16,6 +16,7 @@ func TestStorageDB(t *testing.T) {

sqldb := CreateTestTmpDB(t)
require.NoError(t, CreateAllBoostTables(ctx, sqldb, sqldb))
req.NoError(migrations.Migrate(sqldb))

db := NewStorageDB(sqldb)

Expand All @@ -28,12 +29,20 @@ func TestStorageDB(t *testing.T) {
req.True(errors.Is(err, ErrNotFound))
req.Equal(uint64(0), amt)

err = db.Tag(ctx, dealUUID, 1111)
err = db.Tag(ctx, dealUUID, 1111, "foo.bar:1234")
req.NoError(err)

dealUUID2 := uuid.New()
err = db.Tag(ctx, dealUUID2, 2222, "my.host:5678")
req.NoError(err)

total, err := db.TotalTagged(ctx)
req.NoError(err)
req.Equal(uint64(1111), total)
req.Equal(uint64(3333), total)

total, err = db.TotalTaggedForHost(ctx, "my.host:5678")
req.NoError(err)
req.Equal(uint64(2222), total)

amt, err = db.Untag(ctx, dealUUID)
req.NoError(err)
Expand Down
3 changes: 2 additions & 1 deletion node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,8 @@ func ConfigBoost(cfg *config.Boost) Option {
})),

Override(new(*storagemanager.StorageManager), storagemanager.New(storagemanager.Config{
MaxStagingDealsBytes: uint64(cfg.Dealmaking.MaxStagingDealsBytes),
MaxStagingDealsBytes: uint64(cfg.Dealmaking.MaxStagingDealsBytes),
MaxStagingDealsPercentPerHost: uint64(cfg.Dealmaking.MaxStagingDealsPercentPerHost),
})),

// Sector API
Expand Down
24 changes: 22 additions & 2 deletions node/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 18 additions & 2 deletions node/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,25 @@ type DealmakingConfig struct {
// The maximum collateral that the provider will put up against a deal,
// as a multiplier of the minimum collateral bound
MaxProviderCollateralMultiplier uint64
// The maximum allowed disk usage size in bytes of staging deals not yet
// passed to the sealing node by the markets service. 0 is unlimited.
// The maximum allowed disk usage size in bytes of downloaded deal data
// that has not yet been passed to the sealing node by boost.
// When the client makes a new deal proposal to download data from a host,
// boost checks this config value against the sum of:
// - the amount of data downloaded in the staging area
// - the amount of data that is queued for download
// - the amount of data in the proposed deal
// If the total amount would exceed the limit, boost rejects the deal.
// Set this value to 0 to indicate there is no limit.
MaxStagingDealsBytes int64
// The percentage of MaxStagingDealsBytes that is allocated to each host.
// When the client makes a new deal proposal to download data from a host,
// boost checks this config value against the sum of:
// - the amount of data downloaded from the host in the staging area
// - the amount of data that is queued for download from the host
// - the amount of data in the proposed deal
// If the total amount would exceed the limit, boost rejects the deal.
// Set this value to 0 to indicate there is no limit per host.
MaxStagingDealsPercentPerHost uint64
// Minimum start epoch buffer to give time for sealing of sector with deal.
StartEpochSealingBuffer uint64
// The amount of time to keep deal proposal logs for before cleaning them up.
Expand Down
Loading

0 comments on commit ba6895a

Please sign in to comment.