Skip to content

Commit

Permalink
feat!: add index/add handler (#1421)
Browse files Browse the repository at this point in the history
Adds a Ucanto handler for `index/add` invocations.

* Fetches index archive from the network
* Ensures index and all DAG shards are stored in the agent's space
* Publishes to IPNI

refs #1401

---------

Co-authored-by: Irakli Gozalishvili <contact@gozala.io>
  • Loading branch information
Alan Shaw and Gozala authored May 1, 2024
1 parent f8c6c1d commit cbe9524
Show file tree
Hide file tree
Showing 24 changed files with 828 additions and 19 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/upload-api.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
- name: Setup
uses: actions/setup-node@v3
with:
node-version: 16
node-version: 18
registry-url: https://registry.npmjs.org/
cache: 'pnpm'

Expand Down Expand Up @@ -67,7 +67,7 @@ jobs:
- name: Setup
uses: actions/setup-node@v3
with:
node-version: 16
node-version: 18
registry-url: https://registry.npmjs.org/
cache: 'pnpm'

Expand Down
16 changes: 15 additions & 1 deletion packages/capabilities/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -459,16 +459,30 @@ export type IndexAdd = InferInvokedCapability<typeof IndexCaps.add>
export type IndexAddSuccess = Unit

export type IndexAddFailure =
| IndexNotFound
| DecodeFailure
| UnknownFormat
| ShardNotFound
| SliceNotFound
| Failure

/** The index is not in a format understood by the service. */
/** An error occurred when decoding the data. */
export interface DecodeFailure extends Failure {
name: 'DecodeFailure'
}

/** The data is not in a format understood by the service. */
export interface UnknownFormat extends Failure {
name: 'UnknownFormat'
}

/** The index is not stored in the referenced space. */
export interface IndexNotFound extends Failure {
name: 'IndexNotFound'
/** Multihash digest of the index that could not be found. */
digest: Multihash
}

/** A shard referenced by the index is not stored in the referenced space. */
export interface ShardNotFound extends Failure {
name: 'ShardNotFound'
Expand Down
6 changes: 4 additions & 2 deletions packages/upload-api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@
"test-watch": "pnpm build && mocha --bail --timeout 10s --watch --parallel -n no-warnings -n experimental-vm-modules -n experimental-fetch --watch-files src,test"
},
"dependencies": {
"@ipld/dag-cbor": "^9.0.6",
"@ucanto/client": "^9.0.1",
"@ucanto/interface": "^10.0.1",
"@ucanto/principal": "^9.0.1",
Expand All @@ -189,9 +190,10 @@
"@web3-storage/content-claims": "^4.0.4",
"@web3-storage/did-mailto": "workspace:^",
"@web3-storage/filecoin-api": "workspace:^",
"carstream": "^2.1.0",
"multiformats": "^12.1.2",
"uint8arrays": "^5.0.3",
"p-retry": "^5.1.2"
"p-retry": "^5.1.2",
"uint8arrays": "^5.0.3"
},
"devDependencies": {
"@ipld/car": "^5.1.1",
Expand Down
21 changes: 21 additions & 0 deletions packages/upload-api/src/blob/lib.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Failure } from '@ucanto/server'
import { base58btc } from 'multiformats/bases/base58'

export const AllocatedMemoryHadNotBeenWrittenToName =
'AllocatedMemoryHadNotBeenWrittenTo'
Expand Down Expand Up @@ -69,3 +70,23 @@ export class AwaitError extends Failure {
}
}
}

export class BlobNotFound extends Failure {
static name = /** @type {const} */ ('BlobNotFound')
#digest

/** @param {import('multiformats').MultihashDigest} digest */
constructor(digest) {
super()
this.#digest = digest
}
describe() {
return `blob not found: ${base58btc.encode(this.#digest.bytes)}`
}
get name() {
return BlobNotFound.name
}
get digest () {
return this.#digest.bytes
}
}
5 changes: 5 additions & 0 deletions packages/upload-api/src/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { provide } from './index/add.js'
import * as API from './types.js'

/** @param {API.IndexServiceContext} context */
export const createService = (context) => ({ add: provide(context) })
78 changes: 78 additions & 0 deletions packages/upload-api/src/index/add.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import * as Server from '@ucanto/server'
import { ok, error } from '@ucanto/server'
import * as Index from '@web3-storage/capabilities/index'
import * as ShardedDAGIndex from './lib/sharded-dag-index.js'
import * as API from '../types.js'

/**
* @param {API.IndexServiceContext} context
* @returns {API.ServiceMethod<API.IndexAdd, API.IndexAddSuccess, API.IndexAddFailure>}
*/
export const provide = (context) =>
Server.provide(Index.add, (input) => add(input, context))

