Skip to content

Commit 58163ca

Browse files
authored
feat: shard selector (#807)
1 parent e0e1d73 commit 58163ca

File tree

8 files changed

+683
-63
lines changed

8 files changed

+683
-63
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ require (
4545
github.com/graph-gophers/graphql-go v1.2.0
4646
github.com/graph-gophers/graphql-transport-ws v0.0.2
4747
github.com/hashicorp/go-multierror v1.1.1
48+
github.com/hnlq715/golang-lru v0.3.0
4849
github.com/ipfs/go-bitswap v0.9.0
4950
github.com/ipfs/go-block-format v0.0.3
5051
github.com/ipfs/go-blockservice v0.4.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -750,6 +750,8 @@ github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2p
750750
github.com/hashicorp/memberlist v0.2.2/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
751751
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
752752
github.com/hashicorp/serf v0.9.5/go.mod h1:UWDWwZeL5cuWDJdl0C6wrvrUwEqtQ4ZKBKKENpqIUyk=
753+
github.com/hnlq715/golang-lru v0.3.0 h1:eJtRD3bIw/dxwha16+urdY7bGfoCy/fAM+A/gahvYJM=
754+
github.com/hnlq715/golang-lru v0.3.0/go.mod h1:RBkgDAtlu0SgTPvpb4VW2/RQnkCBMRD3Lr6B9RhsAS8=
753755
github.com/hodgesds/perf-utils v0.0.8/go.mod h1:F6TfvsbtrF88i++hou29dTXlI2sfsJv+gRZDtmTJkAs=
754756
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
755757
github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg=

node/modules/storageminer.go

Lines changed: 13 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"database/sql"
66
"errors"
77
"fmt"
8+
brm "github.com/filecoin-project/boost/retrievalmarket/lib"
89
"path"
910
"time"
1011

@@ -25,12 +26,10 @@ import (
2526
"github.com/filecoin-project/boost/transport/httptransport"
2627
"github.com/filecoin-project/dagstore"
2728
"github.com/filecoin-project/dagstore/indexbs"
28-
"github.com/filecoin-project/dagstore/shard"
2929
"github.com/filecoin-project/go-address"
3030
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
3131
"github.com/filecoin-project/go-fil-markets/shared"
3232
lotus_storagemarket "github.com/filecoin-project/go-fil-markets/storagemarket"
33-
"github.com/filecoin-project/go-state-types/abi"
3433
"github.com/filecoin-project/go-state-types/crypto"
3534
"github.com/filecoin-project/lotus/api/v1api"
3635
ctypes "github.com/filecoin-project/lotus/chain/types"
@@ -445,68 +444,19 @@ func NewGraphqlServer(cfg *config.Boost) func(lc fx.Lifecycle, r repo.LockedRepo
445444
}
446445
}
447446

448-
func NewIndexBackedBlockstore(dagst dagstore.Interface, ps lotus_dtypes.ProviderPieceStore, sa retrievalmarket.SectorAccessor, rp retrievalmarket.RetrievalProvider, h host.Host) (dtypes.IndexBackedBlockstore, error) {
449-
sf := indexbs.ShardSelectorF(func(c cid.Cid, shards []shard.Key) (shard.Key, error) {
450-
for _, sk := range shards {
451-
// parse piece CID
452-
pieceCid, err := cid.Parse(sk.String())
453-
if err != nil {
454-
return shard.Key{}, fmt.Errorf("failed to parse cid")
455-
}
456-
// read piece info from piece store
457-
pieceInfo, err := ps.GetPieceInfo(pieceCid)
458-
if err != nil {
459-
return shard.Key{}, fmt.Errorf("failed to get piece info: %w", err)
460-
}
461-
462-
// check if piece is in unsealed sector
463-
isUnsealed := false
464-
for _, di := range pieceInfo.Deals {
465-
isUnsealed, err = sa.IsUnsealed(context.TODO(), di.SectorID, di.Offset.Unpadded(), di.Length.Unpadded())
466-
if err != nil {
467-
log.Errorf("failed to find out if sector %d is unsealed, err=%s", di.SectorID, err)
468-
continue
469-
}
470-
if isUnsealed {
471-
break
472-
}
473-
}
474-
475-
// sealed sector, skip
476-
if !isUnsealed {
477-
continue
478-
}
479-
480-
// The piece is in an unsealed sector
481-
// Is it marked for free retrieval ?
482-
// we don't pass the payload cid so we can potentially cache on a piece cid basis
483-
input := retrievalmarket.PricingInput{
484-
// piece from which the payload will be retrieved
485-
PieceCID: pieceInfo.PieceCID,
486-
Unsealed: true,
487-
}
488-
489-
var dealsIds []abi.DealID
490-
for _, d := range pieceInfo.Deals {
491-
dealsIds = append(dealsIds, d.DealID)
492-
}
493-
494-
ask, err := rp.GetDynamicAsk(context.TODO(), input, dealsIds)
495-
if err != nil {
496-
return shard.Key{}, fmt.Errorf("failed to get retrieval ask: %w", err)
497-
}
498-
499-
// if piece is free we found a match
500-
// otherwise, go to the next shard
501-
if ask.PricePerByte.NilOrZero() {
502-
return sk, nil
503-
}
504-
}
505-
506-
return shard.Key{}, indexbs.ErrNoShardSelected
447+
func NewIndexBackedBlockstore(lc fx.Lifecycle, dagst dagstore.Interface, ps lotus_dtypes.ProviderPieceStore, sa retrievalmarket.SectorAccessor, rp retrievalmarket.RetrievalProvider) (dtypes.IndexBackedBlockstore, error) {
448+
ctx, cancel := context.WithCancel(context.Background())
449+
lc.Append(fx.Hook{
450+
OnStop: func(ctx context.Context) error {
451+
cancel()
452+
return nil
453+
},
507454
})
508-
509-
rbs, err := indexbs.NewIndexBackedBlockstore(dagst, sf, 100)
455+
ss, err := brm.NewShardSelector(ctx, ps, sa, rp)
456+
if err != nil {
457+
return nil, fmt.Errorf("creating shard selector: %w", err)
458+
}
459+
rbs, err := indexbs.NewIndexBackedBlockstore(dagst, ss.ShardSelectorF, 100)
510460
if err != nil {
511461
return nil, fmt.Errorf("failed to create index backed blockstore: %w", err)
512462
}
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
package lib
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"time"
8+
9+
"github.com/filecoin-project/dagstore/indexbs"
10+
"github.com/filecoin-project/dagstore/shard"
11+
"github.com/filecoin-project/go-fil-markets/piecestore"
12+
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
13+
"github.com/filecoin-project/go-state-types/abi"
14+
lru "github.com/hnlq715/golang-lru"
15+
"github.com/ipfs/go-cid"
16+
logging "github.com/ipfs/go-log/v2"
17+
)
18+
19+
var sslog = logging.Logger("shardselect")
20+
21+
// ShardSelector is used by the dagstore's index-backed blockstore to select
22+
// the best shard from which to retrieve a particular cid.
23+
// It chooses the first shard that is unsealed and free (zero cost).
24+
// It caches the results per-shard.
25+
type ShardSelector struct {
26+
ctx context.Context
27+
ps piecestore.PieceStore
28+
sa retrievalmarket.SectorAccessor
29+
rp retrievalmarket.RetrievalProvider
30+
31+
// The striped lock protects against multiple threads doing a lookup
32+
// against the sealing subsystem / retrieval ask for the same shard
33+
stripedLock [256]sync.Mutex
34+
cache *lru.Cache
35+
}
36+
37+
func NewShardSelector(ctx context.Context, ps piecestore.PieceStore, sa retrievalmarket.SectorAccessor, rp retrievalmarket.RetrievalProvider) (*ShardSelector, error) {
38+
cache, err := lru.New(2048)
39+
if err != nil {
40+
return nil, fmt.Errorf("creating shard selector cache: %w", err)
41+
}
42+
return &ShardSelector{ctx: ctx, ps: ps, sa: sa, rp: rp, cache: cache}, nil
43+
}
44+
45+
var (
	// selectorCacheDuration is how long a successful availability check
	// stays in the selector cache.
	selectorCacheDuration = 10 * time.Minute
	// selectorCacheErrorDuration is the shorter lifetime given to cached
	// errors, so that a failed check is retried sooner.
	selectorCacheErrorDuration = time.Minute
)
47+
48+
// shardSelectResult is the cached outcome of an availability check for a
// single shard.
type shardSelectResult struct {
	// available reports whether the shard was selected as usable.
	available bool
	// err is the error produced by the check, if any.
	err error
}
52+
53+
// ShardSelectorF chooses the first shard that is unsealed and free (zero cost)
54+
func (s *ShardSelector) ShardSelectorF(c cid.Cid, shards []shard.Key) (shard.Key, error) {
55+
// If no shards are selected, return ErrNoShardSelected
56+
lastErr := indexbs.ErrNoShardSelected
57+
58+
sslog.Debugw("shard selection", "shards", shards)
59+
for _, sk := range shards {
60+
lkidx := s.stripedLockIndex(sk)
61+
s.stripedLock[lkidx].Lock()
62+
available, err := s.isAvailable(sk)
63+
s.stripedLock[lkidx].Unlock()
64+
65+
if available {
66+
// We found an available shard, return it
67+
sslog.Debugw("shard selected", "shard", sk)
68+
return sk, nil
69+
}
70+
if err != nil {
71+
sslog.Debugw("shard error", "shard", sk, "err", err)
72+
lastErr = err
73+
}
74+
}
75+
76+
// None of the shards are available
77+
sslog.Debugw("no shard selected", "shards", shards, "err", lastErr)
78+
return shard.Key{}, lastErr
79+
}
80+
81+
func (s *ShardSelector) isAvailable(sk shard.Key) (bool, error) {
82+
// Check if the shard key is in the cache
83+
var res *shardSelectResult
84+
resi, cached := s.cache.Get(sk)
85+
if cached {
86+
res = resi.(*shardSelectResult)
87+
sslog.Debugw("shard cache hit", "shard", sk)
88+
return res.available, res.err
89+
}
90+
sslog.Debugw("shard cache miss", "shard", sk)
91+
92+
// Check if the shard is available
93+
res = &shardSelectResult{}
94+
res.available, res.err = s.checkIsAvailable(sk)
95+
expireIn := selectorCacheDuration
96+
if res.err != nil {
97+
// If there's an error, cache for a short duration so that we
98+
// don't wait too long to try again.
99+
expireIn = selectorCacheErrorDuration
100+
res.available = false
101+
res.err = fmt.Errorf("running shard selection for shard %s: %w", sk, res.err)
102+
sslog.Warnw("checking shard availability", "shard", sk, "err", res.err)
103+
}
104+
// Add the result to the cache
105+
s.cache.AddEx(sk, res, expireIn)
106+
107+
return res.available, res.err
108+
}
109+
110+
func (s *ShardSelector) checkIsAvailable(sk shard.Key) (bool, error) {
111+
// Parse piece CID
112+
pieceCid, err := cid.Parse(sk.String())
113+
if err != nil {
114+
return false, fmt.Errorf("parsing shard key as cid: %w", err)
115+
}
116+
117+
// Read piece info from piece store
118+
sslog.Debugw("getting piece info", "shard", sk)
119+
pieceInfo, err := s.ps.GetPieceInfo(pieceCid)
120+
if err != nil {
121+
return false, fmt.Errorf("get piece info: %w", err)
122+
}
123+
124+
// Filter for deals that are unsealed
125+
sslog.Debugw("filtering for unsealed deals", "shard", sk, "deals", len(pieceInfo.Deals))
126+
unsealedDeals := make([]piecestore.DealInfo, 0, len(pieceInfo.Deals))
127+
var lastErr error
128+
for _, di := range pieceInfo.Deals {
129+
isUnsealed, err := s.sa.IsUnsealed(s.ctx, di.SectorID, di.Offset.Unpadded(), di.Length.Unpadded())
130+
if err != nil {
131+
sslog.Warnf("checking if sector is unsealed", "shard", "sector", di.SectorID, sk, "err", err)
132+
lastErr = err
133+
continue
134+
}
135+
136+
if isUnsealed {
137+
sslog.Debugw("sector is unsealed", "shard", "sector", di.SectorID)
138+
unsealedDeals = append(unsealedDeals, di)
139+
} else {
140+
sslog.Debugw("sector is sealed", "shard", "sector", di.SectorID)
141+
}
142+
}
143+
144+
if len(unsealedDeals) == 0 {
145+
// It wasn't possible to find an unsealed sector
146+
sslog.Debugw("no unsealed deals found", "shard", sk)
147+
return false, lastErr
148+
}
149+
150+
// Check if the piece is available for free (zero-cost) retrieval
151+
input := retrievalmarket.PricingInput{
152+
// Piece from which the payload will be retrieved
153+
PieceCID: pieceInfo.PieceCID,
154+
Unsealed: true,
155+
}
156+
157+
var dealsIds []abi.DealID
158+
for _, d := range unsealedDeals {
159+
dealsIds = append(dealsIds, d.DealID)
160+
}
161+
162+
sslog.Debugw("getting dynamic asking price for unsealed deals", "shard", sk, "deals", len(unsealedDeals))
163+
ask, err := s.rp.GetDynamicAsk(s.ctx, input, dealsIds)
164+
if err != nil {
165+
return false, fmt.Errorf("getting retrieval ask: %w", err)
166+
}
167+
168+
// The piece is available for free retrieval
169+
if ask.PricePerByte.NilOrZero() {
170+
sslog.Debugw("asking price for unsealed deals is zero", "shard", sk)
171+
return true, nil
172+
}
173+
174+
sslog.Debugw("asking price-per-byte for unsealed deals is non-zero", "shard", sk, "price", ask.PricePerByte.String())
175+
return false, nil
176+
}
177+
178+
func (s *ShardSelector) stripedLockIndex(sk shard.Key) int {
179+
skstr := sk.String()
180+
return int(skstr[len(skstr)-1])
181+
}

0 commit comments

Comments
 (0)