Skip to content

Commit

Permalink
feat(mu): add message recovery for non-pushed results
Browse files Browse the repository at this point in the history
  • Loading branch information
jfrain99 committed Jan 6, 2025
1 parent 859af1e commit b22737b
Show file tree
Hide file tree
Showing 6 changed files with 254 additions and 45 deletions.
4 changes: 4 additions & 0 deletions servers/mu/src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ export const server = pipe(
logger({ log: 'Crons initialized' })
})

domain.apis.startMessageRecoveryCron().then(() => {
logger({ log: 'Message recovery cron started' })
})

return server
}
)(express())
18 changes: 15 additions & 3 deletions servers/mu/src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ export const domainConfigSchema = z.object({
TASK_QUEUE_RETRY_DELAY: positiveIntSchema,
DISABLE_TRACE: z.boolean(),
SPAWN_PUSH_ENABLED: z.boolean(),
ALLOW_PUSHES_AFTER: z.number()
ALLOW_PUSHES_AFTER: positiveIntSchema,
GET_RESULT_MAX_RETRIES: positiveIntSchema,
GET_RESULT_RETRY_DELAY: positiveIntSchema,
MESSAGE_RECOVERY_MAX_RETRIES: positiveIntSchema,
MESSAGE_RECOVERY_RETRY_DELAY: positiveIntSchema
})

/**
Expand Down Expand Up @@ -104,7 +108,11 @@ const CONFIG_ENVS = {
TASK_QUEUE_RETRY_DELAY: process.env.TASK_QUEUE_RETRY_DELAY || 1000,
DISABLE_TRACE: process.env.DISABLE_TRACE !== 'false',
SPAWN_PUSH_ENABLED: process.env.SPAWN_PUSH_ENABLED === 'true',
ALLOW_PUSHES_AFTER: process.env.ALLOW_PUSHES_AFTER || 1572103
ALLOW_PUSHES_AFTER: process.env.ALLOW_PUSHES_AFTER || 1572103,
GET_RESULT_MAX_RETRIES: process.env.GET_RESULT_MAX_RETRIES || 5,
GET_RESULT_RETRY_DELAY: process.env.GET_RESULT_RETRY_DELAY || 1000,
MESSAGE_RECOVERY_MAX_RETRIES: process.env.MESSAGE_RECOVERY_MAX_RETRIES || 5,
MESSAGE_RECOVERY_RETRY_DELAY: process.env.MESSAGE_RECOVERY_RETRY_DELAY || 1000
},
production: {
MODE,
Expand All @@ -127,7 +135,11 @@ const CONFIG_ENVS = {
TASK_QUEUE_RETRY_DELAY: process.env.TASK_QUEUE_RETRY_DELAY || 1000,
DISABLE_TRACE: process.env.DISABLE_TRACE !== 'false',
SPAWN_PUSH_ENABLED: process.env.SPAWN_PUSH_ENABLED === 'true',
ALLOW_PUSHES_AFTER: process.env.ALLOW_PUSHES_AFTER || 1572103
ALLOW_PUSHES_AFTER: process.env.ALLOW_PUSHES_AFTER || 1572103,
GET_RESULT_MAX_RETRIES: process.env.GET_RESULT_MAX_RETRIES || 5,
GET_RESULT_RETRY_DELAY: process.env.GET_RESULT_RETRY_DELAY || 1000,
MESSAGE_RECOVERY_MAX_RETRIES: process.env.MESSAGE_RECOVERY_MAX_RETRIES || 5,
MESSAGE_RECOVERY_RETRY_DELAY: process.env.MESSAGE_RECOVERY_RETRY_DELAY || 1000
}
}

Expand Down
235 changes: 199 additions & 36 deletions servers/mu/src/domain/api/sendDataItem.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { of, Rejected, fromPromise, Resolved } from 'hyper-async'
import { identity } from 'ramda'
import { compose, head, identity, isEmpty, prop, propOr } from 'ramda'

import { getCuAddressWith } from '../lib/get-cu-address.js'
import { writeMessageTxWith } from '../lib/write-message-tx.js'
Expand All @@ -8,6 +8,7 @@ import { parseDataItemWith } from '../lib/parse-data-item.js'
import { verifyParsedDataItemWith } from '../lib/verify-parsed-data-item.js'
import { writeProcessTxWith } from '../lib/write-process-tx.js'
import { locateProcessSchema } from '../dal.js'
import { MESSAGES_TABLE } from '../clients/sqlite.js'

/**
* Forward along the DataItem to the SU,
Expand All @@ -26,14 +27,18 @@ export function sendDataItemWith ({
logger,
fetchSchedulerProcess,
writeDataItemArweave,
spawnPushEnabled
spawnPushEnabled,
db,
GET_RESULT_MAX_RETRIES,
GET_RESULT_RETRY_DELAY
}) {
const verifyParsedDataItem = verifyParsedDataItemWith()
const parseDataItem = parseDataItemWith({ createDataItem, logger })
const getCuAddress = getCuAddressWith({ selectNode, logger })
const writeMessage = writeMessageTxWith({ locateProcess, writeDataItem, logger, fetchSchedulerProcess, writeDataItemArweave })
const pullResult = pullResultWith({ fetchResult, logger })
const writeProcess = writeProcessTxWith({ locateScheduler, writeDataItem, logger })
const getResult = getResultWith({ selectNode, fetchResult, logger, GET_RESULT_MAX_RETRIES, GET_RESULT_RETRY_DELAY })

const locateProcessLocal = fromPromise(locateProcessSchema.implement(locateProcess))

Expand All @@ -58,37 +63,68 @@ export function sendDataItemWith ({
},
(res) => Resolved(res)
)
.map(res => ({
...res,
/**
* An opaque method to fetch the result of the message just forwarded
* and then crank its results
*/
crank: () => of({ ...res, initialTxId: res.tx.id })
.chain(getCuAddress)
.chain(pullResult)
.chain((ctx) => {
const { msgs, spawns, assigns, initialTxId, messageId: parentId } = ctx
return crank({
msgs,
spawns,
assigns,
initialTxId,
parentId
})
})
.bimap(
(res) => {
logger({ log: 'Failed to push messages', end: true }, ctx)
return res
},
(res) => {
logger({ log: 'Pushing complete', end: true }, ctx)
return res
}
)
}))
)
.map(res => {
return {
...res,
/**
* An opaque method to fetch the result of the message just forwarded
* and then crank its results
*/
crank: () => {
return of({ ...res, initialTxId: res.tx.id, pullResultAttempts: 0 })
.chain(fromPromise(insertMessage))
.chain(fromPromise(getResult))
.chain(fromPromise(deleteMessage))
.chain((ctx) => {
const { msgs, spawns, assigns, initialTxId, messageId: parentId } = ctx
return crank({
msgs,
spawns,
assigns,
initialTxId,
parentId
})
})
.bimap(
(res) => {
logger({ log: 'Failed to push messages', end: true }, ctx)
return res
},
(res) => {
logger({ log: 'Pushing complete', end: true }, ctx)
return res
}
)
}
}
}
))

