Skip to content

Commit 8d9286a

Browse files
changes as per review
1 parent 388f51f commit 8d9286a

File tree

3 files changed

+69
-66
lines changed

3 files changed

+69
-66
lines changed
Lines changed: 43 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,45 @@
1-
package dagstore
1+
package indexbs
22

33
import (
44
"context"
55
"errors"
66
"fmt"
77

8+
"github.com/filecoin-project/dagstore"
9+
blocks "github.com/ipfs/go-block-format"
810
logging "github.com/ipfs/go-log/v2"
911

1012
"github.com/filecoin-project/dagstore/shard"
1113
lru "github.com/hashicorp/golang-lru"
12-
13-
blocks "github.com/ipfs/go-block-format"
1414
"github.com/ipfs/go-cid"
1515
blockstore "github.com/ipfs/go-ipfs-blockstore"
1616
)
1717

1818
var logbs = logging.Logger("dagstore-all-readblockstore")
1919

20-
var _ blockstore.Blockstore = (*AllShardsReadBlockstore)(nil)
20+
var ErrBlockNotFound = errors.New("block not found")
21+
22+
var _ blockstore.Blockstore = (*IndexBackedBlockstore)(nil)
2123

2224
// ShardSelectorF helps select a shard to fetch a cid from if the given cid is present in multiple shards.
2325
type ShardSelectorF func(c cid.Cid, shards []shard.Key) (shard.Key, error)
2426

2527
type accessorWithBlockstore struct {
26-
sa *ShardAccessor
27-
bs ReadBlockstore
28+
sa *dagstore.ShardAccessor
29+
bs dagstore.ReadBlockstore
2830
}
2931

