Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions dagstore_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,18 @@ package dagstore

import (
"context"
"fmt"
"io"

"github.com/filecoin-project/dagstore/index"
"github.com/filecoin-project/dagstore/shard"
"github.com/ipfs/go-cid"

"github.com/filecoin-project/go-data-segment/datasegment"
"github.com/filecoin-project/go-state-types/abi"
"github.com/ipld/go-car/v2"
carindex "github.com/ipld/go-car/v2/index"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"

"github.com/filecoin-project/dagstore/mount"
Expand Down Expand Up @@ -105,6 +112,13 @@ func (d *DAGStore) acquireAsync(ctx context.Context, w *waiter, s *Shard, mnt mo
// initializeShard initializes a shard asynchronously by fetching its data and
// performing indexing.
func (d *DAGStore) initializeShard(ctx context.Context, s *Shard, mnt mount.Mount) {
stat, err := mnt.Stat(ctx)
if err != nil {
log.Warnw("initialize: failed to stat from mount upgrader", "shard", s.key, "error", err)

_ = d.failShard(s, d.completionCh, "failed to get size of mount on initialization: %w", err)
return
}
reader, err := mnt.Fetch(ctx)
if err != nil {
log.Warnw("initialize: failed to fetch from mount upgrader", "shard", s.key, "error", err)
Expand All @@ -116,6 +130,28 @@ func (d *DAGStore) initializeShard(ctx context.Context, s *Shard, mnt mount.Moun

log.Debugw("initialize: successfully fetched from mount upgrader", "shard", s.key)

// attempt to get items by checking for a data segment index
dsIdx, err := d.parseShardWithDataSegmentIndex(ctx, s.key, stat.Size, reader)
if err == nil {
if err := d.indices.AddFullIndex(s.key, dsIdx); err != nil {
_ = d.failShard(s, d.completionCh, "failed to add index for shard: %w", err)
return
}
mhIter := &mhIdx{iterableIdx: dsIdx}
if err := d.TopLevelIndex.AddMultihashesForShard(ctx, mhIter, s.key); err != nil {
log.Errorw("failed to add shard multihashes to the inverted index", "shard", s.key, "error", err)
}
_ = d.queueTask(&task{op: OpShardMakeAvailable, shard: s}, d.completionCh)
return
}
log.Debugw("initialize: falling back to standard car parse.", "err", err, "shard", s.key)
if _, err := reader.Seek(0, 0); err != nil {
log.Warnw("initialize: failed to rewind mount", "shard", s.key, "error", err)

_ = d.failShard(s, d.completionCh, "failed to rewind reader of mount on initialization: %w", err)
return
}

// works for both CARv1 and CARv2.
var idx carindex.Index
err = d.throttleIndex.Do(ctx, func(_ context.Context) error {
Expand Down Expand Up @@ -151,6 +187,57 @@ func (d *DAGStore) initializeShard(ctx context.Context, s *Shard, mnt mount.Moun
_ = d.queueTask(&task{op: OpShardMakeAvailable, shard: s}, d.completionCh)
}

func (d *DAGStore) parseShardWithDataSegmentIndex(ctx context.Context, sKey shard.Key, size int64, r mount.Reader) (carindex.IterableIndex, error) {
ps := abi.UnpaddedPieceSize(size).Padded()
dsis := datasegment.DataSegmentIndexStartOffset(ps)
if _, err := r.Seek(int64(dsis), io.SeekStart); err != nil {
return nil, fmt.Errorf("could not seek to data segment index: %w", err)
}
dataSegments, err := datasegment.ParseDataSegmentIndex(r)
if err != nil {
return nil, fmt.Errorf("could not parse data segment index: %w", err)
}
segments, err := dataSegments.ValidEntries()
if err != nil {
return nil, fmt.Errorf("could not calculate valid entries: %w", err)
}

if len(segments) == 0 {
return nil, fmt.Errorf("no data segments found")
}

finalIdx := carindex.NewInsertionIndex()
for _, s := range segments {
segOffset := s.UnpaddedOffest()
segSize := s.UnpaddedLength()
var idx carindex.Index
err = d.throttleIndex.Do(ctx, func(_ context.Context) error {
var err error

lr := io.NewSectionReader(r, int64(segOffset), int64(segSize))
idx, err = car.ReadOrGenerateIndex(lr, car.ZeroLengthSectionAsEOF(true), car.StoreIdentityCIDs(true))
if err == nil {
log.Debugw("initialize: finished generating index for shard", "shard", sKey, "segment", s.Offset)
} else {
log.Warnw("initialize: failed to generate index for shard", "shard", sKey, "segment", s.Offset, "error", err)
}
return err
})
if err == nil {
if mhi, ok := idx.(*carindex.MultihashIndexSorted); ok {
_ = mhi.ForEach(func(mh multihash.Multihash, offset uint64) error {
finalIdx.InsertNoReplace(cid.NewCidV1(uint64(multicodec.Raw), mh), segOffset+offset)
return nil
})
} else {
log.Debugw("initialize: Unexpected index format on generation in shard", "shard", sKey, "offset", segOffset)
}
}
}

return finalIdx, nil
}

// Convenience struct for converting from CAR index.IterableIndex to the
// iterator required by the dag store inverted index.
type mhIdx struct {
Expand Down
30 changes: 16 additions & 14 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,32 @@ module github.com/filecoin-project/dagstore
go 1.16

require (
github.com/ipfs/go-block-format v0.0.3
github.com/filecoin-project/go-data-segment v0.0.0-20230306155603-4901260403b5
github.com/filecoin-project/go-state-types v0.10.0
github.com/ipfs/go-block-format v0.1.2
github.com/ipfs/go-blockservice v0.5.0
github.com/ipfs/go-cid v0.3.2
github.com/ipfs/go-cidutil v0.1.0
github.com/ipfs/go-datastore v0.6.0
github.com/ipfs/go-ds-leveldb v0.5.0
github.com/ipfs/go-ipfs-blockstore v1.2.0
github.com/ipfs/go-ipfs-blockstore v1.3.0
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-exchange-offline v0.3.0
github.com/ipfs/go-ipfs-files v0.0.3
github.com/ipfs/go-ipld-format v0.3.0
github.com/ipfs/go-ipfs-files v0.3.0
github.com/ipfs/go-ipld-format v0.4.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipfs/go-merkledag v0.8.1
github.com/ipfs/go-unixfs v0.3.1
github.com/ipld/go-car/v2 v2.4.1
github.com/ipfs/go-merkledag v0.10.0
github.com/ipfs/go-unixfs v0.4.4
github.com/ipld/go-car/v2 v2.9.1-0.20230327161157-6c6051d9fa75
github.com/jellydator/ttlcache/v2 v2.11.1
github.com/mr-tron/base58 v1.2.0
github.com/multiformats/go-multicodec v0.5.0
github.com/multiformats/go-multicodec v0.8.1
github.com/multiformats/go-multihash v0.2.1
github.com/stretchr/testify v1.8.0
github.com/stretchr/testify v1.8.2
github.com/syndtr/goleveldb v1.0.0
github.com/whyrusleeping/cbor-gen v0.0.0-20200123233031-1cdf64d27158
golang.org/x/exp v0.0.0-20210714144626-1041f73d31d8
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
github.com/whyrusleeping/cbor-gen v0.0.0-20230126041949-52956bd4c9aa
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb
golang.org/x/sync v0.1.0
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2
)
Loading