/**
* @param {API.Input<Index.add>} input
* @param {API.IndexServiceContext} context
* @returns {Promise<API.Result<API.IndexAddSuccess, API.IndexAddFailure>>}
*/
const add = async ({ capability }, context) => {
const space = capability.with
const idxLink = capability.nb.index

// ensure the index was stored in the agent's space
const idxAllocRes = await assertAllocated(
context,
space,
idxLink.multihash,
'IndexNotFound'
)
if (!idxAllocRes.ok) return idxAllocRes

// fetch the index from the network
const idxBlobRes = await context.blobRetriever.stream(idxLink.multihash)
if (!idxBlobRes.ok) {
if (idxBlobRes.error.name === 'BlobNotFound') {
return error(
/** @type {API.IndexNotFound} */
({ name: 'IndexNotFound', digest: idxLink.multihash.bytes })
)
}
return idxBlobRes
}

const idxRes = await ShardedDAGIndex.extract(idxBlobRes.ok)
if (!idxRes.ok) return idxAllocRes

// ensure indexed shards are allocated in the agent's space
const shardDigests = [...idxRes.ok.shards.keys()]
const shardAllocRes = await Promise.all(
shardDigests.map((s) => assertAllocated(context, space, s, 'ShardNotFound'))
)
for (const res of shardAllocRes) {
if (!res.ok) return res
}

// TODO: randomly validate slices in the index correspond to slices in the blob

// publish the index data to IPNI
return context.ipniService.publish(idxRes.ok)
}

/**
* @param {{ allocationsStorage: import('../types.js').AllocationsStorage }} context
* @param {API.SpaceDID} space
* @param {import('multiformats').MultihashDigest} digest
* @param {'IndexNotFound'|'ShardNotFound'|'SliceNotFound'} errorName
* @returns {Promise<API.Result<API.Unit, API.IndexNotFound|API.ShardNotFound|API.SliceNotFound|API.Failure>>}
*/
const assertAllocated = async (context, space, digest, errorName) => {
const result = await context.allocationsStorage.exists(space, digest.bytes)
if (result.error) return result
if (!result.ok)
return error(
/** @type {API.IndexNotFound|API.ShardNotFound|API.SliceNotFound} */
({ name: errorName, digest: digest.bytes })
)
return ok({})
}
1 change: 1 addition & 0 deletions packages/upload-api/src/index/lib/api.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export {}
23 changes: 23 additions & 0 deletions packages/upload-api/src/index/lib/api.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { Failure } from '@ucanto/interface'
import { MultihashDigest, UnknownLink } from 'multiformats'

export type { Result } from '@ucanto/interface'
export type { UnknownFormat } from '@web3-storage/capabilities/types'
export type { MultihashDigest, UnknownLink }

export type ShardDigest = MultihashDigest
export type SliceDigest = MultihashDigest

/**
* A sharded DAG index.
*
* @see https://github.com/w3s-project/specs/blob/main/w3-index.md
*/
export interface ShardedDAGIndex {
content: UnknownLink
shards: Map<ShardDigest, Map<SliceDigest, [offset: number, length: number]>>
}

export interface DecodeFailure extends Failure {
name: 'DecodeFailure'
}
116 changes: 116 additions & 0 deletions packages/upload-api/src/index/lib/digest-map.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import { base58btc } from 'multiformats/bases/base58'

/** @type {WeakMap<Uint8Array, string>} */
const cache = new WeakMap()

/** @param {import('multiformats').MultihashDigest} digest */
const toBase58String = (digest) => {
let str = cache.get(digest.bytes)
if (!str) {
str = base58btc.encode(digest.bytes)
cache.set(digest.bytes, str)
}
return str
}

/**
* @template {import('multiformats').MultihashDigest} Key
* @template Value
* @implements {Map<Key, Value>}
*/
export class DigestMap {
/** @type {Map<string, [Key, Value]>} */
#data

/**
* @param {Array<[Key, Value]>} [entries]
*/
constructor(entries) {
this.#data = new Map()
for (const [k, v] of entries ?? []) {
this.set(k, v)
}
}

get [Symbol.toStringTag]() {
return 'DigestMap'
}

clear() {
this.#data.clear()
}

/**
* @param {Key} key
* @returns {boolean}
*/
delete(key) {
const mhstr = toBase58String(key)
return this.#data.delete(mhstr)
}

/**
* @param {(value: Value, key: Key, map: Map<Key, Value>) => void} callbackfn
* @param {any} [thisArg]
*/
forEach(callbackfn, thisArg) {
for (const [k, v] of this.#data.values()) {
callbackfn.call(thisArg, v, k, this)
}
}

/**
* @param {Key} key
* @returns {Value|undefined}
*/
get(key) {
const data = this.#data.get(toBase58String(key))
if (data) return data[1]
}

/**
* @param {Key} key
* @returns {boolean}
*/
has(key) {
return this.#data.has(toBase58String(key))
}

/**
* @param {Key} key
* @param {Value} value
*/
set(key, value) {
this.#data.set(toBase58String(key), [key, value])
return this
}

/** @returns {number} */
get size() {
return this.#data.size
}

/** @returns */
[Symbol.iterator]() {
return this.entries()
}

/** @returns {IterableIterator<[Key, Value]>} */
*entries() {
yield* this.#data.values()
}

/** @returns {IterableIterator<Key>} */
*keys() {
for (const [k] of this.#data.values()) {
yield k
}
}

/** @returns {IterableIterator<Value>} */
*values() {
for (const [, v] of this.#data.values()) {
yield v
}
}
}
Loading

0 comments on commit cbe9524

Please sign in to comment.