/**
 * Persist the pipeline context into the messages table so the result
 * push can be recovered later if this process dies before cranking.
 * Keyed by logId; INSERT OR IGNORE makes re-entry a no-op.
 * Resolves with the unchanged ctx so it can be chained.
 */
async function insertMessage (ctx) {
  const sql = `
    INSERT OR IGNORE INTO ${MESSAGES_TABLE} (
      id,
      timestamp,
      data,
      retries
    ) VALUES (?, ?, ?, 0)
  `
  // JSON.stringify drops function-valued fields (e.g. crank) from ctx
  const parameters = [ctx.logId, Date.now(), JSON.stringify(ctx)]
  await db.run({ sql, parameters })
  return ctx
}

/**
 * Remove the recovery row for this ctx once its results have been
 * successfully pushed. Resolves with the unchanged ctx for chaining.
 */
async function deleteMessage (ctx) {
  const sql = `
    DELETE FROM ${MESSAGES_TABLE}
    WHERE id = ?
  `
  await db.run({ sql, parameters: [ctx.logId] })
  return ctx
}

/**
* If the Data Item is a Process, we push an Assignment
Expand Down Expand Up @@ -180,9 +216,6 @@ export function sendDataItemWith ({
return (ctx) => {
return of(ctx)
.chain(parseDataItem)
.map((ctx) => {
return ctx
})
.chain((ctx) =>
verifyParsedDataItem(ctx.dataItem)
.map(logger.tap({ log: 'Successfully verified parsed data item', logId: ctx.logId }))
Expand All @@ -207,3 +240,133 @@ export function sendDataItemWith ({
)
}
}

/**
 * Builds getResult: resolves the CU address for the message's process and
 * pulls the message result from it, retrying with exponential backoff.
 *
 * @param {Object} deps
 * @param {Function} deps.selectNode - picks a CU for a process
 * @param {Function} deps.fetchResult - fetches a message result from a CU
 * @param {Function} deps.logger
 * @param {number} deps.GET_RESULT_MAX_RETRIES - retry budget
 * @param {number} deps.GET_RESULT_RETRY_DELAY - base backoff delay (ms)
 * @returns {(ctx: Object) => Promise<Object>} resolves with ctx extended by
 *   the pulled result; rejects once the retry budget is exhausted
 */
