Skip to content

Commit 173a32a

Browse files
authored
Merge pull request #144 from filecoin-project/feat/ibs-acquire-sync
index-backed blockstore - synchronize acquires
2 parents 2a000e2 + 5c8caac commit 173a32a

File tree

2 files changed

+95
-151
lines changed

2 files changed

+95
-151
lines changed

indexbs/indexbacked_bs.go

Lines changed: 91 additions & 147 deletions
Original file line numberDiff line numberDiff line change
@@ -29,26 +29,49 @@ var ErrNoShardSelected = errors.New("no shard selected")
2929
// It should return `ErrNoShardSelected` if none of the given shard is selected.
3030
type ShardSelectorF func(c cid.Cid, shards []shard.Key) (shard.Key, error)
3131

32+
type accessorWithBlockstore struct {
33+
sa *dagstore.ShardAccessor
34+
bs dagstore.ReadBlockstore
35+
}
36+
37+
type blockstoreAcquire struct {
38+
once sync.Once
39+
bs dagstore.ReadBlockstore
40+
err error
41+
}
42+
3243
// IndexBackedBlockstore is a read only blockstore over all cids across all shards in the dagstore.
3344
type IndexBackedBlockstore struct {
45+
ctx context.Context
3446
d dagstore.Interface
3547
shardSelectF ShardSelectorF
3648

3749
// caches the blockstore for a given shard for shard read affinity
3850
// i.e. further reads will likely be from the same shard. Maps (shard key -> blockstore).
39-
bsCache *blockstoreCache
51+
blockstoreCache *lru.Cache
52+
// used to manage concurrent acquisition of shards by multiple threads
53+
bsAcquireByShard sync.Map
4054
}
4155

