Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: iterable pinning #231

Merged
merged 25 commits into from
Jan 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
e09b945
feat!: update the interfaces for batching and iteration.
saul-jb Aug 13, 2023
23dc18c
feat!: update pins add to be interable and batchable.
saul-jb Aug 13, 2023
5cf4ca1
chore: remove redundant p-defer.
saul-jb Aug 13, 2023
9795cc4
test: update to new implementation.
saul-jb Aug 13, 2023
4c217f7
feat!: update pins rm to be iterable
saul-jb Aug 14, 2023
7c5e895
feat!: update pins rm to be interable.
saul-jb Aug 14, 2023
2df091b
chore: remove redundant p-queue package.
saul-jb Aug 14, 2023
b5818ae
test: update tests to the new pin rm signature.
saul-jb Aug 14, 2023
a01738c
test: update interop test to new interfaces.
saul-jb Aug 14, 2023
417ed31
chore: update benchmark scripts
saul-jb Aug 14, 2023
d8a9a21
chore: linting
saul-jb Aug 14, 2023
1d36b37
feat: add support for changing batch size dynamically
saul-jb Aug 14, 2023
c97774f
test: fix interop pins test
saul-jb Aug 14, 2023
bc38555
fix: make the default batch number a reasonable value
saul-jb Aug 14, 2023
16929e0
feat: add option for syncing local values first
saul-jb Aug 14, 2023
ad35c6b
refactor: change to standard iterables
saul-jb Aug 16, 2023
82c13d6
refactor: change map to parallel
saul-jb Aug 16, 2023
394500d
fix: remove last type from add generator
saul-jb Aug 16, 2023
952db49
Merge branch 'main' into feat/iterable-pin
saul-jb Aug 16, 2023
9111576
Merge branch 'main' into feat/iterable-pin
achingbrain Sep 14, 2023
dd25d1e
Merge branch 'main' into feat/iterable-pin
achingbrain Sep 14, 2023
0a8f5ba
Merge branch 'main' into feat/iterable-pin
SgtPooki Dec 14, 2023
7f9dbb5
test(pins): use generator syntax
SgtPooki Dec 14, 2023
3490a4f
fix: support resumable pinning
SgtPooki Dec 14, 2023
8034239
chore: fix test
achingbrain Jan 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmarks/add-dir/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"@helia/unixfs": "^1.4.0",
"@ipld/dag-pb": "^4.0.2",
"@libp2p/websockets": "^8.0.9",
"aegir": "^41.3.5",
"aegir": "^42.0.0",
"blockstore-fs": "^1.0.1",
"datastore-level": "^10.0.1",
"execa": "^8.0.1",
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/gc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
},
"devDependencies": {
"@ipld/dag-pb": "^4.0.6",
"aegir": "^41.2.0",
"aegir": "^42.0.0",
"blockstore-fs": "^1.1.8",
"datastore-level": "^10.1.5",
"execa": "^8.0.1",
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/gc/src/helia.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export async function createHeliaBenchmark (): Promise<GcBenchmark> {
await drain(helia.blockstore.putMany(map(blocks, ({ key, value }) => ({ cid: key, block: value }))))
},
async pin (cid) {
await helia.pins.add(cid)
await all(helia.pins.add(cid))
},
async teardown () {
await helia.stop()
Expand All @@ -52,7 +52,7 @@ export async function createHeliaBenchmark (): Promise<GcBenchmark> {
const pins = await all(helia.pins.ls())

for (const pin of pins) {
await helia.pins.rm(pin.cid)
await all(helia.pins.rm(pin.cid))
}

return pins.length
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
"docs:no-publish": "aegir docs --publish false"
},
"devDependencies": {
"aegir": "^41.0.0",
"aegir": "^42.0.0",
"npm-run-all": "^4.1.5",
"typedoc-plugin-mdn-links": "^3.0.3"
},
Expand Down
9 changes: 4 additions & 5 deletions packages/helia/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,13 @@
"@libp2p/interface": "^1.0.1",
"@libp2p/kad-dht": "^12.0.1",
"@libp2p/keychain": "^4.0.2",
"@libp2p/logger": "^4.0.1",
"@libp2p/mdns": "^10.0.2",
"@libp2p/mplex": "^10.0.2",
"@libp2p/ping": "^1.0.1",
"@libp2p/tcp": "^9.0.2",
"@libp2p/upnp-nat": "^1.0.1",
"@libp2p/utils": "^5.2.0",
"@libp2p/webrtc": "^4.0.3",
"@libp2p/websockets": "^8.0.2",
"@libp2p/webtransport": "^4.0.3",
Expand All @@ -111,25 +113,22 @@
"interface-store": "^5.0.1",
"ipfs-bitswap": "^20.0.0",
"ipns": "^8.0.0",
"it-all": "^3.0.2",
"it-drain": "^3.0.1",
"it-filter": "^3.0.1",
"it-foreach": "^2.0.2",
"libp2p": "^1.0.3",
"mortice": "^3.0.1",
"multiformats": "^13.0.0",
"p-defer": "^4.0.0",
"p-queue": "^8.0.1",
"progress-events": "^1.0.0",
"uint8arrays": "^5.0.0"
},
"devDependencies": {
"@libp2p/logger": "^4.0.1",
"@multiformats/mafmt": "^12.1.5",
"@multiformats/multiaddr": "^12.1.7",
"@types/sinon": "^17.0.2",
"aegir": "^41.0.0",
"aegir": "^42.0.0",
"delay": "^6.0.0",
"it-all": "^3.0.4",
"sinon": "^17.0.0",
"sinon-ts": "^2.0.0"
},
Expand Down
2 changes: 1 addition & 1 deletion packages/helia/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export * from '@helia/interface/pins'
*/
export interface DAGWalker {
codec: number
walk(block: Uint8Array): AsyncGenerator<CID, void, undefined>
walk(block: Uint8Array): Generator<CID, void, undefined>
}

/**
Expand Down
99 changes: 48 additions & 51 deletions packages/helia/src/pins.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
import { Queue } from '@libp2p/utils/queue'
import * as cborg from 'cborg'
import { type Datastore, Key } from 'interface-datastore'
import { base36 } from 'multiformats/bases/base36'
import { CID, type Version } from 'multiformats/cid'
import defer from 'p-defer'
import PQueue from 'p-queue'
import { CustomProgressEvent, type ProgressOptions } from 'progress-events'
import { equals as uint8ArrayEquals } from 'uint8arrays/equals'
import { dagCborWalker, dagJsonWalker, dagPbWalker, jsonWalker, rawWalker } from './utils/dag-walkers.js'
import type { DAGWalker } from './index.js'
import type { DAGWalker, GetBlockProgressEvents } from './index.js'
import type { AddOptions, AddPinEvents, IsPinnedOptions, LsOptions, Pin, Pins, RmOptions } from '@helia/interface/pins'
import type { GetBlockProgressEvents } from '@helia/interface/src/blocks.js'
import type { AbortOptions } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'

Expand Down Expand Up @@ -38,10 +36,20 @@ interface DatastorePinnedBlock {
pinnedBy: Uint8Array[]
}

/**
* Callback for updating a {@link DatastorePinnedBlock}'s properties when
* calling `#updatePinnedBlock`
*
* The callback should return `false` to prevent any pinning modifications to
* the block, and true in all other cases.
*/
interface WithPinnedBlockCallback {
(pinnedBlock: DatastorePinnedBlock): boolean
}
Comment on lines +39 to +48
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@achingbrain I pulled this out into a locally defined interface and added some context


const DATASTORE_PIN_PREFIX = '/pin/'
const DATASTORE_BLOCK_PREFIX = '/pinned-block/'
const DATASTORE_ENCODING = base36
// const DAG_WALK_MAX_QUEUE_LENGTH = 10
const DAG_WALK_QUEUE_CONCURRENCY = 1

interface WalkDagOptions extends AbortOptions, ProgressOptions<GetBlockProgressEvents | AddPinEvents> {
Expand Down Expand Up @@ -71,7 +79,7 @@ export class PinsImpl implements Pins {
})
}

