Skip to content

Commit

Permalink
feat!: iterable pinning (#231)
Browse files Browse the repository at this point in the history
Refactors `helia.pins.add` and `helia.pins.rm` to return async generators instead of promises.

Each value yielded from the generator is the CID of a block that has just been pinned (by `add`) or unpinned (by `rm`).

BREAKING CHANGE: `helia.pins.add` and `helia.pins.rm` now return `AsyncGenerator<CID>` instead of `Promise<Pin>`

---------

Co-authored-by: Alex Potsides <alex@achingbrain.net>
Co-authored-by: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com>
  • Loading branch information
3 people authored Jan 7, 2024
1 parent 17e85f9 commit c15c774
Show file tree
Hide file tree
Showing 17 changed files with 190 additions and 120 deletions.
2 changes: 1 addition & 1 deletion benchmarks/add-dir/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"@helia/unixfs": "^1.4.0",
"@ipld/dag-pb": "^4.0.2",
"@libp2p/websockets": "^8.0.9",
"aegir": "^41.3.5",
"aegir": "^42.0.0",
"blockstore-fs": "^1.0.1",
"datastore-level": "^10.0.1",
"execa": "^8.0.1",
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/gc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
},
"devDependencies": {
"@ipld/dag-pb": "^4.0.6",
"aegir": "^41.2.0",
"aegir": "^42.0.0",
"blockstore-fs": "^1.1.8",
"datastore-level": "^10.1.5",
"execa": "^8.0.1",
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/gc/src/helia.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export async function createHeliaBenchmark (): Promise<GcBenchmark> {
await drain(helia.blockstore.putMany(map(blocks, ({ key, value }) => ({ cid: key, block: value }))))
},
async pin (cid) {
await helia.pins.add(cid)
await all(helia.pins.add(cid))
},
async teardown () {
await helia.stop()
Expand All @@ -52,7 +52,7 @@ export async function createHeliaBenchmark (): Promise<GcBenchmark> {
const pins = await all(helia.pins.ls())

for (const pin of pins) {
await helia.pins.rm(pin.cid)
await all(helia.pins.rm(pin.cid))
}

return pins.length
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
"docs:no-publish": "aegir docs --publish false"
},
"devDependencies": {
"aegir": "^41.0.0",
"aegir": "^42.0.0",
"npm-run-all": "^4.1.5",
"typedoc-plugin-mdn-links": "^3.0.3"
},
Expand Down
9 changes: 4 additions & 5 deletions packages/helia/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,13 @@
"@libp2p/interface": "^1.0.1",
"@libp2p/kad-dht": "^12.0.1",
"@libp2p/keychain": "^4.0.2",
"@libp2p/logger": "^4.0.1",
"@libp2p/mdns": "^10.0.2",
"@libp2p/mplex": "^10.0.2",
"@libp2p/ping": "^1.0.1",
"@libp2p/tcp": "^9.0.2",
"@libp2p/upnp-nat": "^1.0.1",
"@libp2p/utils": "^5.2.0",
"@libp2p/webrtc": "^4.0.3",
"@libp2p/websockets": "^8.0.2",
"@libp2p/webtransport": "^4.0.3",
Expand All @@ -111,25 +113,22 @@
"interface-store": "^5.0.1",
"ipfs-bitswap": "^20.0.0",
"ipns": "^8.0.0",
"it-all": "^3.0.2",
"it-drain": "^3.0.1",
"it-filter": "^3.0.1",
"it-foreach": "^2.0.2",
"libp2p": "^1.0.3",
"mortice": "^3.0.1",
"multiformats": "^13.0.0",
"p-defer": "^4.0.0",
"p-queue": "^8.0.1",
"progress-events": "^1.0.0",
"uint8arrays": "^5.0.0"
},
"devDependencies": {
"@libp2p/logger": "^4.0.1",
"@multiformats/mafmt": "^12.1.5",
"@multiformats/multiaddr": "^12.1.7",
"@types/sinon": "^17.0.2",
"aegir": "^41.0.0",
"aegir": "^42.0.0",
"delay": "^6.0.0",
"it-all": "^3.0.4",
"sinon": "^17.0.0",
"sinon-ts": "^2.0.0"
},
Expand Down
2 changes: 1 addition & 1 deletion packages/helia/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export * from '@helia/interface/pins'
*/
export interface DAGWalker {
codec: number
walk(block: Uint8Array): AsyncGenerator<CID, void, undefined>
walk(block: Uint8Array): Generator<CID, void, undefined>
}

/**
Expand Down
99 changes: 48 additions & 51 deletions packages/helia/src/pins.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
import { Queue } from '@libp2p/utils/queue'
import * as cborg from 'cborg'
import { type Datastore, Key } from 'interface-datastore'
import { base36 } from 'multiformats/bases/base36'
import { CID, type Version } from 'multiformats/cid'
import defer from 'p-defer'
import PQueue from 'p-queue'
import { CustomProgressEvent, type ProgressOptions } from 'progress-events'
import { equals as uint8ArrayEquals } from 'uint8arrays/equals'
import { dagCborWalker, dagJsonWalker, dagPbWalker, jsonWalker, rawWalker } from './utils/dag-walkers.js'
import type { DAGWalker } from './index.js'
import type { DAGWalker, GetBlockProgressEvents } from './index.js'
import type { AddOptions, AddPinEvents, IsPinnedOptions, LsOptions, Pin, Pins, RmOptions } from '@helia/interface/pins'
import type { GetBlockProgressEvents } from '@helia/interface/src/blocks.js'
import type { AbortOptions } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'

Expand Down Expand Up @@ -38,10 +36,20 @@ interface DatastorePinnedBlock {
pinnedBy: Uint8Array[]
}

/**
* Callback passed to `#updatePinnedBlock` to modify the properties of a
* {@link DatastorePinnedBlock}.
*
* The callback receives the pinned block record and may mutate it in place
* (for example by changing `pinCount` or `pinnedBy`).
*
* Return `false` to leave the block untouched: `#updatePinnedBlock` then
* returns immediately without writing anything to the datastore. Return
* `true` in all other cases so that the modified record is processed.
*/
interface WithPinnedBlockCallback {
(pinnedBlock: DatastorePinnedBlock): boolean
}

const DATASTORE_PIN_PREFIX = '/pin/'
const DATASTORE_BLOCK_PREFIX = '/pinned-block/'
const DATASTORE_ENCODING = base36
// const DAG_WALK_MAX_QUEUE_LENGTH = 10
const DAG_WALK_QUEUE_CONCURRENCY = 1

interface WalkDagOptions extends AbortOptions, ProgressOptions<GetBlockProgressEvents | AddPinEvents> {
Expand Down Expand Up @@ -71,7 +79,7 @@ export class PinsImpl implements Pins {
})
}