42-
func NewIndexBackedBlockstore(d dagstore.Interface, shardSelector ShardSelectorF, maxCacheSize int) (blockstore.Blockstore, error) {
43-
cache, err := newBlockstoreCache(maxCacheSize)
56+
func NewIndexBackedBlockstore(ctx context.Context, d dagstore.Interface, shardSelector ShardSelectorF, maxCacheSize int) (blockstore.Blockstore, error) {
57+
// instantiate the blockstore cache
58+
bslru, err := lru.NewWithEvict(maxCacheSize, func(_ interface{}, val interface{}) {
59+
// Ensure we close the blockstore for a shard when it's evicted from
60+
// the cache so dagstore can gc it.
61+
// TODO: add reference counting mechanism so that the blockstore does
62+
// not get closed while there is an operation still in progress against it
63+
abs := val.(*accessorWithBlockstore)
64+
abs.sa.Close()
65+
})
4466
if err != nil {
45-
return nil, err
67+
return nil, fmt.Errorf("failed to create lru cache for read only blockstores")
4668
}
4769

4870
return &IndexBackedBlockstore{
49-
d: d,
50-
shardSelectF: shardSelector,
51-
bsCache: cache,
71+
ctx: ctx,
72+
d: d,
73+
shardSelectF: shardSelector,
74+
blockstoreCache: bslru,
5275
}, nil
5376
}
5477

@@ -117,25 +140,22 @@ func (ro *IndexBackedBlockstore) execOp(ctx context.Context, c cid.Cid, op Block
117140
// If so, call op on the cached blockstore.
118141
for _, sk := range shards {
119142
// Get the shard's blockstore from the cache
120-
abs, ok := ro.bsCache.Get(sk)
143+
val, ok := ro.blockstoreCache.Get(sk)
121144
if ok {
122-
res, err := execOpOnBlockstore(ctx, c, sk, abs.bs, op)
123-
abs.close()
124-
if err == nil {
125-
// Found a cached shard blockstore containing the required block,
126-
// and successfully called the blockstore op
127-
return res, nil
145+
accessor := val.(*accessorWithBlockstore)
146+
res, err := execOpOnBlockstore(ctx, c, sk, accessor.bs, op)
147+
if err != nil {
148+
return nil, err
128149
}
150+
151+
// Found a cached blockstore containing the required block,
152+
// and successfully called the blockstore op
153+
return res, nil
129154
}
130155
}
131156

132-
// We weren't able to get the block which means that either
133-
// 1. There is no cached blockstore for a shard that contains the block
134-
// 2. There was an error trying to get the block from the existing cached
135-
// blockstore.
136-
// ShardsContainingMultihash indicated that the shard has the block, so
137-
// if there was an error getting it, it means there is something wrong.
138-
// So in either case we should create a new blockstore for the shard.
157+
// We weren't able to find a cached blockstore for a shard that contains
158+
// the block. Create a new blockstore for the shard.
139159

140160
// Use the shard select function to select one of the shards with the block
141161
sk, err := ro.shardSelectF(c, shards)
@@ -147,38 +167,61 @@ func (ro *IndexBackedBlockstore) execOp(ctx context.Context, c cid.Cid, op Block
147167
return nil, fmt.Errorf("failed to run shard selection function: %w", err)
148168
}
149169

150-
// Load the blockstore for the selected shard.
151-
// Note that internally the DAG store will synchronize multiple concurrent
152-
// acquires for the same shard.
153-
resch := make(chan dagstore.ShardResult, 1)
154-
if err := ro.d.AcquireShard(ctx, sk, resch, dagstore.AcquireOpts{}); err != nil {
155-
return nil, fmt.Errorf("failed to acquire shard %s: %w", sk, err)
156-
}
157-
var shres dagstore.ShardResult
158-
select {
159-
case <-ctx.Done():
160-
return nil, ctx.Err()
161-
case shres = <-resch:
162-
if shres.Error != nil {
163-
return nil, fmt.Errorf("failed to acquire shard %s: %w", sk, shres.Error)
164-
}
165-
}
170+
// Some retrieval patterns will result in multiple threads fetching blocks
171+
// from the same piece concurrently. In that case many threads may attempt
172+
// to create a blockstore over the same piece. Use a sync.Once to ensure
173+
// that the blockstore is only created once for all threads waiting on the
174+
// same shard.
175+
bsAcquireI, _ := ro.bsAcquireByShard.LoadOrStore(sk, &blockstoreAcquire{})
176+
bsAcquire := bsAcquireI.(*blockstoreAcquire)
177+
bsAcquire.once.Do(func() {
178+
bsAcquire.bs, bsAcquire.err = func() (dagstore.ReadBlockstore, error) {
179+
// Check if the blockstore was created by another thread while this
180+
// thread was waiting to enter the sync.Once
181+
val, ok := ro.blockstoreCache.Get(sk)
182+
if ok {
183+
return val.(dagstore.ReadBlockstore), nil
184+
}
166185

167-
sa := shres.Accessor
168-
bs, err := sa.Blockstore()
169-
if err != nil {
170-
return nil, fmt.Errorf("failed to load read-only blockstore for shard %s: %w", sk, err)
171-
}
186+
// Acquire the blockstore for the selected shard
187+
resch := make(chan dagstore.ShardResult, 1)
188+
if err := ro.d.AcquireShard(ro.ctx, sk, resch, dagstore.AcquireOpts{}); err != nil {
189+
return nil, fmt.Errorf("failed to acquire shard %s: %w", sk, err)
190+
}
191+
var shres dagstore.ShardResult
192+
select {
193+
case <-ctx.Done():
194+
return nil, ctx.Err()
195+
case shres = <-resch:
196+
if shres.Error != nil {
197+
return nil, fmt.Errorf("failed to acquire shard %s: %w", sk, shres.Error)
198+
}
199+
}
200+
201+
sa := shres.Accessor
202+
bs, err := sa.Blockstore()
203+
if err != nil {
204+
return nil, fmt.Errorf("failed to load read-only blockstore for shard %s: %w", sk, err)
205+
}
206+
207+
// Add the blockstore to the cache
208+
ro.blockstoreCache.Add(sk, &accessorWithBlockstore{sa, bs})
209+
210+
logbs.Debugw("Added new blockstore to cache", "cid", c, "shard", sk)
211+
212+
return bs, nil
213+
}()
172214

173-
// Add the blockstore to the cache
174-
abs := &accessorWithBlockstore{sa: sa, bs: bs}
175-
ro.bsCache.Add(sk, abs)
176-
defer abs.close()
215+
// The sync.Once has completed so clean up the acquire entry for this shard
216+
ro.bsAcquireByShard.Delete(sk)
217+
})
177218

178-
logbs.Debugw("Added new blockstore to cache", "cid", c, "shard", sk)
219+
if bsAcquire.err != nil {
220+
return nil, bsAcquire.err
221+
}
179222

180223
// Call the operation on the blockstore
181-
return execOpOnBlockstore(ctx, c, sk, bs, op)
224+
return execOpOnBlockstore(ctx, c, sk, bsAcquire.bs, op)
182225
}
183226

184227
func execOpOnBlockstore(ctx context.Context, c cid.Cid, sk shard.Key, bs dagstore.ReadBlockstore, op BlockstoreOp) (*opRes, error) {
@@ -241,102 +284,3 @@ func (ro *IndexBackedBlockstore) PutMany(context.Context, []blocks.Block) error
241284
func (ro *IndexBackedBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
242285
return nil, errors.New("unsupported operation AllKeysChan")
243286
}
244-
245-
type blockstoreCache struct {
246-
lk sync.Mutex
247-
cache *lru.Cache
248-
}
249-
250-
func newBlockstoreCache(size int) (*blockstoreCache, error) {
251-
bslru, err := lru.NewWithEvict(size, func(_ interface{}, val interface{}) {
252-
abs := val.(*accessorWithBlockstore)
253-
abs.evict()
254-
})
255-
if err != nil {
256-
return nil, fmt.Errorf("failed to create lru cache for read only blockstores: %w", err)
257-
}
258-
259-
return &blockstoreCache{cache: bslru}, nil
260-
}
261-
262-
func (bc *blockstoreCache) Get(sk shard.Key) (*accessorWithBlockstore, bool) {
263-
bc.lk.Lock()
264-
defer bc.lk.Unlock()
265-
266-
// Get the accessor from the cache
267-
absi, ok := bc.cache.Get(sk)
268-
if !ok {
269-
return nil, false
270-
}
271-
272-
// Increment the accessor's ref count so that the blockstore
273-
// will not be closed until the caller is finished with it
274-
abs := absi.(*accessorWithBlockstore)
275-
abs.incRefCount()
276-
return abs, true
277-
}
278-
279-
func (bc *blockstoreCache) Add(sk shard.Key, abs *accessorWithBlockstore) {
280-
bc.lk.Lock()
281-
defer bc.lk.Unlock()
282-
283-
// Check if we're replacing an existing accessor with this Add
284-
absi, ok := bc.cache.Get(sk)
285-
if ok {
286-
// Mark the existing accessor as evicted so that its blockstore can be
287-
// closed once all callers are done with the blockstore
288-
abs := absi.(*accessorWithBlockstore)
289-
abs.evict()
290-
}
291-
292-
// Add the new accessor
293-
bc.cache.Add(sk, abs)
294-
abs.incRefCount()
295-
}
296-
297-
type accessorWithBlockstore struct {
298-
sa *dagstore.ShardAccessor
299-
bs dagstore.ReadBlockstore
300-
301-
lk sync.Mutex
302-
evicted bool
303-
refCount int
304-
}
305-
306-
func (abs *accessorWithBlockstore) incRefCount() {
307-
abs.lk.Lock()
308-
defer abs.lk.Unlock()
309-
310-
abs.refCount++
311-
}
312-
313-
func (abs *accessorWithBlockstore) close() {
314-
abs.lk.Lock()
315-
defer abs.lk.Unlock()
316-
317-
abs.refCount--
318-
if abs.refCount == 0 && abs.evicted {
319-
// The blockstore has already been evicted, and this was the last
320-
// reference to it, so close the blockstore so that dagstore can GC it
321-
err := abs.sa.Close()
322-
if err != nil {
323-
logbs.Warnf("error closing blockstore: %w", err)
324-
}
325-
}
326-
}
327-
328-
func (abs *accessorWithBlockstore) evict() {
329-
abs.lk.Lock()
330-
defer abs.lk.Unlock()
331-
332-
abs.evicted = true
333-
334-
if abs.refCount == 0 {
335-
// There are no more references to the blockstore; close it so that the
336-
// dagstore can GC it
337-
err := abs.sa.Close()
338-
if err != nil {
339-
logbs.Warnf("error closing blockstore: %w", err)
340-
}
341-
}
342-
}

indexbs/indexbacked_bs_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func TestReadOnlyBs(t *testing.T) {
4747
res := <-ch
4848
require.NoError(t, res.Error)
4949

50-
rbs, err := NewIndexBackedBlockstore(dagst, noOpSelector, 10)
50+
rbs, err := NewIndexBackedBlockstore(ctx, dagst, noOpSelector, 10)
5151
require.NoError(t, err)
5252

5353
// iterate over the CARV2 Index for the given CARv2 file and ensure the readonly blockstore
@@ -108,7 +108,7 @@ func TestReadOnlyBs(t *testing.T) {
108108
return shard.Key{}, rejectedErr
109109
}
110110

111-
rbs, err = NewIndexBackedBlockstore(dagst, fss, 10)
111+
rbs, err = NewIndexBackedBlockstore(ctx, dagst, fss, 10)
112112
require.NoError(t, err)
113113
it.ForEach(func(mh multihash.Multihash, u uint64) error {
114114
c := cid.NewCidV1(cid.Raw, mh)
@@ -134,7 +134,7 @@ func TestReadOnlyBs(t *testing.T) {
134134
return shard.Key{}, ErrNoShardSelected
135135
}
136136

137-
rbs, err = NewIndexBackedBlockstore(dagst, fss, 10)
137+
rbs, err = NewIndexBackedBlockstore(ctx, dagst, fss, 10)
138138
require.NoError(t, err)
139139
it.ForEach(func(mh multihash.Multihash, u uint64) error {
140140
c := cid.NewCidV1(cid.Raw, mh)
@@ -162,7 +162,7 @@ func TestReadOnlyBs(t *testing.T) {
162162
notFoundCid, err := cid.Parse("bafzbeigai3eoy2ccc7ybwjfz5r3rdxqrinwi4rwytly24tdbh6yk7zslrm")
163163
require.NoError(t, err)
164164

165-
rbs, err = NewIndexBackedBlockstore(dagst, noOpSelector, 10)
165+
rbs, err = NewIndexBackedBlockstore(ctx, dagst, noOpSelector, 10)
166166
require.NoError(t, err)
167167

168168
// Has should return false

0 commit comments

Comments
 (0)