async add (cid: CID<unknown, number, number, Version>, options: AddOptions = {}): Promise<Pin> {
async * add (cid: CID<unknown, number, number, Version>, options: AddOptions = {}): AsyncGenerator<CID, void, undefined> {
const pinKey = toDSKey(cid)

if (await this.datastore.has(pinKey)) {
Expand All @@ -85,56 +93,40 @@ export class PinsImpl implements Pins {
}

// use a queue to walk the DAG instead of recursion so we can traverse very large DAGs
const queue = new PQueue({
const queue = new Queue<AsyncGenerator<CID>>({
concurrency: DAG_WALK_QUEUE_CONCURRENCY
})
void queue.add(async (): Promise<void> => {
await this.#walkDag(cid, queue, (pinnedBlock): void => {

for await (const childCid of this.#walkDag(cid, queue, {
...options,
depth
})) {
await this.#updatePinnedBlock(childCid, (pinnedBlock: DatastorePinnedBlock) => {
// do not update pinned block if this block is already pinned by this CID
if (pinnedBlock.pinnedBy.find(c => uint8ArrayEquals(c, cid.bytes)) != null) {
return
return false
}

pinnedBlock.pinCount++
pinnedBlock.pinnedBy.push(cid.bytes)
}, {
...options,
depth
})
})

// if a job in the queue errors, throw that error
const deferred = defer()

queue.on('error', (err): void => {
queue.clear()
deferred.reject(err)
})
return true
}, options)

