Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dagstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,8 @@ type RecoverOpts struct {
// will be notified when it completes.
//
// TODO add an operation identifier to ShardResult -- starts to look like
// a Trace event?
//
// a Trace event?
func (d *DAGStore) RecoverShard(ctx context.Context, key shard.Key, out chan ShardResult, _ RecoverOpts) error {
d.lk.Lock()
s, ok := d.shards[key]
Expand Down
35 changes: 34 additions & 1 deletion dagstore_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package dagstore

import (
"context"
"io"

"github.com/filecoin-project/dagstore/index"
"github.com/ipfs/go-cid"

"github.com/ipld/go-car/v2"
carindex "github.com/ipld/go-car/v2/index"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"

"github.com/filecoin-project/dagstore/mount"
Expand Down Expand Up @@ -118,20 +121,44 @@ func (d *DAGStore) initializeShard(ctx context.Context, s *Shard, mnt mount.Moun

// works for both CARv1 and CARv2.
var idx carindex.Index
var roots []cid.Cid

err = d.throttleIndex.Do(ctx, func(_ context.Context) error {
var err error
var headerReader *car.Reader

headerReader, err = car.NewReader(reader)
if err != nil {
log.Warnw("initialize: failed to read header for shard", "shard", s.key, "error", err)
return err
}

roots, err = headerReader.Roots()
if err != nil {
log.Warnw("initialize: failed to read roots for shard", "shard", s.key, "error", err)
return err
}

if _, err = reader.Seek(0, io.SeekStart); err != nil {
log.Warnw("initialize: failed to reset reader for shard", "shard", s.key, "error", err)
return err
}

idx, err = car.ReadOrGenerateIndex(reader, car.ZeroLengthSectionAsEOF(true), car.StoreIdentityCIDs(true))
if err == nil {
log.Debugw("initialize: finished generating index for shard", "shard", s.key)
} else {
log.Warnw("initialize: failed to generate index for shard", "shard", s.key, "error", err)
}

return err
})

if err != nil {
_ = d.failShard(s, d.completionCh, "failed to read/generate CAR Index: %w", err)
return
}

if err := d.indices.AddFullIndex(s.key, idx); err != nil {
_ = d.failShard(s, d.completionCh, "failed to add index for shard: %w", err)
return
Expand All @@ -140,7 +167,7 @@ func (d *DAGStore) initializeShard(ctx context.Context, s *Shard, mnt mount.Moun
// add all cids in the shard to the inverted (cid -> []Shard Keys) index.
iterableIdx, ok := idx.(carindex.IterableIndex)
if ok {
mhIter := &mhIdx{iterableIdx: iterableIdx}
mhIter := &mhIdx{roots: roots, iterableIdx: iterableIdx}
if err := d.TopLevelIndex.AddMultihashesForShard(ctx, mhIter, s.key); err != nil {
log.Errorw("failed to add shard multihashes to the inverted index", "shard", s.key, "error", err)
}
Expand All @@ -154,12 +181,18 @@ func (d *DAGStore) initializeShard(ctx context.Context, s *Shard, mnt mount.Moun
// Convenience struct for converting from CAR index.IterableIndex to the
// iterator required by the dag store inverted index.
type mhIdx struct {
roots []cid.Cid
iterableIdx carindex.IterableIndex
}

var _ index.MultihashIterator = (*mhIdx)(nil)

func (it *mhIdx) ForEach(fn func(mh multihash.Multihash) error) error {
for _, root := range it.roots {
if root.Prefix().MhType == uint64(multicodec.Identity) {
fn(root.Hash())
}
}
return it.iterableIdx.ForEach(func(mh multihash.Multihash, _ uint64) error {
return fn(mh)
})
Expand Down