function getResultWith ({ selectNode, fetchResult, logger, GET_RESULT_MAX_RETRIES, GET_RESULT_RETRY_DELAY }) {
  const getCuAddress = getCuAddressWith({ selectNode, logger })
  const pullResult = pullResultWith({ fetchResult, logger })

  return async function getResult (ctx) {
    // attempts is read once per invocation; the recursive call below sees
    // the incremented value via ctx.pullResultAttempts
    const attempts = ctx.pullResultAttempts
    return of(ctx)
      .chain(getCuAddress)
      .chain(pullResult)
      .bichain(
        fromPromise(async (err) => {
          if (attempts < GET_RESULT_MAX_RETRIES) {
            ctx.pullResultAttempts++
            // exponential backoff: delay * 2^attempts
            await new Promise(resolve => setTimeout(resolve, GET_RESULT_RETRY_DELAY * (2 ** attempts)))
            return await getResult(ctx)
          }
          // preserve the underlying failure for callers instead of discarding it
          throw new Error(`GetResult ran out of retries (${GET_RESULT_MAX_RETRIES}). Bubbling error...`, { cause: err })
        }),
        Resolved
      )
      .toPromise()
  }
}

/**
 * Builds selectMessage: fetches the single oldest row from the messages
 * table whose timestamp is already due (strictly before "now").
 *
 * @param {Object} deps
 * @param {Object} deps.db - sqlite client exposing query()
 * @returns {() => Promise<Array>} resolves with at most one row
 */
function selectMessageWith ({ db }) {
  return async function selectMessage () {
    const now = Date.now()
    return db.query({
      sql: `SELECT * FROM ${MESSAGES_TABLE} WHERE timestamp < ? ORDER BY timestamp ASC LIMIT 1`,
      parameters: [now]
    })
  }
}

/**
 * Builds deleteMessage: removes a recovery row by its logId.
 *
 * @param {Object} deps
 * @param {Object} deps.db - sqlite client exposing run()
 * @returns {(logId: string) => Promise<string>} resolves with the logId
 */
function deleteMessageWith ({ db }) {
  return async function deleteMessage (logId) {
    await db.run({
      sql: `
        DELETE FROM ${MESSAGES_TABLE}
        WHERE id = ?
      `,
      parameters: [logId]
    })
    return logId
  }
}

/**
 * Builds updateMessageTimestamp: reschedules a stored message for a later
 * retry (exponential backoff baked into its timestamp), or deletes it once
 * its retry budget is exhausted.
 *
 * Best-effort: db errors are logged and swallowed so the recovery cron
 * keeps running; always resolves with the logId.
 *
 * @param {Object} deps
 * @param {Object} deps.db - sqlite client exposing run()
 * @param {Function} deps.logger
 * @param {number} deps.MESSAGE_RECOVERY_MAX_RETRIES - retry budget
 * @param {number} deps.MESSAGE_RECOVERY_RETRY_DELAY - base backoff delay (ms)
 * @returns {(logId: string, retries: number) => Promise<string>}
 */
function updateMessageTimestampWith ({ db, logger, MESSAGE_RECOVERY_MAX_RETRIES, MESSAGE_RECOVERY_RETRY_DELAY }) {
  return async (logId, retries) => {
    const deleteQuery = {
      sql: `DELETE FROM ${MESSAGES_TABLE} WHERE id = ?`,
      parameters: [logId]
    }
    // push the next attempt out by delay * 2^retries milliseconds
    const updateOffset = MESSAGE_RECOVERY_RETRY_DELAY * (2 ** retries)
    const updateQuery = {
      sql: `UPDATE OR IGNORE ${MESSAGES_TABLE} SET timestamp = ?, retries = retries + 1 WHERE id = ?`,
      parameters: [new Date().getTime() + updateOffset, logId]
    }
    let query = updateQuery
    if (retries > MESSAGE_RECOVERY_MAX_RETRIES) {
      query = deleteQuery
      logger({ log: `Message with logId ${logId} has run out of retries and been deleted`, end: true }, { logId })
    }

    return await db.run(query).catch((e) => {
      // best-effort: report through the injected logger (not console.log)
      // so the failure carries the logId, then continue
      logger({ log: `Error updating message timestamp: ${e}` }, { logId })
      return e
    }).then(() => logId)
  }
}

/**
 * Builds startMessageRecoveryCron: every 10 seconds, pick the oldest due
 * message persisted in the messages table, re-fetch its result from a CU,
 * crank (push) the resulting msgs/spawns/assigns, and delete the row on
 * success. On failure the row's timestamp is pushed into the future with
 * exponential backoff (or the row is deleted once out of retries).
 *
 * NOTE(review): `cron.schedule('*&#47;10 * * * * *', ...)` uses a 6-field
 * (seconds) expression — presumably a node-cron style scheduler; confirm
 * against the injected `cron` client.
 */
