Skip to content

Commit 81a501c

Browse files
committed
feat: handle retrieval queries for unindexed identity payload CIDs
There are valid cases where a CAR may have an identity CID as its root that is not represented as a 'block' within the CAR body and therefore isn't indexed by the dagstore. In this case, we inspect the identity CID content and treat the query as a query for the intersection of all of the links within the block. Ref: filecoin-project/boost#715
1 parent 10e86a0 commit 81a501c

File tree

1 file changed

+68
-2
lines changed

1 file changed

+68
-2
lines changed

retrievalmarket/impl/provider.go

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ import (
1010
"github.com/ipfs/go-cid"
1111
"github.com/ipfs/go-datastore"
1212
"github.com/ipfs/go-datastore/namespace"
13+
"github.com/ipld/go-ipld-prime"
14+
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
15+
"github.com/ipld/go-ipld-prime/traversal"
16+
"github.com/multiformats/go-multihash"
1317
"golang.org/x/xerrors"
1418

1519
"github.com/filecoin-project/go-address"
@@ -341,7 +345,7 @@ func (p *Provider) HandleQueryStream(stream rmnet.RetrievalQueryStream) {
341345
pieceInfo, isUnsealed, err := p.getPieceInfoFromCid(ctx, query.PayloadCID, pieceCID)
342346
if err != nil {
343347
log.Errorf("Retrieval query: getPieceInfoFromCid: %s", err)
344-
if !xerrors.Is(err, retrievalmarket.ErrNotFound) {
348+
if !errors.Is(err, retrievalmarket.ErrNotFound) {
345349
answer.Status = retrievalmarket.QueryResponseError
346350
answer.Message = fmt.Sprintf("failed to fetch piece to retrieve from: %s", err)
347351
} else {
@@ -397,7 +401,15 @@ func (p *Provider) getPieceInfoFromCid(ctx context.Context, payloadCID, clientPi
397401
// Get all pieces that contain the target block
398402
piecesWithTargetBlock, err := p.dagStore.GetPiecesContainingBlock(payloadCID)
399403
if err != nil {
400-
return piecestore.PieceInfoUndefined, false, xerrors.Errorf("getting pieces for cid %s: %w", payloadCID, err)
404+
// this payloadCID may be an identity CID that's in the root of a CAR but
405+
// not recorded in the index
406+
piecesWithTargetBlock, err := p.getCommonPiecesFromIdentityCidLinks(ctx, payloadCID)
407+
if err != nil {
408+
return piecestore.PieceInfoUndefined, false, err
409+
}
410+
if len(piecesWithTargetBlock) == 0 {
411+
return piecestore.PieceInfoUndefined, false, fmt.Errorf("getting pieces for cid %s: %w", payloadCID, err)
412+
}
401413
}
402414

403415
// For each piece that contains the target block
@@ -448,6 +460,60 @@ func (p *Provider) getPieceInfoFromCid(ctx context.Context, payloadCID, clientPi
448460
return piecestore.PieceInfoUndefined, false, xerrors.Errorf("could not locate piece: %w", lastErr)
449461
}
450462

463+
// getCommonPiecesFromIdentityCidLinks will inspect a payloadCID and if it has an identity multihash,
464+
// will determine which pieces contain all of the links within the decoded identity multihash block
465+
func (p *Provider) getCommonPiecesFromIdentityCidLinks(ctx context.Context, payloadCID cid.Cid) ([]cid.Cid, error) {
466+
if payloadCID.Prefix().MhType != multihash.IDENTITY {
467+
return nil, nil
468+
}
469+
470+
// decode the identity multihash, if possible (i.e. it's valid and we have the right codec loaded)
471+
decoder, err := cidlink.DefaultLinkSystem().DecoderChooser(cidlink.Link{Cid: payloadCID})
472+
if err != nil {
473+
return nil, fmt.Errorf("choosing decoder for identity CID %s: %w", payloadCID, err)
474+
}
475+
mh, err := multihash.Decode(payloadCID.Hash())
476+
if err != nil {
477+
return nil, fmt.Errorf("decoding identity CID multihash %s: %w", payloadCID, err)
478+
}
479+
node, err := ipld.Decode(mh.Digest, decoder)
480+
if err != nil {
481+
return nil, fmt.Errorf("decoding identity CID %s: %w", payloadCID, err)
482+
}
483+
links, err := traversal.SelectLinks(node)
484+
if err != nil {
485+
return nil, fmt.Errorf("collecting links from identity CID %s: %w", payloadCID, err)
486+
}
487+
488+
pieces := make([]cid.Cid, 0)
489+
// for each link, query the dagstore for pieces that contain it
490+
for i, link_ := range links {
491+
link := link_.(cidlink.Link).Cid
492+
piecesWithThisCid, err := p.dagStore.GetPiecesContainingBlock(link)
493+
if len(piecesWithThisCid) == 0 {
494+
return nil, fmt.Errorf("getting pieces for identity CID sub-link %s: %w", link, err)
495+
}
496+
if i == 0 {
497+
pieces = append(pieces, piecesWithThisCid...)
498+
} else {
499+
// after the first, find the intersection between these pieces and the previous ones
500+
intersection := make([]cid.Cid, 0)
501+
for _, cj := range piecesWithThisCid {
502+
for _, ck := range pieces {
503+
if cj.Equals(ck) {
504+
intersection = append(intersection, cj)
505+
break
506+
}
507+
}
508+
}
509+
pieces = intersection
510+
}
511+
if len(pieces) == 0 {
512+
break
513+
}
514+
}
515+
return pieces, nil
516+
}
451517
func (p *Provider) pieceInUnsealedSector(ctx context.Context, pieceInfo piecestore.PieceInfo) bool {
452518
for _, di := range pieceInfo.Deals {
453519
isUnsealed, err := p.sa.IsUnsealed(ctx, di.SectorID, di.Offset.Unpadded(), di.Length.Unpadded())

0 commit comments

Comments
 (0)