Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: agent store integration #380

Merged
merged 30 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
832aa35
feat: implement agent store
Gozala May 17, 2024
2a63f73
feat: implement agent store
Gozala May 17, 2024
1221297
feat: attempt to refactor
Gozala May 25, 2024
1056f29
Merge remote-tracking branch 'origin/main' into feat/agent-store-inte…
Gozala May 25, 2024
28fee19
chore: undo unintended changes
Gozala May 25, 2024
db6290a
chore: undo
Gozala May 25, 2024
a668e4e
chore: undo
Gozala May 25, 2024
008638b
chore: undo
Gozala May 25, 2024
7fdc252
fix: cause of test failures
Gozala May 28, 2024
02a4c93
chore: use dev release of upload-api
Gozala May 28, 2024
9c6d257
chore: remove local deps
Gozala May 28, 2024
b77ace4
chore: undo unintended changes
Gozala May 28, 2024
2dd9940
fix: resolve dependency incompatibilities
Gozala May 28, 2024
16d3142
chore: undo formatting changes
Gozala May 29, 2024
3fa1a88
chore: undo changes
Gozala May 29, 2024
8dae31c
chore: revert all the breakpoints and line breaks as they were
Gozala May 29, 2024
80fbbf0
chore: remove read-pipeline.tldr
Gozala May 29, 2024
d1ae1d7
chore: remove ucan endpoint
Gozala May 29, 2024
91591ce
feat: restore ucan-invocation endpoint
Gozala May 29, 2024
c9bc074
chore: enable ucan-invocation tests
Gozala May 29, 2024
2cf48ed
chore: remove obsolete bindings
Gozala May 29, 2024
58c4b50
fix: agent store integration tests (#381)
vasco-santos May 29, 2024
2e727c7
chore: refactor path resolution logic
Gozala May 29, 2024
05d441e
fix: typo
Gozala May 29, 2024
2440550
chore: expose region field
Gozala May 29, 2024
708f03a
fix: fix receipt endpoint
Gozala May 29, 2024
24f5193
Merge branch 'feat/agent-store-integration-v2' into feat/agent-store-…
Gozala May 29, 2024
34f29f9
fix: use of receipts & invocations by filecoin-api
Gozala May 29, 2024
6306a8f
fix: last tests
vasco-santos May 30, 2024
c53bc03
fix: add back invocation cid
vasco-santos May 30, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 15 additions & 18 deletions filecoin/store/invocation.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import {
S3Client,
ListObjectsV2Command,
} from '@aws-sdk/client-s3'
import { S3Client } from '@aws-sdk/client-s3'
import { parseLink } from '@ucanto/core'
import * as Store from '../../upload-api/stores/agent/store.js'

/**
* Abstraction layer with Factory to perform operations on bucket storing
Expand All @@ -25,28 +24,26 @@ export function createInvocationStore(region, bucketName, options = {}) {
* @returns {import('../types').InvocationBucket}
*/
export const useInvocationStore = (s3client, bucketName) => {
const store = Store.open({
connection: { channel: s3client },
region: typeof s3client.config.region === 'string' ? s3client.config.region : 'us-west-2',
buckets: {
index: { name: bucketName },
message: { name: bucketName },
}
})

return {
/**
* Get the agent message file CID for an invocation.
*
* @param {string} invocationCid
*/
getInLink: async (invocationCid) => {
const prefix = `${invocationCid}/`
// Multiple entries may match the key prefix. Picking an arbitrary one is fine given
// can receive same invocations in multiple CAR files.
const listObjectCmd = new ListObjectsV2Command({
Bucket: bucketName,
Prefix: prefix,
})
const listObject = await s3client.send(listObjectCmd)
const carEntry = listObject.Contents?.find(
content => content.Key?.endsWith('.in')
)
if (!carEntry) {
return
const result = await Store.resolve(store, { invocation: parseLink(invocationCid) })
if (result.ok) {
return result.ok.message.toString()
}
return carEntry.Key?.replace(prefix, '').replace('.in', '')
},
}
}
110 changes: 18 additions & 92 deletions filecoin/store/receipt.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
import {
S3Client,
GetObjectCommand,
ListObjectsV2Command
} from '@aws-sdk/client-s3'
import * as CAR from '@ucanto/transport/car'
import { StoreOperationFailed, RecordNotFound } from '@web3-storage/filecoin-api/errors'
import { S3Client } from '@aws-sdk/client-s3'
import { StoreOperationFailed } from '@web3-storage/filecoin-api/errors'
import * as Store from '../../upload-api/stores/agent/store.js'

/**
* Abstraction layer with Factory to perform operations on bucket storing
Expand All @@ -30,97 +26,27 @@ export function createReceiptStore(region, invocationBucketName, workflowBucketN
* @returns {import('@web3-storage/filecoin-api/storefront/api').ReceiptStore}
*/
export const useReceiptStore = (s3client, invocationBucketName, workflowBucketName) => {
const store = Store.open({
connection: { channel: s3client },
region: typeof s3client.config.region === 'string' ? s3client.config.region : 'us-west-2',
buckets: {
index: { name: invocationBucketName },
message: { name: workflowBucketName },
}
})

return {
put: async (record) => {
return {
error: new StoreOperationFailed('no new receipt should be put by storefront')
}
},
get: async (taskCid) => {
// TODO: When we distinct between TaskCid and InvocationCid, we also need to see this mapping.
const invocationCid = taskCid

// Find agent message archive CID where this receipt was stored
const encodedInvocationKeyPrefix = `${invocationCid.toString()}/`
const listCmd = new ListObjectsV2Command({
Bucket: invocationBucketName,
Prefix: encodedInvocationKeyPrefix
})
let listRes
try {
listRes = await s3client.send(listCmd)
} catch (/** @type {any} */ error) {
if (error?.$metadata?.httpStatusCode === 404) {
return {
error: new RecordNotFound(`any pseudo symlink from invocation ${invocationCid.toString()} was found`)
}
}
return {
error: new StoreOperationFailed(error.message)
}
}
if (!listRes.Contents?.length) {
return {
error: new RecordNotFound(`any pseudo symlink from invocation ${invocationCid.toString()} was found`)
}
}
// Key in format `${invocation.cid}/${agentMessageArchive.cid}.out`
const agentMessageArchiveWithReceipt = listRes.Contents.find(c => c.Key?.endsWith('.out'))
if (!agentMessageArchiveWithReceipt || !agentMessageArchiveWithReceipt.Key) {
return {
error: new RecordNotFound(`any pseudo symlink from invocation ${invocationCid.toString()} was found with a receipt`)
}
}

// Get Message Archive with receipt
const agentMessageArchiveWithReceiptCid = agentMessageArchiveWithReceipt.Key
.replace(encodedInvocationKeyPrefix, '')
.replace('.out', '')

const encodedAgentMessageArchiveKey = `${agentMessageArchiveWithReceiptCid}/${agentMessageArchiveWithReceiptCid}`
const getCmd = new GetObjectCommand({
Bucket: workflowBucketName,
Key: encodedAgentMessageArchiveKey,
})

let res
try {
res = await s3client.send(getCmd)
} catch (/** @type {any} */ error) {
if (error?.$metadata?.httpStatusCode === 404) {
return {
error: new RecordNotFound(`agent message archive ${encodedAgentMessageArchiveKey} not found in store`)
}
}
return {
error: new StoreOperationFailed(error.message)
}
}
if (!res || !res.Body) {
return {
error: new RecordNotFound(`agent message archive ${encodedAgentMessageArchiveKey} not found in store`)
}
}


// Get receipt from Message Archive
const agentMessageBytes = await res.Body.transformToByteArray()
const agentMessage = await CAR.request.decode({
body: agentMessageBytes,
headers: {},
})

// @ts-expect-error unknown link does not mach expectations
const receipt = agentMessage.receipts.get(invocationCid.toString())
if (!receipt) {
return {
error: new RecordNotFound(`agent message archive ${encodedAgentMessageArchiveKey} does not include receipt for invocation ${invocationCid.toString()}`)
}
}
return {
ok: receipt
}
},
/**
* @param {import('@ucanto/interface').UnknownLink} taskCid
*/
get: (taskCid) =>
// @ts-expect-error - need to align RecordNotFoundError
Store.getReceipt(store, taskCid),
has: async (record) => {
return {
error: new StoreOperationFailed('no receipt should checked by storefront')
Expand Down
108 changes: 15 additions & 93 deletions filecoin/store/task.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
import {
S3Client,
GetObjectCommand,
ListObjectsV2Command
} from '@aws-sdk/client-s3'
import * as CAR from '@ucanto/transport/car'
import { StoreOperationFailed, RecordNotFound } from '@web3-storage/filecoin-api/errors'
import { S3Client } from '@aws-sdk/client-s3'
import { StoreOperationFailed } from '@web3-storage/filecoin-api/errors'
import * as Store from '../../upload-api/stores/agent/store.js'

/**
* Abstraction layer with Factory to perform operations on bucket storing
Expand All @@ -30,98 +26,24 @@ export function createTaskStore(region, invocationBucketName, workflowBucketName
* @returns {import('@web3-storage/filecoin-api/storefront/api').TaskStore}
*/
export const useTaskStore = (s3client, invocationBucketName, workflowBucketName) => {
const store = Store.open({
connection: { channel: s3client },
region: typeof s3client.config.region === 'string' ? s3client.config.region : 'us-west-2',
buckets: {
index: { name: invocationBucketName },
message: { name: workflowBucketName },
}
})

return {
put: async (record) => {
return {
error: new StoreOperationFailed('no new task should be put by storefront')
}
},
get: async (taskCid) => {
// TODO: When we distinct between TaskCid and InvocationCid, we also need to see this mapping.
const invocationCid = taskCid

// Find agent message archive CID where this receipt was stored
const encodedInvocationKeyPrefix = `${invocationCid.toString()}/`
const listCmd = new ListObjectsV2Command({
Bucket: invocationBucketName,
Prefix: encodedInvocationKeyPrefix
})
let listRes
try {
listRes = await s3client.send(listCmd)
} catch (/** @type {any} */ error) {
if (error?.$metadata?.httpStatusCode === 404) {
return {
error: new RecordNotFound(`any pseudo symlink from invocation ${invocationCid.toString()} was found`)
}
}
return {
error: new StoreOperationFailed(error.message)
}
}
if (!listRes.Contents?.length) {
return {
error: new RecordNotFound(`any pseudo symlink from invocation ${invocationCid.toString()} was found`)
}
}

// Key in format `${invocation.cid}/${agentMessageArchive.cid}.in`
const agentMessageArchiveWithInvocation = listRes.Contents.find(c => c.Key?.endsWith('.in'))
if (!agentMessageArchiveWithInvocation || !agentMessageArchiveWithInvocation.Key) {
return {
error: new RecordNotFound(`any pseudo symlink from invocation ${invocationCid.toString()} was found with a receipt`)
}
}

// Get Message Archive with invocation
const agentMessageArchiveWithInvocationCid = agentMessageArchiveWithInvocation.Key
.replace(encodedInvocationKeyPrefix, '')
.replace('.in', '')

const encodedAgentMessageArchiveKey = `${agentMessageArchiveWithInvocationCid}/${agentMessageArchiveWithInvocationCid}`
const getCmd = new GetObjectCommand({
Bucket: workflowBucketName,
Key: encodedAgentMessageArchiveKey,
})

let res
try {
res = await s3client.send(getCmd)
} catch (/** @type {any} */ error) {
if (error?.$metadata?.httpStatusCode === 404) {
return {
error: new RecordNotFound(`agent message archive ${encodedAgentMessageArchiveKey} not found in store`)
}
}
return {
error: new StoreOperationFailed(error.message)
}
}
if (!res || !res.Body) {
return {
error: new RecordNotFound(`agent message archive ${encodedAgentMessageArchiveKey} not found in store`)
}
}

// Get invocation from Message Archive
const agentMessageBytes = await res.Body.transformToByteArray()
const agentMessage = await CAR.request.decode({
body: agentMessageBytes,
headers: {},
})

const invocation = agentMessage.invocations.find(
(invocation) => invocation.cid.toString() === invocationCid.toString()
)
if (!invocation) {
return {
error: new RecordNotFound(`agent message archive ${encodedAgentMessageArchiveKey} does not include invocation ${invocationCid.toString()}`)
}
}
return {
ok: invocation
}
},
get: (taskCid) =>
// @ts-expect-error - need to align RecordNotFoundError
Store.getInvocation(store, taskCid),
has: async (record) => {
return {
error: new StoreOperationFailed('no task should checked by storefront')
Expand Down
Loading
Loading