export function startMessageRecoveryCronWith ({ selectNode, fetchResult, logger, db, cron, crank, GET_RESULT_MAX_RETRIES, GET_RESULT_RETRY_DELAY, MESSAGE_RECOVERY_MAX_RETRIES, MESSAGE_RECOVERY_RETRY_DELAY }) {
  const getResult = getResultWith({ selectNode, fetchResult, logger, GET_RESULT_MAX_RETRIES, GET_RESULT_RETRY_DELAY })
  const selectMessage = selectMessageWith({ db })
  const deleteMessage = deleteMessageWith({ db })
  const updateMessageTimestamp = updateMessageTimestampWith({ db, logger, MESSAGE_RECOVERY_MAX_RETRIES, MESSAGE_RECOVERY_RETRY_DELAY })
  return async () => {
    let ct = null
    // isJobRunning guards against overlapping ticks, in addition to the
    // ct.stop()/ct.start() pause below (belt and braces)
    let isJobRunning = false
    ct = cron.schedule('*/10 * * * * *', async () => {
      if (!isJobRunning) {
        isJobRunning = true
        ct.stop() // pause cron while recovering messages
        await selectMessage()
          // head(res) is the oldest due row, or undefined when the table has
          // no due rows; data then defaults to '{}' so ctx parses to {}
          .then((res) => ({ ctx: compose(JSON.parse, propOr('{}', 'data'), head)(res), retries: compose(prop('retries'), head)(res) }))
          .then(({ ctx, retries }) => {
            if (isEmpty(ctx)) {
              // nothing queued: release the latch and wait for the next tick
              isJobRunning = false
              return
            }
            const logId = ctx.logId
            logger({ log: `Attempting to recover message, retry ${retries} of ${MESSAGE_RECOVERY_MAX_RETRIES}` }, { logId })
            return getResult(ctx)
              .then((res) => {
                // push the recovered result's outputs downstream
                const { msgs, spawns, assigns, initialTxId, messageId: parentId } = res
                return crank({
                  msgs,
                  spawns,
                  assigns,
                  initialTxId,
                  parentId
                })
              })
              .then(() => {
                logger({ log: 'Successfully pushed message results', end: true }, { logId })
                return deleteMessage(logId)
              })
              .then(() => {
                isJobRunning = false
              })
              .catch((e) => {
                // reschedule the row with backoff; it will be retried on a
                // later tick once its new timestamp is due.
                // NOTE(review): this promise is not awaited — the latch is
                // released before the update commits; confirm this is intended
                const delay = MESSAGE_RECOVERY_RETRY_DELAY * (2 ** retries)
                logger({ log: `Error recovering message - getResult, retrying in ${delay}ms: ${e}` }, ctx)
                updateMessageTimestamp(logId, retries)
                isJobRunning = false
              })
          })
          .catch((e) => {
            // selection itself failed (db error); try again next tick
            logger({ log: `Error recovering message - selectMessage: ${e}` })
            isJobRunning = false
          })

        ct.start() // resume cron when done recovering messages
      }
    })
  }
}
2 changes: 1 addition & 1 deletion servers/mu/src/domain/clients/cu.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ function resultWith ({ fetch, histogram, CU_URL, logger }) {
).then(okRes),
{
maxRetries: 5,
delay: 500,
delay: 1000,
log: logger,
logId,
name: `fetchResult(${JSON.stringify({
Expand Down
16 changes: 14 additions & 2 deletions servers/mu/src/domain/clients/sqlite.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,22 @@ import { stat } from 'node:fs'
import Database from 'better-sqlite3'
import bytes from 'bytes'

export const [TASKS_TABLE, TRACES_TABLE, CRON_PROCESSES_TABLE] = [
export const [TASKS_TABLE, TRACES_TABLE, CRON_PROCESSES_TABLE, MESSAGES_TABLE] = [
'tasks',
'traces',
'cron_processes'
'cron_processes',
'messages'
]

/**
 * Create the messages table (recovery queue rows keyed by logId) if it
 * does not already exist.
 */
const createMessages = async (db) => {
  const ddl = `CREATE TABLE IF NOT EXISTS ${MESSAGES_TABLE}(
    id TEXT PRIMARY KEY,
    timestamp INTEGER,
    data TEXT,
    retries INTEGER
  ) WITHOUT ROWID;`
  return db.prepare(ddl).run()
}

const createTasks = async (db) => db.prepare(
`CREATE TABLE IF NOT EXISTS ${TASKS_TABLE}(
id TEXT PRIMARY KEY,
Expand Down Expand Up @@ -72,6 +83,7 @@ export async function createSqliteClient ({ url, bootstrap = false, walLimit = b
await Promise.resolve()
.then(() => createTasks(db))
.then(() => createCronProcesses(db))
.then(() => createMessages(db))
}
if (type === 'traces') {
await Promise.resolve()
Expand Down
Loading

0 comments on commit b22737b

Please sign in to comment.