Skip to content

Commit

Permalink
feat!: updates agent-store api to unblock integration with w3infra (#…
Browse files Browse the repository at this point in the history
…1479)

While working on storacha/w3infra#380 I have
realized that agent-store API was poorly designed because it imposed:

1. Store to traverse agent message in order to index each invocations
and receipts within it and code that dealt with traversal was trapped in
util module here that was not even exported.
2. Store has to encode message into bytes in order to persist it, which
is redundant given that we received message in encoded form.

This PR fixes above limitations by switching store interface from
receiving `AgentMessage` to `ParsedAgentMessage` which wraps
`AgentMessage` along with message bytes and index freeing store from
doing any kind of traversal or encoding.

I also removed legacy code that was left behind by previous PR.
  • Loading branch information
Gozala authored May 30, 2024
1 parent a0abb80 commit 2998a93
Show file tree
Hide file tree
Showing 10 changed files with 207 additions and 225 deletions.
29 changes: 23 additions & 6 deletions packages/upload-api/src/lib.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,23 @@ import { createService as createW3sService } from './service.js'
import { createService as createPlanService } from './plan.js'
import { createService as createUsageService } from './usage.js'
import { createService as createFilecoinService } from '@web3-storage/filecoin-api/storefront/service'
import * as AgentMessage from './utils/agent-message.js'

export * from './types.js'
export { AgentMessage }

