Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: getPath with carScope #8

Merged
merged 11 commits into from
May 1, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions bin.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ cli.command('get <cid>')
.describe('Fetch a DAG from the peer. Outputs a CAR file.')
.option('-p, --peer', 'Address of peer to fetch data from.')
.option('-t, --timeout', 'Timeout in milliseconds.', TIMEOUT)
.action(async (cid, { peer, timeout }) => {
cid = CID.parse(cid)
.action(async (cidPath, { peer, timeout }) => {
const [cidStr] = cidPath.replace(/^\/ipfs\//, '').split('/')
const cid = CID.parse(cidStr)
const controller = new TimeoutController(timeout)
const libp2p = await getLibp2p()
const dagula = await fromNetwork(libp2p, { peer, hashers })
Expand All @@ -73,7 +74,7 @@ cli.command('get <cid>')
let error
;(async () => {
try {
for await (const block of dagula.get(cid, { signal: controller.signal })) {
for await (const block of dagula.getPath(cidPath, { signal: controller.signal })) {
controller.reset()
await writer.put(block)
}
Expand Down
14 changes: 14 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,21 @@ export interface Network {
handle: (protocol: string | string[], handler: StreamHandler) => Promise<void>
}

export type CarScope = 'all'|'file'|'block'

export interface CarScopeOptions {
carScope?: CarScope
}

export interface IDagula {
/**
* Get a complete DAG.
*/
get (cid: CID|string, options?: AbortOptions): AsyncIterableIterator<Block>
/**
* Get a DAG for a cid+path
*/
getPath (cidPath: string, options?: AbortOptions & CarScopeOptions): AsyncIterableIterator<Block>
/**
* Get a single block.
*/
Expand All @@ -55,6 +65,10 @@ export declare class Dagula implements IDagula {
* Get a complete DAG.
*/
get (cid: CID|string, options?: AbortOptions): AsyncIterableIterator<Block>
/**
* Get a DAG for a cid+path
*/
getPath (cidPath: string, options?: AbortOptions & CarScopeOptions): AsyncIterableIterator<Block>
/**
* Get a single block.
*/
Expand Down
132 changes: 118 additions & 14 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import debug from 'debug'
import { CID } from 'multiformats/cid'
import * as dagPB from '@ipld/dag-pb'
import * as Block from 'multiformats/block'
import { exporter, walkPath } from 'ipfs-unixfs-exporter'
import { transform } from 'streaming-iterables'
Expand Down Expand Up @@ -29,20 +30,23 @@ export class Dagula {
}

/**
* @param {import('multiformats').CID|string} cid
* @param {{ signal?: AbortSignal }} [options]
* @param {CID[]|CID|string} cid
* @param {object} [options]
* @param {AbortSignal} [options.signal]
* @param {(block: import('multiformats').BlockView) => CID[]} [options.search]
*/
async * get (cid, options = {}) {
cid = typeof cid === 'string' ? CID.parse(cid) : cid
log('getting DAG %s', cid)
let cids = Array.isArray(cid) ? cid : [cid]
const search = options.search || breadthFirstSearch()

/** @type {AbortController[]} */
let aborters = []
const { signal } = options
signal?.addEventListener('abort', () => aborters.forEach(a => a.abort(signal.reason)))

let cids = [cid]
while (true) {
while (cids.length > 0) {
olizilla marked this conversation as resolved.
Show resolved Hide resolved
log('fetching %d CIDs', cids.length)
const fetchBlocks = transform(cids.length, async cid => {
if (signal) {
Expand All @@ -54,7 +58,7 @@ export class Dagula {
}
return this.getBlock(cid)
})
const nextCids = []
let nextCids = []
for await (const { cid, bytes } of fetchBlocks(cids)) {
const decoder = this.#decoders[cid.code]
if (!decoder) {
Expand All @@ -72,16 +76,86 @@ export class Dagula {
// createUnsafe here.
const block = await Block.create({ bytes, cid, codec: decoder, hasher })
yield block
for (const [, cid] of block.links()) {
nextCids.push(cid)
}
nextCids = nextCids.concat(search(block))
}
if (!nextCids.length) break
log('%d CIDs in links', nextCids.length)
cids = nextCids
}
}

/**
* Yield all blocks traversed to resolve the ipfs path.
* Then use carScope to determine the set of blocks of the targeted dag to yield.
* Yield all blocks by default.
* Use carScope: 'block' to yield the termimal block.
* Use carScope: 'file' to yield all the blocks of a unixfs file, or enough blocks to list a directory.
*
* @param {string} cidPath
* @param {object} [options]
* @param {AbortSignal} [options.signal]
* @param {'all'|'file'|'block'} [options.carScope] control how many layers of the dag are returned
* 'all': return the entire dag starting at path. (default)
* 'block': return the block identified by the path.
* 'file': Mimic gateway semantics: Return All blocks for a multi-block file or just enough blocks to enumerate a dir/map but not the dir contents.
* Where path points to a single block file, all three selectors would return the same thing.
* where path points to a sharded hamt: 'file' returns the blocks of the hamt so the dir can be listed. 'block' returns the root block of the hamt.
*/
async * getPath (cidPath, options = {}) {
const carScope = options.carScope ?? 'all'

/**
* The resolved dag root at the terminus of the cidPath
* @type {import('ipfs-unixfs-exporter').UnixFSEntry}
* */
olizilla marked this conversation as resolved.
Show resolved Hide resolved
let base

/**
* Cache of blocks required to resove the cidPath
* @type {import('./index').Block[]} */
olizilla marked this conversation as resolved.
Show resolved Hide resolved
let traversed = []

/**
* Adapter for unixfs-exporter to track the blocks it loads as it resolves the path.
* `walkPath` emits a single unixfs entry for multiblock structures, but we need the individual blocks.
* TODO: port logic to @web3-storage/ipfs-path to make this less ugly.
*/
const blockstore = {
/**
* @param {CID} cid
* @param {{ signal?: AbortSignal }} [options]
*/
get: async (cid, options) => {
const block = await this.getBlock(cid, options)
traversed.push(block)
return block.bytes
}
}
for await (const item of walkPath(cidPath, blockstore, { signal: options.signal })) {
base = item
yield * traversed
traversed = []
}

if (carScope === 'all' || (carScope === 'file' && base.type !== 'directory')) {
const links = base.node.Links?.map(l => l.Hash) || []
// fetch the entire dag rooted at the end of the provided path
if (links.length) {
yield * this.get(links, { signal: options.signal })
}
}
// non-files, like directories, and IPLD Maps only return blocks necessary for their enumeration
if (carScope === 'file' && base.type === 'directory') {
// the single block for the root has already been yielded.
// For a hamt we must fetch all the blocks of the (current) hamt.
if (base.unixfs.type === 'hamt-sharded-directory') {
const hamtLinks = base.node.Links?.filter(l => l.Name.length === 2).map(l => l.Hash) || []
if (hamtLinks.length) {
yield * this.get(hamtLinks, { search: hamtSearch, signal: options.signal })
}
}
}
}

/**
* @param {import('multiformats').CID|string} cid
* @param {{ signal?: AbortSignal }} [options]
Expand Down Expand Up @@ -117,11 +191,11 @@ export class Dagula {
}

/**
* @param {string|import('multiformats').CID} path
* @param {string} cidPath
* @param {{ signal?: AbortSignal }} [options]
*/
async * walkUnixfsPath (path, options = {}) {
log('walking unixfs %s', path)
async * walkUnixfsPath (cidPath, options = {}) {
log('walking unixfs %s', cidPath)
const blockstore = {
/**
* @param {CID} cid
Expand All @@ -132,8 +206,38 @@ export class Dagula {
return block.bytes
}
}
yield * walkPath(cidPath, blockstore, { signal: options.signal })
}
}

// @ts-ignore exporter requires Blockstore but only uses `get`
yield * walkPath(path, blockstore, { signal: options.signal })
/**
* Create a search function that given a decoded Block
* will return an array of CIDs to fetch next.
*
* @param {([name, cid]: [string, Link]) => boolean} linkFilter
*/
export function breadthFirstSearch (linkFilter = () => true) {
/**
* @param {import('multiformats').BlockView} block
*/
return function (block) {
const nextCids = []
if (block.cid.code === dagPB.code) {
for (const { Name, Hash } of block.value.Links) {
if (linkFilter([Name, Hash])) {
nextCids.push(Hash)
}
}
} else {
// links() paths dagPb in the ipld style so name is e.g `Links/0/Hash`, and not what we want here.
for (const link of block.links()) {
if (linkFilter(link)) {
nextCids.push(link[1])
}
}
}
return nextCids
}
}

export const hamtSearch = breadthFirstSearch(([name]) => name.length === 2)
Loading