async add (cid: CID<unknown, number, number, Version>, options: AddOptions = {}): Promise<Pin> {
async * add (cid: CID<unknown, number, number, Version>, options: AddOptions = {}): AsyncGenerator<CID, void, undefined> {
const pinKey = toDSKey(cid)

if (await this.datastore.has(pinKey)) {
Expand All @@ -85,56 +93,40 @@ export class PinsImpl implements Pins {
}

// use a queue to walk the DAG instead of recursion so we can traverse very large DAGs
const queue = new PQueue({
const queue = new Queue<AsyncGenerator<CID>>({
concurrency: DAG_WALK_QUEUE_CONCURRENCY
})
void queue.add(async (): Promise<void> => {
await this.#walkDag(cid, queue, (pinnedBlock): void => {

for await (const childCid of this.#walkDag(cid, queue, {
...options,
depth
})) {
await this.#updatePinnedBlock(childCid, (pinnedBlock: DatastorePinnedBlock) => {
// do not update pinned block if this block is already pinned by this CID
if (pinnedBlock.pinnedBy.find(c => uint8ArrayEquals(c, cid.bytes)) != null) {
return
return false
}

pinnedBlock.pinCount++
pinnedBlock.pinnedBy.push(cid.bytes)
}, {
...options,
depth
})
})

// if a job in the queue errors, throw that error
const deferred = defer()

queue.on('error', (err): void => {
queue.clear()
deferred.reject(err)
})
return true
}, options)

// wait for the queue to complete or error
await Promise.race([
queue.onIdle(),
deferred.promise
])
yield childCid
}

const pin: DatastorePin = {
depth,
metadata: options.metadata ?? {}
}

await this.datastore.put(pinKey, cborg.encode(pin), options)

return {
cid,
...pin
}
}

