@@ -376,7 +376,7 @@ func (ds *Shard) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult {
376376 defer close (linkResults )
377377 defer cancel ()
378378
379- err := parallelWalkDepth (ctx , ds , ds .dserv , func (formattedLink * ipld.Link ) error {
379+ err := parallelShardWalk (ctx , ds , ds .dserv , func (formattedLink * ipld.Link ) error {
380380 emitResult (ctx , linkResults , format.LinkResult {Link : formattedLink , Err : nil })
381381 return nil
382382 })
@@ -387,13 +387,13 @@ func (ds *Shard) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult {
387387 return linkResults
388388}
389389
390- type listCidShardUnion struct {
391- links []cid.Cid
390+ type listCidsAndShards struct {
391+ cids []cid.Cid
392392 shards []* Shard
393393}
394394
395- func (ds * Shard ) walkLinks (processLinkValues func (formattedLink * ipld.Link ) error ) (* listCidShardUnion , error ) {
396- res := & listCidShardUnion {}
395+ func (ds * Shard ) walkChildren (processLinkValues func (formattedLink * ipld.Link ) error ) (* listCidsAndShards , error ) {
396+ res := & listCidsAndShards {}
397397
398398 for idx , lnk := range ds .childer .links {
399399 if nextShard := ds .childer .children [idx ]; nextShard == nil {
@@ -415,7 +415,7 @@ func (ds *Shard) walkLinks(processLinkValues func(formattedLink *ipld.Link) erro
415415 return nil , err
416416 }
417417 case shardLink :
418- res .links = append (res .links , lnk .Cid )
418+ res .cids = append (res .cids , lnk .Cid )
419419 default :
420420 return nil , fmt .Errorf ("unsupported shard link type" )
421421 }
@@ -438,7 +438,14 @@ func (ds *Shard) walkLinks(processLinkValues func(formattedLink *ipld.Link) erro
438438 return res , nil
439439}
440440
441- func parallelWalkDepth (ctx context.Context , root * Shard , dserv ipld.DAGService , processShardValues func (formattedLink * ipld.Link ) error ) error {
441+ // parallelShardWalk is quite similar to the DAG walking algorithm from https://github.com/ipfs/go-merkledag/blob/594e515f162e764183243b72c2ba84f743424c8c/merkledag.go#L464
442+ // However, there are a few notable differences:
443+ // 1. Some children are actualized Shard structs and some are in the blockstore; this walk leverages the in-memory Shards as well as the stored blocks
444+ // 2. Instead of just passing each child into the worker pool by itself we group them so that we can leverage optimizations from GetMany.
445+ // This optimization also makes the walk a little more biased towards depth-first traversal (as opposed to BFS) in the earlier part of the DAG.
446+ // This is particularly helpful for operations like estimating the directory size which should complete quickly when possible.
447+ // 3. None of the extra options from that package are needed
448+ func parallelShardWalk (ctx context.Context , root * Shard , dserv ipld.DAGService , processShardValues func (formattedLink * ipld.Link ) error ) error {
442449 const concurrency = 32
443450
444451 var visitlk sync.Mutex
@@ -449,36 +456,36 @@ func parallelWalkDepth(ctx context.Context, root *Shard, dserv ipld.DAGService,
449456 grp , errGrpCtx := errgroup .WithContext (ctx )
450457
451458 // Input and output queues for workers.
452- feed := make (chan * listCidShardUnion )
453- out := make (chan * listCidShardUnion )
459+ feed := make (chan * listCidsAndShards )
460+ out := make (chan * listCidsAndShards )
454461 done := make (chan struct {})
455462
456463 for i := 0 ; i < concurrency ; i ++ {
457464 grp .Go (func () error {
458- for shardOrCID := range feed {
459- for _ , nextShard := range shardOrCID .shards {
460- nextLinks , err := nextShard .walkLinks (processShardValues )
465+ for feedChildren := range feed {
466+ for _ , nextShard := range feedChildren .shards {
467+ nextChildren , err := nextShard .walkChildren (processShardValues )
461468 if err != nil {
462469 return err
463470 }
464471
465472 select {
466- case out <- nextLinks :
473+ case out <- nextChildren :
467474 case <- errGrpCtx .Done ():
468475 return nil
469476 }
470477 }
471478
472479 var linksToVisit []cid.Cid
473- for _ , nextLink := range shardOrCID . links {
480+ for _ , nextCid := range feedChildren . cids {
474481 var shouldVisit bool
475482
476483 visitlk .Lock ()
477- shouldVisit = visit (nextLink )
484+ shouldVisit = visit (nextCid )
478485 visitlk .Unlock ()
479486
480487 if shouldVisit {
481- linksToVisit = append (linksToVisit , nextLink )
488+ linksToVisit = append (linksToVisit , nextCid )
482489 }
483490 }
484491
@@ -493,13 +500,13 @@ func parallelWalkDepth(ctx context.Context, root *Shard, dserv ipld.DAGService,
493500 return err
494501 }
495502
496- nextLinks , err := nextShard .walkLinks (processShardValues )
503+ nextChildren , err := nextShard .walkChildren (processShardValues )
497504 if err != nil {
498505 return err
499506 }
500507
501508 select {
502- case out <- nextLinks :
509+ case out <- nextChildren :
503510 case <- errGrpCtx .Done ():
504511 return nil
505512 }
@@ -515,10 +522,10 @@ func parallelWalkDepth(ctx context.Context, root *Shard, dserv ipld.DAGService,
515522 }
516523
517524 send := feed
518- var todoQueue []* listCidShardUnion
525+ var todoQueue []* listCidsAndShards
519526 var inProgress int
520527
521- next := & listCidShardUnion {
528+ next := & listCidsAndShards {
522529 shards : []* Shard {root },
523530 }
524531
0 commit comments