/**
* @param {Omit<Types.UcantoServerContext, 'validateAuthorization'>} options
* @returns {Agent<Types.Service>}
*/
export const createServer = ({ codec = Legacy.inbound, ...context }) => {
export const createServer = ({ codec = Legacy.inbound, ...options }) => {
const context = {
...options,
...createRevocationChecker(options),
}

const server = Server.create({
...createRevocationChecker(context),
id: context.id,
...context,
codec,
service: createService(context),
catch: (error) => context.errorReporter.catch(error),
Expand Down Expand Up @@ -69,6 +75,7 @@ export const createServer = ({ codec = Legacy.inbound, ...context }) => {
* @template {Types.Tuple<Types.ServiceInvocation<Types.Capability, S>>} I
* @param {Agent<S>} agent
* @param {Types.HTTPRequest<Types.AgentMessage<{ In: Types.InferInvocations<I>, Out: Types.Tuple<Types.Receipt> }>>} request
* @returns {Promise<Types.HTTPResponse<Types.AgentMessage<{ Out: Types.InferReceipts<I, S>, In: Types.Tuple<Types.Invocation> }>>>}
*/
export const handle = async (agent, request) => {
const selection = agent.codec.accept(request)
Expand All @@ -86,7 +93,12 @@ export const handle = async (agent, request) => {
// Save invocation inside agent store so we can find it later. If we fail
// to save it we return 500 as we do not want to run the invocation that
// we are unable to service.
const save = await agent.context.agentStore.messages.write(input)
const save = await agent.context.agentStore.messages.write({
data: input,
source: request,
index: AgentMessage.index(input),
})

if (save.error) {
return {
status: 500,
Expand All @@ -96,8 +108,14 @@ export const handle = async (agent, request) => {
}

const output = await execute(agent, input)
const response = await encoder.encode(output)

const { error } = await agent.context.agentStore.messages.write({
data: output,
source: response,
index: AgentMessage.index(output),
})

const { error } = await agent.context.agentStore.messages.write(output)
// Failure to write a receipt is not something we can recover from. Throwing
// or returning HTTP 500 is also a not a great option because invocation may
// have change state and we would not want to rerun it. Which is why we
Expand All @@ -106,7 +124,6 @@ export const handle = async (agent, request) => {
agent.catch(error)
}

const response = await encoder.encode(output)
return response
}
}
Expand Down
44 changes: 40 additions & 4 deletions packages/upload-api/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type {
Failure,
ServiceMethod,
UCANLink,
Link,
HandlerExecutionError,
Signer,
DID,
Expand All @@ -20,6 +21,13 @@ import type {
AgentMessage,
Invocation,
Receipt,
AgentMessageModel,
UCAN,
Capability,
ReceiptModel,
Variant,
HTTPRequest,
HTTPResponse,
} from '@ucanto/interface'
import type { ProviderInput, ConnectionView } from '@ucanto/server'

Expand Down Expand Up @@ -189,8 +197,7 @@ import { SubscriptionsStorage } from './types/subscriptions.js'
export type { SubscriptionsStorage }
import { UsageStorage } from './types/usage.js'
export type { UsageStorage }
import { StorageGetError, TasksScheduler } from './types/service.js'
export type { TasksScheduler }
import { StorageGetError } from './types/storage.js'
import { AllocationsStorage, BlobsStorage, BlobAddInput } from './types/blob.js'
export type { AllocationsStorage, BlobsStorage, BlobAddInput }
import { IPNIService, IndexServiceContext } from './types/index.js'
Expand All @@ -199,7 +206,7 @@ export type {
IPNIService,
BlobRetriever,
BlobNotFound,
ShardedDAGIndex
ShardedDAGIndex,
} from './types/index.js'

export interface Service extends StorefrontService, W3sService {
Expand Down Expand Up @@ -495,11 +502,40 @@ export interface AgentContext {
* {@link Invocation} and {@link Receipt} lookups.
*/
export interface AgentStore {
messages: Writer<AgentMessage>
messages: Writer<ParsedAgentMessage>
invocations: Accessor<UnknownLink, Invocation>
receipts: Accessor<UnknownLink, Receipt>
}

export type TaskLink = Link

export type InvocationLink = Link<UCAN.UCAN<[Capability]>>
export type ReceiptLink = Link<ReceiptModel>
export type AgentMessageLink = Link<AgentMessageModel<unknown>>

export interface ParsedAgentMessage {
source: HTTPRequest | HTTPResponse
data: AgentMessage
index: Iterable<AgentMessageIndexRecord>
}

export interface InvocationSource {
task: TaskLink
invocation: Invocation
message: AgentMessageLink
}

export interface ReceiptSource {
task: TaskLink
receipt: Receipt
message: AgentMessageLink
}

export type AgentMessageIndexRecord = Variant<{
invocation: InvocationSource
receipt: ReceiptSource
}>

/**
* Read interface for the key value store.
*/
Expand Down
17 changes: 0 additions & 17 deletions packages/upload-api/src/types/service.ts

This file was deleted.

31 changes: 31 additions & 0 deletions packages/upload-api/src/utils/agent-message.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,34 @@ export class Iterator {
export function* iterate(message, options) {
yield* new Iterator(message, options)
}

/**
* @param {API.AgentMessage} message
* @returns {Iterable<API.AgentMessageIndexRecord>}
*/
export const index = function* (message) {
const source = message.root.cid
for (const { receipt, invocation } of iterate(message)) {
if (invocation) {
// TODO: actually derive task CID
const task = invocation.link()
yield {
invocation: {
task,
invocation,
message: source,
},
}
}

if (receipt) {
yield {
receipt: {
task: receipt.ran.link(),
receipt,
message: source,
},
}
}
}
}
4 changes: 3 additions & 1 deletion packages/upload-api/test/handlers/blob.js
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,9 @@ export const test = {
}

assert.ok(
String(accept.ok.out.error).match(/Blob not found/),
/** @type {{message:string}} */ (accept.ok.out.error).message.match(
/Blob not found/
),
'accept was not successful'
)
},
Expand Down
16 changes: 10 additions & 6 deletions packages/upload-api/test/handlers/ucan.js
Original file line number Diff line number Diff line change
Expand Up @@ -403,17 +403,21 @@ export const test = {
assert.ok(conclude.out.ok)
assert.ok(conclude.out.ok?.time)

assert.deepEqual(
await context.agentStore.invocations.get(invocation.link()),
{ ok: invocation }
)
const stored = await context.agentStore.invocations.get(invocation.link())
assert.equal(stored.ok?.link().toString(), invocation.link().toString())

const storedReceipt = await context.agentStore.receipts.get(
invocation.link()
)
assert.ok(storedReceipt.ok)
assert.deepEqual(storedReceipt.ok?.link(), receipt.link())
assert.deepEqual(storedReceipt.ok?.ran, invocation)
assert.deepEqual(
storedReceipt.ok?.link().toString(),
receipt.link().toString()
)
assert.deepEqual(
storedReceipt.ok?.ran.link().toString(),
invocation.link().toString()
)
},
'ucan/conclude schedules web3.storage/blob/accept if invoked with the http/put receipt':
async (assert, context) => {
Expand Down
42 changes: 25 additions & 17 deletions packages/upload-api/test/storage/agent-store-tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import { Console } from '@web3-storage/capabilities'

import { alice, registerSpace } from '../util.js'
import { Message, Receipt } from '@ucanto/core'
import * as CAR from '@ucanto/transport/car'
import { createConcludeInvocation } from '../../src/ucan/conclude.js'
import * as AgentMessage from '../../src/utils/agent-message.js'

/**
* @type {API.Tests}
Expand Down Expand Up @@ -109,7 +111,7 @@ export const test = {
assert.ok(hiReceipt.out.ok)

const storedHi = await context.agentStore.invocations.get(hi.link())
assert.deepEqual(storedHi.ok?.link(), hi.link())
assert.deepEqual(storedHi.ok?.link().toString(), hi.link().toString())

const storedHiReceipt = await context.agentStore.receipts.get(hi.link())
assert.equal(
Expand All @@ -119,11 +121,11 @@ export const test = {

const [byeReceipt, hiReceipt2] = await context.connection.execute(bye, hi)

assert.deepEqual(hiReceipt2.ran.link(), hi.link())
assert.deepEqual(hiReceipt2.ran.link().toString(), hi.link().toString())
assert.ok(byeReceipt.out.ok)

const storedBye = await context.agentStore.invocations.get(bye.link())
assert.deepEqual(storedBye.ok?.link(), bye.link())
assert.deepEqual(storedBye.ok?.link().toString(), bye.link().toString())

const storedByeReceipt = await context.agentStore.receipts.get(bye.link())
assert.equal(
Expand Down Expand Up @@ -160,27 +162,29 @@ export const test = {
receipts: [receipt],
})

const result = await context.agentStore.messages.write(message)
const result = await context.agentStore.messages.write({
data: message,
source: CAR.request.encode(message),
index: AgentMessage.index(message),
})
assert.ok(result.ok)

const storedReceipt = await context.agentStore.receipts.get(
receipt.ran.link()
)
assert.deepEqual(
storedReceipt.ok?.link(),
receipt.link(),
storedReceipt.ok?.link().toString(),
receipt.link().toString(),
'receipt was stored and indexed by invocation'
)

const storedInvocation = await context.agentStore.invocations.get(
receipt.ran.link()
)

console.log(storedInvocation)

assert.deepEqual(
storedInvocation.ok?.link(),
hi.link(),
storedInvocation.ok?.link().toString(),
hi.link().toString(),
'invocation was stored and indexed by invocation'
)
},
Expand Down Expand Up @@ -213,15 +217,19 @@ export const test = {
invocations: [conclude],
})

const result = await context.agentStore.messages.write(message)
const result = await context.agentStore.messages.write({
data: message,
source: CAR.request.encode(message),
index: AgentMessage.index(message),
})
assert.ok(result.ok)

const storedReceipt = await context.agentStore.receipts.get(
receipt.ran.link()
)
assert.deepEqual(
storedReceipt.ok?.link(),
receipt.link(),
storedReceipt.ok?.link().toString(),
receipt.link().toString(),
'receipt was stored and indexed by invocation'
)

Expand All @@ -230,8 +238,8 @@ export const test = {
)

assert.deepEqual(
storedInvocation.ok?.link(),
hi.link(),
storedInvocation.ok?.link().toString(),
hi.link().toString(),
'invocation was stored and indexed by invocation'
)

Expand All @@ -240,8 +248,8 @@ export const test = {
)

assert.deepEqual(
storedConclude.ok?.link(),
conclude.link(),
storedConclude.ok?.link().toString(),
conclude.link().toString(),
'store conclude invocation was stored and indexed by invocation'
)
},
Expand Down
Loading

0 comments on commit 2998a93

Please sign in to comment.