/**
* Walk the DAG behind the passed CID, ensure all blocks are present in the blockstore
* and update the pin count for them
* Walk a DAG in an iterable fashion
*/
async #walkDag (cid: CID, queue: PQueue, withPinnedBlock: (pinnedBlock: DatastorePinnedBlock) => void, options: WalkDagOptions): Promise<void> {
async * #walkDag (cid: CID, queue: Queue<AsyncGenerator<CID>>, options: WalkDagOptions): AsyncGenerator<CID> {
if (options.depth === -1) {
return
}
Expand All @@ -147,12 +139,12 @@ export class PinsImpl implements Pins {

const block = await this.blockstore.get(cid, options)

await this.#updatePinnedBlock(cid, withPinnedBlock, options)
yield cid

// walk dag, ensure all blocks are present
for await (const cid of dagWalker.walk(block)) {
void queue.add(async () => {
await this.#walkDag(cid, queue, withPinnedBlock, {
yield * await queue.add(async () => {
return this.#walkDag(cid, queue, {
...options,
depth: options.depth - 1
})
Expand All @@ -163,7 +155,7 @@ export class PinsImpl implements Pins {
/**
* Update the pin count for the CID
*/
async #updatePinnedBlock (cid: CID, withPinnedBlock: (pinnedBlock: DatastorePinnedBlock) => void, options: AddOptions): Promise<void> {
async #updatePinnedBlock (cid: CID, withPinnedBlock: WithPinnedBlockCallback, options: AddOptions): Promise<void> {
const blockKey = new Key(`${DATASTORE_BLOCK_PREFIX}${DATASTORE_ENCODING.encode(cid.multihash.bytes)}`)

let pinnedBlock: DatastorePinnedBlock = {
Expand All @@ -179,7 +171,11 @@ export class PinsImpl implements Pins {
}
}

withPinnedBlock(pinnedBlock)
const shouldContinue = withPinnedBlock(pinnedBlock)

if (!shouldContinue) {
return
}

if (pinnedBlock.pinCount === 0) {
if (await this.datastore.has(blockKey)) {
Expand All @@ -189,34 +185,35 @@ export class PinsImpl implements Pins {
}

await this.datastore.put(blockKey, cborg.encode(pinnedBlock), options)
options.onProgress?.(new CustomProgressEvent<CID>('helia:pin:add', { detail: cid }))
options.onProgress?.(new CustomProgressEvent<CID>('helia:pin:add', cid))
}

async rm (cid: CID<unknown, number, number, Version>, options: RmOptions = {}): Promise<Pin> {
async * rm (cid: CID<unknown, number, number, Version>, options: RmOptions = {}): AsyncGenerator<CID, void, undefined> {
const pinKey = toDSKey(cid)
const buf = await this.datastore.get(pinKey, options)
const pin = cborg.decode(buf)

await this.datastore.delete(pinKey, options)

// use a queue to walk the DAG instead of recursion so we can traverse very large DAGs
const queue = new PQueue({
const queue = new Queue<AsyncGenerator<CID>>({
concurrency: DAG_WALK_QUEUE_CONCURRENCY
})
void queue.add(async (): Promise<void> => {
await this.#walkDag(cid, queue, (pinnedBlock): void => {

for await (const childCid of this.#walkDag(cid, queue, {
...options,
depth: pin.depth
})) {
await this.#updatePinnedBlock(childCid, (pinnedBlock): boolean => {
pinnedBlock.pinCount--
pinnedBlock.pinnedBy = pinnedBlock.pinnedBy.filter(c => uint8ArrayEquals(c, cid.bytes))
return true
}, {
...options,
depth: pin.depth
})
})
await queue.onIdle()

return {
cid,
...pin
yield childCid
}
}

Expand Down
10 changes: 5 additions & 5 deletions packages/helia/src/utils/dag-walkers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import type { DAGWalker } from '../index.js'
*/
export const dagPbWalker: DAGWalker = {
codec: dagPb.code,
async * walk (block) {
* walk (block) {
const node = dagPb.decode(block)

yield * node.Links.map(l => l.Hash)
Expand All @@ -29,7 +29,7 @@ export const dagPbWalker: DAGWalker = {
*/
export const rawWalker: DAGWalker = {
codec: raw.code,
async * walk () {
* walk () {
// no embedded CIDs in a raw block
}
}
Expand All @@ -44,7 +44,7 @@ const CID_TAG = 42
*/
export const dagCborWalker: DAGWalker = {
codec: dagCbor.code,
async * walk (block) {
* walk (block) {
const cids: CID[] = []
const tags: cborg.TagDecoder[] = []
tags[CID_TAG] = (bytes) => {
Expand Down Expand Up @@ -142,7 +142,7 @@ class DagJsonTokenizer extends cborgJson.Tokenizer {
*/
export const dagJsonWalker: DAGWalker = {
codec: dagJson.code,
async * walk (block) {
* walk (block) {
const cids: CID[] = []
const tags: cborg.TagDecoder[] = []
tags[CID_TAG] = (string) => {
Expand Down Expand Up @@ -177,5 +177,5 @@ export const dagJsonWalker: DAGWalker = {
*/
export const jsonWalker: DAGWalker = {
codec: json.code,
async * walk () {}
* walk () {}
}
2 changes: 1 addition & 1 deletion packages/helia/test/fixtures/dag-walker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type { DAGWalker } from '../../src/index.js'
export function dagWalker (codec: number, dag: Record<string, DAGNode>): DAGWalker {
return {
codec,
async * walk (block) {
* walk (block) {
const node = dag[uint8ArrayToString(block)] ?? { links: [] }

yield * node.links
Expand Down
Loading

0 comments on commit c15c774

Please sign in to comment.