// wait for the queue to complete or error
await Promise.race([
queue.onIdle(),
deferred.promise
])
yield childCid
}

const pin: DatastorePin = {
depth,
metadata: options.metadata ?? {}
}

await this.datastore.put(pinKey, cborg.encode(pin), options)

return {
cid,
...pin
}
}

/**
* Walk the DAG behind the passed CID, ensure all blocks are present in the blockstore
* and update the pin count for them
* Walk a DAG in an iterable fashion
*/
async #walkDag (cid: CID, queue: PQueue, withPinnedBlock: (pinnedBlock: DatastorePinnedBlock) => void, options: WalkDagOptions): Promise<void> {
async * #walkDag (cid: CID, queue: Queue<AsyncGenerator<CID>>, options: WalkDagOptions): AsyncGenerator<CID> {
if (options.depth === -1) {
return
}
Expand All @@ -147,12 +139,12 @@ export class PinsImpl implements Pins {

const block = await this.blockstore.get(cid, options)

await this.#updatePinnedBlock(cid, withPinnedBlock, options)
yield cid

// walk dag, ensure all blocks are present
for await (const cid of dagWalker.walk(block)) {
void queue.add(async () => {
await this.#walkDag(cid, queue, withPinnedBlock, {
yield * await queue.add(async () => {
return this.#walkDag(cid, queue, {
...options,
depth: options.depth - 1
})
Expand All @@ -163,7 +155,7 @@ export class PinsImpl implements Pins {
/**
* Update the pin count for the CID
*/
async #updatePinnedBlock (cid: CID, withPinnedBlock: (pinnedBlock: DatastorePinnedBlock) => void, options: AddOptions): Promise<void> {
async #updatePinnedBlock (cid: CID, withPinnedBlock: WithPinnedBlockCallback, options: AddOptions): Promise<void> {
const blockKey = new Key(`${DATASTORE_BLOCK_PREFIX}${DATASTORE_ENCODING.encode(cid.multihash.bytes)}`)

let pinnedBlock: DatastorePinnedBlock = {
Expand All @@ -179,7 +171,11 @@ export class PinsImpl implements Pins {
}
}

withPinnedBlock(pinnedBlock)
const shouldContinue = withPinnedBlock(pinnedBlock)

if (!shouldContinue) {
return
}

if (pinnedBlock.pinCount === 0) {
if (await this.datastore.has(blockKey)) {
Expand All @@ -189,34 +185,35 @@ export class PinsImpl implements Pins {
}

await this.datastore.put(blockKey, cborg.encode(pinnedBlock), options)
options.onProgress?.(new CustomProgressEvent<CID>('helia:pin:add', { detail: cid }))
options.onProgress?.(new CustomProgressEvent<CID>('helia:pin:add', cid))
}

async rm (cid: CID<unknown, number, number, Version>, options: RmOptions = {}): Promise<Pin> {
async * rm (cid: CID<unknown, number, number, Version>, options: RmOptions = {}): AsyncGenerator<CID, void, undefined> {
const pinKey = toDSKey(cid)
const buf = await this.datastore.get(pinKey, options)
const pin = cborg.decode(buf)

await this.datastore.delete(pinKey, options)

// use a queue to walk the DAG instead of recursion so we can traverse very large DAGs
const queue = new PQueue({
const queue = new Queue<AsyncGenerator<CID>>({
concurrency: DAG_WALK_QUEUE_CONCURRENCY
})
void queue.add(async (): Promise<void> => {
await this.#walkDag(cid, queue, (pinnedBlock): void => {

for await (const childCid of this.#walkDag(cid, queue, {
...options,
depth: pin.depth
})) {
await this.#updatePinnedBlock(childCid, (pinnedBlock): boolean => {
pinnedBlock.pinCount--
pinnedBlock.pinnedBy = pinnedBlock.pinnedBy.filter(c => uint8ArrayEquals(c, cid.bytes))
return true
}, {
...options,
depth: pin.depth
})
})
await queue.onIdle()

return {
cid,
...pin
yield childCid
}
}

Expand Down
10 changes: 5 additions & 5 deletions packages/helia/src/utils/dag-walkers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import type { DAGWalker } from '../index.js'
*/
export const dagPbWalker: DAGWalker = {
codec: dagPb.code,
async * walk (block) {
* walk (block) {
const node = dagPb.decode(block)

yield * node.Links.map(l => l.Hash)
Expand All @@ -29,7 +29,7 @@ export const dagPbWalker: DAGWalker = {
*/
export const rawWalker: DAGWalker = {
codec: raw.code,
async * walk () {
* walk () {
// no embedded CIDs in a raw block
}
}
Expand All @@ -44,7 +44,7 @@ const CID_TAG = 42
*/
export const dagCborWalker: DAGWalker = {
codec: dagCbor.code,
async * walk (block) {
* walk (block) {
const cids: CID[] = []
const tags: cborg.TagDecoder[] = []
tags[CID_TAG] = (bytes) => {
Expand Down Expand Up @@ -142,7 +142,7 @@ class DagJsonTokenizer extends cborgJson.Tokenizer {
*/
export const dagJsonWalker: DAGWalker = {
codec: dagJson.code,
async * walk (block) {
* walk (block) {
const cids: CID[] = []
const tags: cborg.TagDecoder[] = []
tags[CID_TAG] = (string) => {
Expand Down Expand Up @@ -177,5 +177,5 @@ export const dagJsonWalker: DAGWalker = {
*/
export const jsonWalker: DAGWalker = {
codec: json.code,
async * walk () {}
* walk () {}
}
2 changes: 1 addition & 1 deletion packages/helia/test/fixtures/dag-walker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type { DAGWalker } from '../../src/index.js'
export function dagWalker (codec: number, dag: Record<string, DAGNode>): DAGWalker {
return {
codec,
async * walk (block) {
* walk (block) {
const node = dag[uint8ArrayToString(block)] ?? { links: [] }

yield * node.links
Expand Down
Loading