30-
// AllShardsReadBlockstore is a read only blockstore over all cids across all shards in the dagstore.
31-
type AllShardsReadBlockstore struct {
32-
d *DAGStore
32+
// IndexBackedBlockstore is a read only blockstore over all cids across all shards in the dagstore.
33+
type IndexBackedBlockstore struct {
34+
d *dagstore.DAGStore
3335
shardSelectF ShardSelectorF
3436

3537
// caches the blockstore for a given shard for shard read affinity i.e. further reads will likely be from the same shard.
3638
// shard key -> read only blockstore
3739
blockstoreCache *lru.Cache
38-
39-
// caches the blocks themselves -> can be scaled by using a redis/memcache etc distributed cache.
40-
// multihash -> block
41-
blockCache *lru.Cache
4240
}
4341

44-
func (d *DAGStore) AllShardsReadBlockstore(shardSelector ShardSelectorF, maxCacheSize int, maxBlocks int) (blockstore.Blockstore, error) {
42+
func NewIndexBackedBlockstore(d *dagstore.DAGStore, shardSelector ShardSelectorF, maxCacheSize int, maxBlocks int) (blockstore.Blockstore, error) {
4543
// instantiate the blockstore cache
4644
bslru, err := lru.NewWithEvict(maxCacheSize, func(_ interface{}, val interface{}) {
4745
// ensure we close the blockstore for a shard when it's evicted from the cache so dagstore can gc it.
@@ -52,42 +50,30 @@ func (d *DAGStore) AllShardsReadBlockstore(shardSelector ShardSelectorF, maxCach
5250
return nil, fmt.Errorf("failed to create lru cache for read only blockstores")
5351
}
5452

55-
// instantiate the block cache
56-
blkLru, err := lru.New(maxBlocks)
57-
if err != nil {
58-
return nil, fmt.Errorf("failed to create lru cache for blocks: %w", err)
59-
}
60-
61-
return &AllShardsReadBlockstore{
53+
return &IndexBackedBlockstore{
6254
d: d,
6355
shardSelectF: shardSelector,
6456
blockstoreCache: bslru,
65-
blockCache: blkLru,
6657
}, nil
6758
}
6859

69-
func (ro *AllShardsReadBlockstore) Get(ctx context.Context, c cid.Cid) (b blocks.Block, finalErr error) {
70-
logbs.Debugw("bitswap Get called", "cid", c)
60+
func (ro *IndexBackedBlockstore) Get(ctx context.Context, c cid.Cid) (b blocks.Block, finalErr error) {
61+
logbs.Debugw("Get called", "cid", c)
7162
defer func() {
7263
if finalErr != nil {
73-
logbs.Debugw("bitswap Get: got error", "cid", c, "error", finalErr)
64+
logbs.Debugw("Get: got error", "cid", c, "error", finalErr)
7465
}
7566
}()
7667

7768
mhash := c.Hash()
78-
// do we have the block cached ?
79-
if val, ok := ro.blockCache.Get(mhash.String()); ok {
80-
logbs.Debugw("bitswap Get: returning from block cache", "cid", c)
81-
return val.(blocks.Block), nil
82-
}
8369

8470
// fetch all the shards containing the multihash
8571
shards, err := ro.d.ShardsContainingMultihash(ctx, mhash)
8672
if err != nil {
8773
return nil, fmt.Errorf("failed to fetch shards containing the block: %w", err)
8874
}
8975
if len(shards) == 0 {
90-
return nil, errors.New("no shards contain the requested block")
76+
return nil, ErrBlockNotFound
9177
}
9278

9379
// do we have a cached blockstore for a shard containing the required block ? If yes, serve the block from that blockstore
@@ -101,13 +87,15 @@ func (ro *AllShardsReadBlockstore) Get(ctx context.Context, c cid.Cid) (b blocks
10187
rbs := val.(*accessorWithBlockstore).bs
10288
blk, err := rbs.Get(ctx, c)
10389
if err != nil {
90+
// we know that the cid we want to lookup belongs to a shard with key `sk` and
91+
// so if we fail to get the corresponding block from the blockstore for that shards, something has gone wrong
92+
// and we should remove the blockstore for that shard from our cache.
10493
ro.blockstoreCache.Remove(sk)
10594
continue
10695
}
10796

10897
// add the block to the block cache
109-
logbs.Debugw("bitswap Get: returning from block store cache", "cid", c)
110-
ro.blockCache.Add(mhash.String(), blk)
98+
logbs.Debugw("Get: returning from block store cache", "cid", c)
11199
return blk, nil
112100
}
113101

@@ -116,15 +104,15 @@ func (ro *AllShardsReadBlockstore) Get(ctx context.Context, c cid.Cid) (b blocks
116104
// select a valid shard that can serve the retrieval
117105
sk, err := ro.shardSelectF(c, shards)
118106
if err != nil {
119-
return nil, fmt.Errorf("failed to select a shard: %w", err)
107+
return nil, ErrBlockNotFound
120108
}
121109

122110
// load blockstore for the selected shard and try to serve the cid from that blockstore.
123-
resch := make(chan ShardResult, 1)
124-
if err := ro.d.AcquireShard(ctx, sk, resch, AcquireOpts{}); err != nil {
111+
resch := make(chan dagstore.ShardResult, 1)
112+
if err := ro.d.AcquireShard(ctx, sk, resch, dagstore.AcquireOpts{}); err != nil {
125113
return nil, fmt.Errorf("failed to acquire shard %s: %w", sk, err)
126114
}
127-
var res ShardResult
115+
var res dagstore.ShardResult
128116
select {
129117
case <-ctx.Done():
130118
return nil, ctx.Err()
@@ -147,61 +135,60 @@ func (ro *AllShardsReadBlockstore) Get(ctx context.Context, c cid.Cid) (b blocks
147135

148136
// update the block cache and the blockstore cache
149137
ro.blockstoreCache.Add(sk, &accessorWithBlockstore{sa, bs})
150-
ro.blockCache.Add(mhash.String(), blk)
151138

152-
logbs.Debugw("bitswap Get: returning after creating new blockstore", "cid", c)
139+
logbs.Debugw("Get: returning after creating new blockstore", "cid", c)
153140
return blk, nil
154141
}
155142

156-
func (ro *AllShardsReadBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) {
157-
logbs.Debugw("bitswap Has called", "cid", c)
143+
func (ro *IndexBackedBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) {
144+
logbs.Debugw("Has called", "cid", c)
158145

159146
// if there is a shard that can serve the retrieval for the given cid, we have the requested cid
160147
// and has should return true.
161148
shards, err := ro.d.ShardsContainingMultihash(ctx, c.Hash())
162149
if err != nil {
163-
logbs.Debugw("bitswap Has error", "cid", c, "err", err)
164-
return false, fmt.Errorf("failed to fetch shards containing the multihash %w", err)
150+
logbs.Debugw("Has error", "cid", c, "err", err)
151+
return false, nil
165152
}
166153
if len(shards) == 0 {
167-
logbs.Debugw("bitswap Has: returning false no error", "cid", c)
154+
logbs.Debugw("Has: returning false no error", "cid", c)
168155
return false, nil
169156
}
170157

171158
_, err = ro.shardSelectF(c, shards)
172159
if err != nil {
173-
logbs.Debugw("bitswap Has error", "cid", c, "err", err)
174-
return false, fmt.Errorf("failed to select a shard: %w", err)
160+
logbs.Debugw("Has error", "cid", c, "err", err)
161+
return false, ErrBlockNotFound
175162
}
176163

177-
logbs.Debugw("bitswap Has: returning true", "cid", c)
164+
logbs.Debugw("Has: returning true", "cid", c)
178165
return true, nil
179166
}
180167

181-
func (ro *AllShardsReadBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
182-
logbs.Debugw("bitswap GetSize called", "cid", c)
168+
func (ro *IndexBackedBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
169+
logbs.Debugw("GetSize called", "cid", c)
183170

184171
blk, err := ro.Get(ctx, c)
185172
if err != nil {
186-
logbs.Debugw("bitswap GetSize error", "cid", c, "err", err)
173+
logbs.Debugw("GetSize error", "cid", c, "err", err)
187174
return 0, fmt.Errorf("failed to get block: %w", err)
188175
}
189176

190-
logbs.Debugw("bitswap GetSize success", "cid", c)
177+
logbs.Debugw("GetSize success", "cid", c)
191178
return len(blk.RawData()), nil
192179
}
193180

194181
// --- UNSUPPORTED BLOCKSTORE METHODS -------
195-
func (ro *AllShardsReadBlockstore) DeleteBlock(context.Context, cid.Cid) error {
182+
func (ro *IndexBackedBlockstore) DeleteBlock(context.Context, cid.Cid) error {
196183
return errors.New("unsupported operation DeleteBlock")
197184
}
198-
func (ro *AllShardsReadBlockstore) HashOnRead(_ bool) {}
199-
func (ro *AllShardsReadBlockstore) Put(context.Context, blocks.Block) error {
185+
func (ro *IndexBackedBlockstore) HashOnRead(_ bool) {}
186+
func (ro *IndexBackedBlockstore) Put(context.Context, blocks.Block) error {
200187
return errors.New("unsupported operation Put")
201188
}
202-
func (ro *AllShardsReadBlockstore) PutMany(context.Context, []blocks.Block) error {
189+
func (ro *IndexBackedBlockstore) PutMany(context.Context, []blocks.Block) error {
203190
return errors.New("unsupported operation PutMany")
204191
}
205-
func (ro *AllShardsReadBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
192+
func (ro *IndexBackedBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
206193
return nil, errors.New("unsupported operation AllKeysChan")
207194
}

readonly_bs_test.go renamed to indexbs/indexbacked_bs_test.go

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
1-
package dagstore
1+
package indexbs
22

33
import (
44
"context"
55
"errors"
66
"testing"
77

8+
"github.com/filecoin-project/dagstore"
9+
"github.com/filecoin-project/dagstore/mount"
10+
"github.com/filecoin-project/dagstore/testdata"
11+
812
"github.com/multiformats/go-multihash"
913

1014
"github.com/filecoin-project/dagstore/shard"
@@ -18,10 +22,12 @@ var noOpSelector = func(c cid.Cid, shards []shard.Key) (shard.Key, error) {
1822
return shards[0], nil
1923
}
2024

25+
var carv2mnt = &mount.FSMount{FS: testdata.FS, Path: testdata.FSPathCarV2}
26+
2127
func TestReadOnlyBs(t *testing.T) {
2228
ctx := context.Background()
2329
store := dssync.MutexWrap(datastore.NewMapDatastore())
24-
dagst, err := NewDAGStore(Config{
30+
dagst, err := dagstore.NewDAGStore(dagstore.Config{
2531
MountRegistry: testRegistry(t),
2632
TransientsDir: t.TempDir(),
2733
Datastore: store,
@@ -31,15 +37,20 @@ func TestReadOnlyBs(t *testing.T) {
3137
err = dagst.Start(context.Background())
3238
require.NoError(t, err)
3339

34-
// two shards containing the same cid
35-
keys := registerShards(t, dagst, 2, carv2mnt, RegisterOpts{})
40+
// register a shard
41+
ch := make(chan dagstore.ShardResult, 1)
42+
sk := shard.KeyFromString("test1")
43+
err = dagst.RegisterShard(context.Background(), sk, carv2mnt, ch, dagstore.RegisterOpts{})
44+
require.NoError(t, err)
45+
res := <-ch
46+
require.NoError(t, res.Error)
3647

37-
rbs, err := dagst.AllShardsReadBlockstore(noOpSelector, 10, 10)
48+
rbs, err := NewIndexBackedBlockstore(dagst, noOpSelector, 10, 10)
3849
require.NoError(t, err)
3950

4051
// iterate over the CARV2 Index for the given CARv2 file and ensure the readonly blockstore
4152
// works for each of those cids
42-
it, err := dagst.GetIterableIndex(keys[0])
53+
it, err := dagst.GetIterableIndex(sk)
4354
require.NoError(t, err)
4455

4556
it.ForEach(func(mh multihash.Multihash, u uint64) error {
@@ -67,7 +78,7 @@ func TestReadOnlyBs(t *testing.T) {
6778
return shard.Key{}, errors.New("rejected")
6879
}
6980

70-
rbs, err = dagst.AllShardsReadBlockstore(fss, 10, 10)
81+
rbs, err = NewIndexBackedBlockstore(dagst, fss, 10, 10)
7182
require.NoError(t, err)
7283
it.ForEach(func(mh multihash.Multihash, u uint64) error {
7384
c := cid.NewCidV1(cid.Raw, mh)
@@ -86,5 +97,13 @@ func TestReadOnlyBs(t *testing.T) {
8697

8798
return nil
8899
})
100+
}
89101

102+
func testRegistry(t *testing.T) *mount.Registry {
103+
r := mount.NewRegistry()
104+
err := r.Register("fs", &mount.FSMount{FS: testdata.FS})
105+
require.NoError(t, err)
106+
err = r.Register("counting", new(mount.Counting))
107+
require.NoError(t, err)
108+
return r
90109
}

interface.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ package dagstore
33
import (
44
"context"
55

6-
blockstore "github.com/ipfs/go-ipfs-blockstore"
7-
86
carindex "github.com/ipld/go-car/v2/index"
97
mh "github.com/multiformats/go-multihash"
108

@@ -26,5 +24,4 @@ type Interface interface {
2624
ShardsContainingMultihash(ctx context.Context, h mh.Multihash) ([]shard.Key, error)
2725
GC(ctx context.Context) (*GCResult, error)
2826
Close() error
29-
AllShardsReadBlockstore(shardSelector ShardSelectorF, maxBSCachesize int, maxBlkCachesize int) (blockstore.Blockstore, error)
3027
}

0 commit comments

Comments
 (0)