This repository was archived by the owner on Jun 27, 2023. It is now read-only.

Concurrent walk test #106

Merged
4 changes: 2 additions & 2 deletions hamt/hamt.go
@@ -393,8 +393,8 @@ func (ds *Shard) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult {
return
}
// FIXME: Make concurrency an option for testing.
//err := dag.Walk(ctx, getLinks, ds.cid, cset.Visit, dag.Concurrent())
err = dag.Walk(ctx, getLinks, rootNode.Cid(), cset.Visit)
err = dag.Walk(ctx, getLinks, rootNode.Cid(), cset.Visit, dag.Concurrent())
//err = dag.Walk(ctx, getLinks, rootNode.Cid(), cset.Visit)
if err != nil {
emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err})
}
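The FIXME above asks for the walk concurrency to become configurable so tests can force a deterministic traversal. A minimal sketch of one way that could look inside EnumLinksAsync, using only the calls already present in the hunk; the useConcurrentWalk toggle is a hypothetical name, not part of this change:

// Hypothetical package-level toggle; production keeps the concurrent walk,
// tests could flip it to get a predictable sequential traversal.
var useConcurrentWalk = true

// ...inside EnumLinksAsync, replacing the hard-coded call:
if useConcurrentWalk {
	err = dag.Walk(ctx, getLinks, rootNode.Cid(), cset.Visit, dag.Concurrent())
} else {
	err = dag.Walk(ctx, getLinks, rootNode.Cid(), cset.Visit)
}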
11 changes: 4 additions & 7 deletions hamt/util.go
@@ -91,7 +91,7 @@ func IdHash(val []byte) []byte {
// * all leaf Shard nodes have the same depth (and have only 'value' links).
// * all internal Shard nodes point only to other Shards (and hence have zero 'value' links).
// * the total number of 'value' links (directory entries) is:
// io.DefaultShardWidth ^ treeHeight.
// io.DefaultShardWidth ^ (treeHeight + 1).
// FIXME: HAMTHashFunction needs to be set to IdHash by the caller. We depend on
// this simplification for the current logic to work. (HAMTHashFunction is a
// global setting of the package, it is hard-coded in the serialized Shard node
@@ -100,7 +100,7 @@ func IdHash(val []byte) []byte {
// the fake hash as in io.SetAndPrevious through `newHashBits()` and pass
// it as an argument, making the hash independent of tree manipulation; that
// sounds like the correct way to go in general, and we wouldn't need this.)
func CreateCompleteHAMT(ds ipld.DAGService, treeHeight int) (ipld.Node, error) {
func CreateCompleteHAMT(ds ipld.DAGService, treeHeight int, childsPerNode int) (ipld.Node, error) {
if treeHeight < 1 {
panic("treeHeight < 1")
}
@@ -112,11 +112,6 @@ func CreateCompleteHAMT(ds ipld.DAGService, treeHeight int) (ipld.Node, error) {
//}
// FIXME: Any clean and simple way to do this? Otherwise remove check.

//childsPerNode := io.DefaultShardWidth
childsPerNode := 256 // (FIXME: hard-coded as we have an 'import cycle not
// allowed' error from io package otherwise.)
// FIXME: Evaluate making this an argument.

rootShard, err := NewShard(ds, childsPerNode)
if err != nil {
return nil, err
@@ -131,6 +126,8 @@ func CreateCompleteHAMT(ds ipld.DAGService, treeHeight int) (ipld.Node, error) {
var hashbuf [8]byte
binary.LittleEndian.PutUint64(hashbuf[:], uint64(i))
var oldLink *ipld.Link
// FIXME: This is wrong for childsPerNode/DefaultShardWidth different
// than 256 (i.e., one byte of key per level).
oldLink, err = rootShard.SetAndPrevious(context.Background(), string(hashbuf[:treeHeight]), unixfs.EmptyFileNode())
if err != nil {
return nil, err
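The new FIXME notes that taking `hashbuf[:treeHeight]` as the key assumes one byte of (identity) hash per level, which only holds for a width of 256. A sketch of the underlying arithmetic for an arbitrary width, as a hypothetical helper (not code from this PR); mapping these per-level digits back into a key would still have to follow the bit order consumed by hamt's hash reader:

// digitsForEntry splits entry index i into one child-slot index per level of a
// complete HAMT with the given width (level 0 = root). With width 256 each
// digit is exactly one byte, which is why the current key encoding works.
func digitsForEntry(i, treeHeight, width int) []int {
	digits := make([]int, treeHeight)
	for level := treeHeight - 1; level >= 0; level-- {
		digits[level] = i % width
		i /= width
	}
	return digits
}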
2 changes: 1 addition & 1 deletion hamt/util_test.go
@@ -73,7 +73,7 @@ func TestCreateCompleteShard(t *testing.T) {
treeHeight := 2 // This is the limit of what we can generate quickly;
// the default width (256) is too big. We may need to refine
// CreateCompleteHAMT's encoding of the key to reduce the tableSize.
node, err := CreateCompleteHAMT(ds, treeHeight)
node, err := CreateCompleteHAMT(ds, treeHeight, 256)
assert.NoError(t, err)

shard, err := NewHamtFromDag(ds, node)
80 changes: 53 additions & 27 deletions io/directory_test.go
@@ -9,6 +9,7 @@ import (
"strings"
"sync"
"testing"
"time"

cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
@@ -328,13 +329,16 @@ func TestHAMTEnumerationWhenComputingSize(t *testing.T) {
oldHashFunc := hamt.HAMTHashFunction
defer func() { hamt.HAMTHashFunction = oldHashFunc }()
hamt.HAMTHashFunction = hamt.IdHash
//oldShardWidth := DefaultShardWidth
//defer func() { DefaultShardWidth = oldShardWidth }()
//DefaultShardWidth = 8
// FIXME: We should be able to use a smaller DefaultShardWidth to have
// a deeper tree and cheaper tests once the import cycle is resolved
// in hamt.CreateCompleteHAMT and the DefaultShardWidth value is not
// hardcoded there.
oldShardWidth := DefaultShardWidth
defer func() { DefaultShardWidth = oldShardWidth }()
DefaultShardWidth = 16 // FIXME: Review number. From 256 to 16 or 8
// (if we fix CreateCompleteHAMT).

// FIXME: Taken from private github.com/ipfs/go-merkledag@v0.2.3/merkledag.go.
// (We could also pass an explicit concurrency value in `(*Shard).EnumLinksAsync()`
// and take ownership of this configuration, but that would depart from the more
// standard and reliable one in `go-merkledag`.)
defaultConcurrentFetch := 32

// We create a "complete" HAMT (see CreateCompleteHAMT for more details)
// with a regular structure to be able to predict how many Shard nodes we
@@ -343,34 +347,52 @@ func TestHAMTEnumerationWhenComputingSize(t *testing.T) {
oldHamtOption := HAMTShardingSize
defer func() { HAMTShardingSize = oldHamtOption }()
// (Some arbitrary values below that make this test not that expensive.)
treeHeight := 2
thresholdToWidthRatio := 4 // How many leaf shards nodes (with value links,
treeHeight := 3 // FIXME: Review number. From 2 to 3.
// How many leaf Shard nodes (with value links,
// i.e., directory entries) do we need to reach the threshold.
thresholdToWidthRatio := 4
// FIXME: Review dag.Walk algorithm to better figure out this estimate.

HAMTShardingSize = DefaultShardWidth * thresholdToWidthRatio
// With this structure we will then need to fetch the following nodes:
// With this structure and a BFS traversal (from `parallelWalkDepth`),
// we would roughly fetch the following nodes:
nodesToFetch := 0
// * all layers up to (but not including) the last one with leaf nodes
// (because it's a BFS)
for i := 0; i < treeHeight; i++ {
nodesToFetch += int(math.Pow(float64(DefaultShardWidth), float64(i)))
}
// * `thresholdToWidthRatio` leaf Shards with enough value links to reach
// the HAMTShardingSize threshold.
// * `(treeHeight - 1)` internal nodes to reach those leaf Shard nodes
// (assuming we have thresholdToWidthRatio below the DefaultShardWidth,
// i.e., all leaf nodes come from the same parent).
nodesToFetch := thresholdToWidthRatio + treeHeight - 1
nodesToFetch += thresholdToWidthRatio
// * `defaultConcurrentFetch` potential extra nodes of the threads working
// in parallel
nodesToFetch += defaultConcurrentFetch
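// A rough worked example with the values set above (illustrative only):
// DefaultShardWidth = 16, treeHeight = 3, thresholdToWidthRatio = 4,
// defaultConcurrentFetch = 32, HAMTShardingSize = 16 * 4 = 64:
//   internal layers: 16^0 + 16^1 + 16^2 = 273
//   leaf Shards needed to cross the threshold: + 4
//   concurrent-fetch slack: + 32
//   upper bound: nodesToFetch = 309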

ds := mdtest.Mock()
completeHAMTRoot, err := hamt.CreateCompleteHAMT(ds, treeHeight)
completeHAMTRoot, err := hamt.CreateCompleteHAMT(ds, treeHeight, DefaultShardWidth)
assert.NoError(t, err)

countGetsDS := newCountGetsDS(ds)
hamtDir, err := newHAMTDirectoryFromNode(countGetsDS, completeHAMTRoot)
assert.NoError(t, err)

countGetsDS.resetCounter()
countGetsDS.setRequestDelay(10 * time.Millisecond)
// FIXME: Only works with sequential DAG walk (now hardcoded, needs to be
// added to the internal API) where we can predict the Get requests and
// tree traversal. It would be desirable to have some test for the concurrent
// walk (which is the one used in production).
below, err := hamtDir.sizeBelowThreshold(context.TODO(), 0)
assert.NoError(t, err)
assert.False(t, below)
assert.Equal(t, nodesToFetch, countGetsDS.uniqueCidsFetched())
t.Logf("fetched %d/%d nodes", countGetsDS.uniqueCidsFetched(), nodesToFetch)
assert.True(t, countGetsDS.uniqueCidsFetched() <= nodesToFetch)
assert.True(t, countGetsDS.uniqueCidsFetched() >= nodesToFetch-defaultConcurrentFetch)
// (Without the `setRequestDelay` above the number of nodes fetched
// drops dramatically and unpredictably as the BFS starts to behave
// more like a DFS because some search paths are fetched faster than
// others.)
}

// Compare entries in the leftDir against the rightDir and possibly
@@ -519,6 +541,8 @@ type countGetsDS struct {

cidsFetched map[cid.Cid]struct{}
mapLock sync.Mutex

getRequestDelay time.Duration
}

var _ ipld.DAGService = (*countGetsDS)(nil)
@@ -528,6 +552,7 @@ func newCountGetsDS(ds ipld.DAGService) *countGetsDS {
ds,
make(map[cid.Cid]struct{}),
sync.Mutex{},
0,
}
}

@@ -543,30 +568,31 @@ func (d *countGetsDS) uniqueCidsFetched() int {
return len(d.cidsFetched)
}

func (d *countGetsDS) setRequestDelay(timeout time.Duration) {
d.getRequestDelay = timeout
}

func (d *countGetsDS) Get(ctx context.Context, c cid.Cid) (ipld.Node, error) {
node, err := d.DAGService.Get(ctx, c)
if err != nil {
return nil, err
}

d.mapLock.Lock()
_, cidRequestedBefore := d.cidsFetched[c]
d.cidsFetched[c] = struct{}{}
d.mapLock.Unlock()

if d.getRequestDelay != 0 && !cidRequestedBefore {
// The first request gets a delay to simulate a network fetch.
// Subsequent requests get no delay, simulating an on-disk cache.
time.Sleep(d.getRequestDelay)
}

return node, nil
}

// Process sequentially (blocking) calling Get which tracks requests.
func (d *countGetsDS) GetMany(ctx context.Context, cids []cid.Cid) <-chan *ipld.NodeOption {
out := make(chan *ipld.NodeOption, len(cids))
defer close(out)
for _, c := range cids {
node, err := d.Get(ctx, c)
if err != nil {
out <- &ipld.NodeOption{Err: err}
break
}
out <- &ipld.NodeOption{Node: node}
}
return out
panic("GetMany not supported")
}
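For reference, a small standalone sketch of how the countGetsDS wrapper above behaves: the first Get for a CID pays the configured delay (the simulated network fetch), a repeat Get does not, and the CID is counted once. It assumes go-merkledag is imported (for NewRawNode) alongside the mdtest, assert, context and time packages already used in this file; the test name and payload are made up:

func TestCountGetsDSRequestDelay(t *testing.T) {
	ds := mdtest.Mock()
	// Assumes: merkledag "github.com/ipfs/go-merkledag" is imported.
	node := merkledag.NewRawNode([]byte("illustrative payload"))
	assert.NoError(t, ds.Add(context.Background(), node))

	countingDS := newCountGetsDS(ds)
	countingDS.setRequestDelay(10 * time.Millisecond)

	// First fetch pays the delay (simulated network), the repeat does not
	// (simulated cache); the CID is still counted only once.
	_, err := countingDS.Get(context.Background(), node.Cid())
	assert.NoError(t, err)
	_, err = countingDS.Get(context.Background(), node.Cid())
	assert.NoError(t, err)
	assert.Equal(t, 1, countingDS.uniqueCidsFetched())
}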