diff --git a/packages/upload-api/src/lib.js b/packages/upload-api/src/lib.js index 615ce690d..14d90884e 100644 --- a/packages/upload-api/src/lib.js +++ b/packages/upload-api/src/lib.js @@ -24,17 +24,23 @@ import { createService as createW3sService } from './service.js' import { createService as createPlanService } from './plan.js' import { createService as createUsageService } from './usage.js' import { createService as createFilecoinService } from '@web3-storage/filecoin-api/storefront/service' +import * as AgentMessage from './utils/agent-message.js' export * from './types.js' +export { AgentMessage } /** * @param {Omit} options * @returns {Agent} */ -export const createServer = ({ codec = Legacy.inbound, ...context }) => { +export const createServer = ({ codec = Legacy.inbound, ...options }) => { + const context = { + ...options, + ...createRevocationChecker(options), + } + const server = Server.create({ - ...createRevocationChecker(context), - id: context.id, + ...context, codec, service: createService(context), catch: (error) => context.errorReporter.catch(error), @@ -69,6 +75,7 @@ export const createServer = ({ codec = Legacy.inbound, ...context }) => { * @template {Types.Tuple>} I * @param {Agent} agent * @param {Types.HTTPRequest, Out: Types.Tuple }>>} request + * @returns {Promise, In: Types.Tuple }>>>} */ export const handle = async (agent, request) => { const selection = agent.codec.accept(request) @@ -86,7 +93,12 @@ export const handle = async (agent, request) => { // Save invocation inside agent store so we can find it later. If we fail // to save it we return 500 as we do not want to run the invocation that // we are unable to service. - const save = await agent.context.agentStore.messages.write(input) + const save = await agent.context.agentStore.messages.write({ + data: input, + source: request, + index: AgentMessage.index(input), + }) + if (save.error) { return { status: 500, @@ -96,8 +108,14 @@ export const handle = async (agent, request) => { } const output = await execute(agent, input) + const response = await encoder.encode(output) + + const { error } = await agent.context.agentStore.messages.write({ + data: output, + source: response, + index: AgentMessage.index(output), + }) - const { error } = await agent.context.agentStore.messages.write(output) // Failure to write a receipt is not something we can recover from. Throwing // or returning HTTP 500 is also a not a great option because invocation may // have change state and we would not want to rerun it. Which is why we @@ -106,7 +124,6 @@ export const handle = async (agent, request) => { agent.catch(error) } - const response = await encoder.encode(output) return response } } diff --git a/packages/upload-api/src/types.ts b/packages/upload-api/src/types.ts index e785e88bd..8f4e0e288 100644 --- a/packages/upload-api/src/types.ts +++ b/packages/upload-api/src/types.ts @@ -2,6 +2,7 @@ import type { Failure, ServiceMethod, UCANLink, + Link, HandlerExecutionError, Signer, DID, @@ -20,6 +21,13 @@ import type { AgentMessage, Invocation, Receipt, + AgentMessageModel, + UCAN, + Capability, + ReceiptModel, + Variant, + HTTPRequest, + HTTPResponse, } from '@ucanto/interface' import type { ProviderInput, ConnectionView } from '@ucanto/server' @@ -189,8 +197,7 @@ import { SubscriptionsStorage } from './types/subscriptions.js' export type { SubscriptionsStorage } import { UsageStorage } from './types/usage.js' export type { UsageStorage } -import { StorageGetError, TasksScheduler } from './types/service.js' -export type { TasksScheduler } +import { StorageGetError } from './types/storage.js' import { AllocationsStorage, BlobsStorage, BlobAddInput } from './types/blob.js' export type { AllocationsStorage, BlobsStorage, BlobAddInput } import { IPNIService, IndexServiceContext } from './types/index.js' @@ -199,7 +206,7 @@ export type { IPNIService, BlobRetriever, BlobNotFound, - ShardedDAGIndex + ShardedDAGIndex, } from './types/index.js' export interface Service extends StorefrontService, W3sService { @@ -495,11 +502,40 @@ export interface AgentContext { * {@link Invocation} and {@link Receipt} lookups. */ export interface AgentStore { - messages: Writer + messages: Writer invocations: Accessor receipts: Accessor } +export type TaskLink = Link + +export type InvocationLink = Link> +export type ReceiptLink = Link +export type AgentMessageLink = Link> + +export interface ParsedAgentMessage { + source: HTTPRequest | HTTPResponse + data: AgentMessage + index: Iterable +} + +export interface InvocationSource { + task: TaskLink + invocation: Invocation + message: AgentMessageLink +} + +export interface ReceiptSource { + task: TaskLink + receipt: Receipt + message: AgentMessageLink +} + +export type AgentMessageIndexRecord = Variant<{ + invocation: InvocationSource + receipt: ReceiptSource +}> + /** * Read interface for the key value store. */ diff --git a/packages/upload-api/src/types/service.ts b/packages/upload-api/src/types/service.ts deleted file mode 100644 index e0609464d..000000000 --- a/packages/upload-api/src/types/service.ts +++ /dev/null @@ -1,17 +0,0 @@ -import type { - UnknownLink, - Receipt, - Result, - Unit, - Failure, - ServiceInvocation, -} from '@ucanto/interface' -import type { Storage, StorageGetError, StoragePutError } from './storage.js' - -export type { StorageGetError, StoragePutError } - -// eslint-disable-next-line @typescript-eslint/no-explicit-any -export type ReceiptsStorage = Storage> -export interface TasksScheduler { - schedule: (invocation: ServiceInvocation) => Promise> -} diff --git a/packages/upload-api/src/utils/agent-message.js b/packages/upload-api/src/utils/agent-message.js index f7685839f..17906b080 100644 --- a/packages/upload-api/src/utils/agent-message.js +++ b/packages/upload-api/src/utils/agent-message.js @@ -118,3 +118,34 @@ export class Iterator { export function* iterate(message, options) { yield* new Iterator(message, options) } + +/** + * @param {API.AgentMessage} message + * @returns {Iterable} + */ +export const index = function* (message) { + const source = message.root.cid + for (const { receipt, invocation } of iterate(message)) { + if (invocation) { + // TODO: actually derive task CID + const task = invocation.link() + yield { + invocation: { + task, + invocation, + message: source, + }, + } + } + + if (receipt) { + yield { + receipt: { + task: receipt.ran.link(), + receipt, + message: source, + }, + } + } + } +} diff --git a/packages/upload-api/test/handlers/blob.js b/packages/upload-api/test/handlers/blob.js index b78098403..33da67d0a 100644 --- a/packages/upload-api/test/handlers/blob.js +++ b/packages/upload-api/test/handlers/blob.js @@ -409,7 +409,9 @@ export const test = { } assert.ok( - String(accept.ok.out.error).match(/Blob not found/), + /** @type {{message:string}} */ (accept.ok.out.error).message.match( + /Blob not found/ + ), 'accept was not successful' ) }, diff --git a/packages/upload-api/test/handlers/ucan.js b/packages/upload-api/test/handlers/ucan.js index cc47b5665..d0e952839 100644 --- a/packages/upload-api/test/handlers/ucan.js +++ b/packages/upload-api/test/handlers/ucan.js @@ -403,17 +403,21 @@ export const test = { assert.ok(conclude.out.ok) assert.ok(conclude.out.ok?.time) - assert.deepEqual( - await context.agentStore.invocations.get(invocation.link()), - { ok: invocation } - ) + const stored = await context.agentStore.invocations.get(invocation.link()) + assert.equal(stored.ok?.link().toString(), invocation.link().toString()) const storedReceipt = await context.agentStore.receipts.get( invocation.link() ) assert.ok(storedReceipt.ok) - assert.deepEqual(storedReceipt.ok?.link(), receipt.link()) - assert.deepEqual(storedReceipt.ok?.ran, invocation) + assert.deepEqual( + storedReceipt.ok?.link().toString(), + receipt.link().toString() + ) + assert.deepEqual( + storedReceipt.ok?.ran.link().toString(), + invocation.link().toString() + ) }, 'ucan/conclude schedules web3.storage/blob/accept if invoked with the http/put receipt': async (assert, context) => { diff --git a/packages/upload-api/test/storage/agent-store-tests.js b/packages/upload-api/test/storage/agent-store-tests.js index f3aeafc32..0dda643e9 100644 --- a/packages/upload-api/test/storage/agent-store-tests.js +++ b/packages/upload-api/test/storage/agent-store-tests.js @@ -6,7 +6,9 @@ import { Console } from '@web3-storage/capabilities' import { alice, registerSpace } from '../util.js' import { Message, Receipt } from '@ucanto/core' +import * as CAR from '@ucanto/transport/car' import { createConcludeInvocation } from '../../src/ucan/conclude.js' +import * as AgentMessage from '../../src/utils/agent-message.js' /** * @type {API.Tests} @@ -109,7 +111,7 @@ export const test = { assert.ok(hiReceipt.out.ok) const storedHi = await context.agentStore.invocations.get(hi.link()) - assert.deepEqual(storedHi.ok?.link(), hi.link()) + assert.deepEqual(storedHi.ok?.link().toString(), hi.link().toString()) const storedHiReceipt = await context.agentStore.receipts.get(hi.link()) assert.equal( @@ -119,11 +121,11 @@ export const test = { const [byeReceipt, hiReceipt2] = await context.connection.execute(bye, hi) - assert.deepEqual(hiReceipt2.ran.link(), hi.link()) + assert.deepEqual(hiReceipt2.ran.link().toString(), hi.link().toString()) assert.ok(byeReceipt.out.ok) const storedBye = await context.agentStore.invocations.get(bye.link()) - assert.deepEqual(storedBye.ok?.link(), bye.link()) + assert.deepEqual(storedBye.ok?.link().toString(), bye.link().toString()) const storedByeReceipt = await context.agentStore.receipts.get(bye.link()) assert.equal( @@ -160,15 +162,19 @@ export const test = { receipts: [receipt], }) - const result = await context.agentStore.messages.write(message) + const result = await context.agentStore.messages.write({ + data: message, + source: CAR.request.encode(message), + index: AgentMessage.index(message), + }) assert.ok(result.ok) const storedReceipt = await context.agentStore.receipts.get( receipt.ran.link() ) assert.deepEqual( - storedReceipt.ok?.link(), - receipt.link(), + storedReceipt.ok?.link().toString(), + receipt.link().toString(), 'receipt was stored and indexed by invocation' ) @@ -176,11 +182,9 @@ export const test = { receipt.ran.link() ) - console.log(storedInvocation) - assert.deepEqual( - storedInvocation.ok?.link(), - hi.link(), + storedInvocation.ok?.link().toString(), + hi.link().toString(), 'invocation was stored and indexed by invocation' ) }, @@ -213,15 +217,19 @@ export const test = { invocations: [conclude], }) - const result = await context.agentStore.messages.write(message) + const result = await context.agentStore.messages.write({ + data: message, + source: CAR.request.encode(message), + index: AgentMessage.index(message), + }) assert.ok(result.ok) const storedReceipt = await context.agentStore.receipts.get( receipt.ran.link() ) assert.deepEqual( - storedReceipt.ok?.link(), - receipt.link(), + storedReceipt.ok?.link().toString(), + receipt.link().toString(), 'receipt was stored and indexed by invocation' ) @@ -230,8 +238,8 @@ export const test = { ) assert.deepEqual( - storedInvocation.ok?.link(), - hi.link(), + storedInvocation.ok?.link().toString(), + hi.link().toString(), 'invocation was stored and indexed by invocation' ) @@ -240,8 +248,8 @@ export const test = { ) assert.deepEqual( - storedConclude.ok?.link(), - conclude.link(), + storedConclude.ok?.link().toString(), + conclude.link().toString(), 'store conclude invocation was stored and indexed by invocation' ) }, diff --git a/packages/upload-api/test/storage/agent-store.js b/packages/upload-api/test/storage/agent-store.js index 9e5b146ae..42ed60137 100644 --- a/packages/upload-api/test/storage/agent-store.js +++ b/packages/upload-api/test/storage/agent-store.js @@ -1,68 +1,105 @@ import * as API from '../../src/types.js' -import { TasksStorage } from './tasks-storage.js' -import { ReceiptsStorage } from './receipts-storage.js' -import * as AgentMessage from '../../src/utils/agent-message.js' +import { CAR, Invocation, Receipt } from '@ucanto/core' +import { RecordNotFound } from '../../src/errors.js' export const memory = () => new AgentStore() /** + * @typedef {object} Model + * @property {Record} store + * @property {Record} index + * * @implements {API.AgentStore} */ class AgentStore { - constructor() { - this.invocations = new TasksStorage() - this.receipts = new ReceiptsStorage() + /** + * @param {Partial} [model] + */ + constructor({ + store = Object.create(null), + index = Object.create(null), + } = {}) { + const model = { store, index } + this.model = model + + this.invocations = new InvocationLookup(model) + this.receipts = new ReceiptLookup(model) } get messages() { return this } /** - * @param {API.AgentMessage} message - * @returns {Promise>>} + * @param {API.ParsedAgentMessage} message + * @returns {Promise>>} */ async write(message) { - const promises = [] - for (const { invocation, receipt } of AgentMessage.iterate(message)) { + const { index, store } = this.model + const at = message.data.root.cid.toString() + store[at] = CAR.decode(/** @type {Uint8Array} */ (message.source.body)) + + for (const { invocation, receipt } of message.index) { if (invocation) { - promises.push(this.invocations.put(invocation)) + let entry = index[`/${invocation.task.toString()}/invocation/`] ?? [] + entry.push({ root: invocation.invocation.link(), at }) + index[`/${invocation.task.toString()}/invocation/`] = entry } + if (receipt) { - promises.push(this.receipts.put(receipt)) + let entry = index[`/${receipt.task.toString()}/receipt/`] ?? [] + entry.push({ root: receipt.receipt.link(), at }) + index[`/${receipt.task.toString()}/receipt/`] = entry } } - const results = await Promise.all(promises) - const failure = results.find((result) => result.error) + return { ok: {} } + } +} - return failure?.error - ? { - error: new WriteError({ - payload: message, - writer: this, - cause: failure.error, - }), - } - : { ok: {} } +class InvocationLookup { + /** + * @param {Model} model + */ + constructor(model) { + this.model = model + } + /** + * + * @param {API.UnknownLink} key + * @returns {Promise>} + */ + async get(key) { + const { index, store } = this.model + const record = index[`/${key.toString()}/invocation/`]?.[0] + const archive = record ? store[record.at] : null + const value = archive + ? Invocation.view({ root: record.root, blocks: archive.blocks }, null) + : null + + return value ? { ok: value } : { error: new RecordNotFound() } } } -/** - * @template T - * @implements {API.WriteError} - */ -class WriteError extends Error { - name = /** @type {const} */ ('WriteError') +class ReceiptLookup { /** - * @param {object} input - * @param {Error} input.cause - * @param {T} input.payload - * @param {API.Writer} input.writer + * @param {Model} model */ - constructor({ cause, payload, writer }) { - super(`Write to store has failed: ${cause}`) - this.cause = cause - this.payload = payload - this.writer = writer + constructor(model) { + this.model = model + } + /** + * + * @param {API.UnknownLink} key + * @returns {Promise>} + */ + async get(key) { + const { index, store } = this.model + const record = index[`/${key.toString()}/receipt/`]?.[0] + const archive = record ? store[record.at] : null + const value = archive + ? Receipt.view({ root: record.root, blocks: archive.blocks }, null) + : null + + return value ? { ok: value } : { error: new RecordNotFound() } } } diff --git a/packages/upload-api/test/storage/receipts-storage.js b/packages/upload-api/test/storage/receipts-storage.js deleted file mode 100644 index 3bcc5f6fe..000000000 --- a/packages/upload-api/test/storage/receipts-storage.js +++ /dev/null @@ -1,64 +0,0 @@ -import * as API from '../../src/types.js' - -import { RecordNotFound } from '../../src/errors.js' - -/** - * @typedef {import('@web3-storage/capabilities/types').StorageGetError} StorageGetError - * @typedef {import('@web3-storage/capabilities/types').StoragePutError} StoragePutError - * @typedef {import('@ucanto/interface').UnknownLink} UnknownLink - * @typedef {import('@ucanto/interface').Receipt} Receipt - */ - -/** - * @implements {API.Accessor} - */ -export class ReceiptsStorage { - constructor() { - /** @type {Map} */ - this.items = new Map() - } - - /** - * @param {Receipt} record - * @returns {Promise>} - */ - async put(record) { - this.items.set(record.ran.link().toString(), record) - - return Promise.resolve({ - ok: {}, - }) - } - - /** - * @param {UnknownLink} link - * @returns {Promise>} - */ - async get(link) { - const record = this.items.get(link.toString()) - if (!record) { - return { - error: new RecordNotFound('not found'), - } - } - return { - ok: record, - } - } - - /** - * @param {UnknownLink} link - * @returns {Promise>} - */ - async has(link) { - const record = this.items.get(link.toString()) - if (!record) { - return { - ok: false, - } - } - return { - ok: Boolean(record), - } - } -} diff --git a/packages/upload-api/test/storage/tasks-storage.js b/packages/upload-api/test/storage/tasks-storage.js deleted file mode 100644 index 82efa82a3..000000000 --- a/packages/upload-api/test/storage/tasks-storage.js +++ /dev/null @@ -1,72 +0,0 @@ -import * as API from '../../src/types.js' - -import { RecordNotFound } from '../../src/errors.js' - -/** - * @typedef {import('@web3-storage/capabilities/types').StorageGetError} StorageGetError - * @typedef {import('@web3-storage/capabilities/types').StoragePutError} StoragePutError - * @typedef {import('@ucanto/interface').UnknownLink} UnknownLink - * @typedef {import('@ucanto/interface').Invocation} Invocation - */ - -/** - * @implements {API.Accessor} - */ -export class TasksStorage { - constructor() { - /** @type {Map} */ - this.items = new Map() - } - - /** - * @param {Invocation} record - * @returns {Promise>} - */ - async put(record) { - this.items.set(record.cid.toString(), record) - - // TODO: store implementation - // const archiveDelegationRes = await task.archive() - // if (archiveDelegationRes.error) { - // return { - // error: archiveDelegationRes.error - // } - // } - - return Promise.resolve({ - ok: {}, - }) - } - - /** - * @param {UnknownLink} link - * @returns {Promise>} - */ - async get(link) { - const record = this.items.get(link.toString()) - if (!record) { - return { - error: new RecordNotFound(), - } - } - return { - ok: record, - } - } - - /** - * @param {UnknownLink} link - * @returns {Promise>} - */ - async has(link) { - const record = this.items.get(link.toString()) - if (!record) { - return { - ok: false, - } - } - return { - ok: Boolean(record), - } - } -}