diff --git a/servers/mu/src/app.js b/servers/mu/src/app.js
index 68a998707..9abbe9774 100644
--- a/servers/mu/src/app.js
+++ b/servers/mu/src/app.js
@@ -26,6 +26,18 @@ export const server = pipe(
       logger({ log: 'Crons initialized' })
     })
 
+    /**
+     * Handle a failed start explicitly so the rejection is logged instead of
+     * surfacing as an unhandled promise rejection.
+     */
+    domain.apis.startMessageRecoveryCron()
+      .then(() => {
+        logger({ log: 'Message recovery cron started' })
+      })
+      .catch((e) => {
+        logger({ log: `Message recovery cron failed to start: ${e}` })
+      })
+
     return server
   }
 )(express())
diff --git a/servers/mu/src/config.js b/servers/mu/src/config.js
index c9ead1431..982922206 100644
--- a/servers/mu/src/config.js
+++ b/servers/mu/src/config.js
@@ -58,7 +58,11 @@ export const domainConfigSchema = z.object({
   TASK_QUEUE_RETRY_DELAY: positiveIntSchema,
   DISABLE_TRACE: z.boolean(),
   SPAWN_PUSH_ENABLED: z.boolean(),
-  ALLOW_PUSHES_AFTER: z.number()
+  ALLOW_PUSHES_AFTER: positiveIntSchema,
+  GET_RESULT_MAX_RETRIES: positiveIntSchema,
+  GET_RESULT_RETRY_DELAY: positiveIntSchema,
+  MESSAGE_RECOVERY_MAX_RETRIES: positiveIntSchema,
+  MESSAGE_RECOVERY_RETRY_DELAY: positiveIntSchema
 })
 
 /**
@@ -104,7 +108,11 @@ const CONFIG_ENVS = {
     TASK_QUEUE_RETRY_DELAY: process.env.TASK_QUEUE_RETRY_DELAY || 1000,
     DISABLE_TRACE: process.env.DISABLE_TRACE !== 'false',
     SPAWN_PUSH_ENABLED: process.env.SPAWN_PUSH_ENABLED === 'true',
-    ALLOW_PUSHES_AFTER: process.env.ALLOW_PUSHES_AFTER || 1572103
+    ALLOW_PUSHES_AFTER: process.env.ALLOW_PUSHES_AFTER || 1572103,
+    GET_RESULT_MAX_RETRIES: process.env.GET_RESULT_MAX_RETRIES || 5,
+    GET_RESULT_RETRY_DELAY: process.env.GET_RESULT_RETRY_DELAY || 1000,
+    MESSAGE_RECOVERY_MAX_RETRIES: process.env.MESSAGE_RECOVERY_MAX_RETRIES || 5,
+    MESSAGE_RECOVERY_RETRY_DELAY: process.env.MESSAGE_RECOVERY_RETRY_DELAY || 1000
   },
   production: {
     MODE,
@@ -127,7 +135,11 @@ const CONFIG_ENVS = {
     TASK_QUEUE_RETRY_DELAY: process.env.TASK_QUEUE_RETRY_DELAY || 1000,
     DISABLE_TRACE: process.env.DISABLE_TRACE !== 'false',
     SPAWN_PUSH_ENABLED: process.env.SPAWN_PUSH_ENABLED === 'true',
-    ALLOW_PUSHES_AFTER: process.env.ALLOW_PUSHES_AFTER || 1572103
+    ALLOW_PUSHES_AFTER: process.env.ALLOW_PUSHES_AFTER || 1572103,
+    GET_RESULT_MAX_RETRIES: process.env.GET_RESULT_MAX_RETRIES || 5,
+    GET_RESULT_RETRY_DELAY: process.env.GET_RESULT_RETRY_DELAY || 1000,
+    MESSAGE_RECOVERY_MAX_RETRIES: process.env.MESSAGE_RECOVERY_MAX_RETRIES || 5,
+    MESSAGE_RECOVERY_RETRY_DELAY: process.env.MESSAGE_RECOVERY_RETRY_DELAY || 1000
   }
 }
 
diff --git a/servers/mu/src/domain/api/sendDataItem.js b/servers/mu/src/domain/api/sendDataItem.js
index dde561812..5054f9c27 100644
--- a/servers/mu/src/domain/api/sendDataItem.js
+++ b/servers/mu/src/domain/api/sendDataItem.js
@@ -1,5 +1,5 @@
 import { of, Rejected, fromPromise, Resolved } from 'hyper-async'
-import { identity } from 'ramda'
+import { compose, head, identity, isEmpty, prop, propOr } from 'ramda'
 
 import { getCuAddressWith } from '../lib/get-cu-address.js'
 import { writeMessageTxWith } from '../lib/write-message-tx.js'
@@ -8,6 +8,7 @@ import { parseDataItemWith } from '../lib/parse-data-item.js'
 import { verifyParsedDataItemWith } from '../lib/verify-parsed-data-item.js'
 import { writeProcessTxWith } from '../lib/write-process-tx.js'
 import { locateProcessSchema } from '../dal.js'
+import { MESSAGES_TABLE } from '../clients/sqlite.js'
 
 /**
  * Forward along the DataItem to the SU,
@@ -26,7 +27,10 @@ export function sendDataItemWith ({
   logger,
   fetchSchedulerProcess,
   writeDataItemArweave,
-  spawnPushEnabled
+  spawnPushEnabled,
+  db,
+  GET_RESULT_MAX_RETRIES,
+  GET_RESULT_RETRY_DELAY
 }) {
   const verifyParsedDataItem = verifyParsedDataItemWith()
   const parseDataItem = parseDataItemWith({ createDataItem, logger })
@@ -34,6 +38,7 @@ export function sendDataItemWith ({
   const writeMessage = writeMessageTxWith({ locateProcess, writeDataItem, logger, fetchSchedulerProcess, writeDataItemArweave })
   const pullResult = pullResultWith({ fetchResult, logger })
   const writeProcess = writeProcessTxWith({ locateScheduler, writeDataItem, logger })
+  const getResult = getResultWith({ selectNode, fetchResult, logger, GET_RESULT_MAX_RETRIES, GET_RESULT_RETRY_DELAY })
 
   const locateProcessLocal = fromPromise(locateProcessSchema.implement(locateProcess))
 
@@ -58,37 +63,83 @@ export function sendDataItemWith ({
           },
           (res) => Resolved(res)
         )
-        .map(res => ({
-          ...res,
-          /**
-           * An opaque method to fetch the result of the message just forwarded
-           * and then crank its results
-           */
-          crank: () => of({ ...res, initialTxId: res.tx.id })
-            .chain(getCuAddress)
-            .chain(pullResult)
-            .chain((ctx) => {
-              const { msgs, spawns, assigns, initialTxId, messageId: parentId } = ctx
-              return crank({
-                msgs,
-                spawns,
-                assigns,
-                initialTxId,
-                parentId
-              })
-            })
-            .bimap(
-              (res) => {
-                logger({ log: 'Failed to push messages', end: true }, ctx)
-                return res
-              },
-              (res) => {
-                logger({ log: 'Pushing complete', end: true }, ctx)
-                return res
-              }
-            )
-        }))
-    )
+        .map(res => {
+          return {
+            ...res,
+            /**
+             * An opaque method to fetch the result of the message just forwarded
+             * and then crank its results
+             */
+            crank: () => {
+              return of({ ...res, initialTxId: res.tx.id, pullResultAttempts: 0 })
+                .chain(fromPromise(insertMessage))
+                .chain(fromPromise(getResult))
+                .chain(fromPromise(deleteMessage))
+                .chain((ctx) => {
+                  const { msgs, spawns, assigns, initialTxId, messageId: parentId } = ctx
+                  return crank({
+                    msgs,
+                    spawns,
+                    assigns,
+                    initialTxId,
+                    parentId
+                  })
+                })
+                .bimap(
+                  (res) => {
+                    logger({ log: 'Failed to push messages', end: true }, ctx)
+                    return res
+                  },
+                  (res) => {
+                    logger({ log: 'Pushing complete', end: true }, ctx)
+                    return res
+                  }
+                )
+            }
+          }
+        })
+    )
+
+  /**
+   * Persist the message context before its result is pulled, so the message
+   * recovery cron can pick it up if this attempt never completes.
+   * An already stored id is left untouched (INSERT OR IGNORE).
+   *
+   * @param {Object} ctx - the crank context, stored as JSON under ctx.logId
+   * @returns {Promise<Object>} the same ctx
+   */
+  async function insertMessage (ctx) {
+    const query = {
+      sql: `
+        INSERT OR IGNORE INTO ${MESSAGES_TABLE} (
+          id,
+          timestamp,
+          data,
+          retries
+        ) VALUES (?, ?, ?, 0)
+      `,
+      parameters: [ctx.logId, new Date().getTime(), JSON.stringify(ctx)]
+    }
+    return await db.run(query).then(() => ctx)
+  }
+
+  /**
+   * Remove the stored message once its result has been pulled, so the
+   * message recovery cron no longer retries it.
+   *
+   * @param {Object} ctx - the context whose ctx.logId identifies the row
+   * @returns {Promise<Object>} the same ctx
+   */
+  async function deleteMessage (ctx) {
+    const query = {
+      sql: `
+        DELETE FROM ${MESSAGES_TABLE}
+        WHERE id = ?
+      `,
+      parameters: [ctx.logId]
+    }
+    return await db.run(query).then(() => ctx)
+  }
 
   /**
    * If the Data Item is a Process, we push an Assignment
@@ -180,9 +231,6 @@ export function sendDataItemWith ({
   return (ctx) => {
     return of(ctx)
       .chain(parseDataItem)
-      .map((ctx) => {
-        return ctx
-      })
       .chain((ctx) =>
         verifyParsedDataItem(ctx.dataItem)
           .map(logger.tap({ log: 'Successfully verified parsed data item', logId: ctx.logId }))
@@ -207,3 +255,198 @@ export function sendDataItemWith ({
     )
   }
 }
+
+/**
+ * Build a function that resolves the CU address for a message and pulls its
+ * result, retrying failed attempts with exponential backoff.
+ *
+ * A failed attempt waits GET_RESULT_RETRY_DELAY * 2^attempts ms before the
+ * next one. Once GET_RESULT_MAX_RETRIES retries are used up, the returned
+ * promise rejects.
+ *
+ * @param {Object} deps - selectNode, fetchResult, logger and the retry settings
+ * @returns {(ctx: Object) => Promise<Object>} resolves with what pullResult yields
+ */
+function getResultWith ({ selectNode, fetchResult, logger, GET_RESULT_MAX_RETRIES, GET_RESULT_RETRY_DELAY }) {
+  const getCuAddress = getCuAddressWith({ selectNode, logger })
+  const pullResult = pullResultWith({ fetchResult, logger })
+
+  return async function getResult (ctx) {
+    /**
+     * A ctx without a counter counts as a first attempt. Without the default,
+     * `undefined < GET_RESULT_MAX_RETRIES` is false and nothing is retried.
+     */
+    const attempts = ctx.pullResultAttempts || 0
+    return of(ctx)
+      .chain(getCuAddress)
+      .chain(pullResult)
+      .bichain(
+        fromPromise(async (err) => {
+          if (attempts < GET_RESULT_MAX_RETRIES) {
+            await new Promise(resolve => setTimeout(resolve, GET_RESULT_RETRY_DELAY * (2 ** attempts)))
+            // Hand the next attempt a copy instead of mutating the caller's ctx
+            return await getResult({ ...ctx, pullResultAttempts: attempts + 1 })
+          }
+          const error = new Error(`GetResult ran out of retries (${GET_RESULT_MAX_RETRIES}). Bubbling error...`)
+          // Keep the underlying failure reachable for whoever handles this error
+          error.cause = err
+          throw error
+        }),
+        Resolved
+      )
+      .toPromise()
+  }
+}
+
+/**
+ * Build a function that selects the stored message with the oldest timestamp
+ * among those whose timestamp is already in the past.
+ *
+ * NOTE(review): sendDataItem stamps new rows with the current time, so a
+ * message its original crank is still retrying also looks due here and could
+ * be pushed twice - confirm whether a grace period is needed.
+ *
+ * @param {Object} deps - the db client
+ * @returns {() => Promise<Object[]>} presumably an array holding at most one row
+ */
+function selectMessageWith ({ db }) {
+  return async () => {
+    const timestamp = new Date().getTime()
+    const query = {
+      sql: `SELECT * FROM ${MESSAGES_TABLE} WHERE timestamp < ? ORDER BY timestamp ASC LIMIT 1`,
+      parameters: [timestamp]
+    }
+    return await db.query(query)
+  }
+}
+
+/**
+ * Build a function that deletes the stored message with the given logId.
+ *
+ * @param {Object} deps - the db client
+ * @returns {(logId: string) => Promise<string>} resolves with the same logId
+ */
+function deleteMessageWith ({ db }) {
+  return async (logId) => {
+    const query = {
+      sql: `
+        DELETE FROM ${MESSAGES_TABLE}
+        WHERE id = ?
+      `,
+      parameters: [logId]
+    }
+
+    return await db.run(query).then(() => logId)
+  }
+}
+
+/**
+ * Build a function that records a failed recovery attempt for a message.
+ *
+ * While retries remain, the row's timestamp is moved into the future by
+ * MESSAGE_RECOVERY_RETRY_DELAY * 2^retries ms and its retry counter is
+ * incremented. Once the retries are used up, the row is deleted instead.
+ *
+ * Database failures are logged, not rethrown, so the returned promise always
+ * resolves with the logId.
+ *
+ * @param {Object} deps - db, logger and the recovery retry settings
+ * @returns {(logId: string, retries: number) => Promise<string>}
+ */
+function updateMessageTimestampWith ({ db, logger, MESSAGE_RECOVERY_MAX_RETRIES, MESSAGE_RECOVERY_RETRY_DELAY }) {
+  return async (logId, retries) => {
+    /**
+     * `retries` is the number of attempts that failed before this one, so `>=`
+     * grants exactly MESSAGE_RECOVERY_MAX_RETRIES retries. With `>` the cron
+     * would run one more attempt than configured.
+     */
+    const outOfRetries = retries >= MESSAGE_RECOVERY_MAX_RETRIES
+    const query = outOfRetries
+      ? {
+          sql: `DELETE FROM ${MESSAGES_TABLE} WHERE id = ?`,
+          parameters: [logId]
+        }
+      : {
+          sql: `UPDATE OR IGNORE ${MESSAGES_TABLE} SET timestamp = ?, retries = retries + 1 WHERE id = ?`,
+          parameters: [new Date().getTime() + MESSAGE_RECOVERY_RETRY_DELAY * (2 ** retries), logId]
+        }
+
+    try {
+      await db.run(query)
+      // Only report the deletion once it has actually happened
+      if (outOfRetries) {
+        logger({ log: `Message with logId ${logId} has ran out of retries and been deleted`, end: true }, { logId })
+      }
+    } catch (e) {
+      logger({ log: `Error updating recovery state of message ${logId}: ${e}` }, { logId })
+    }
+    return logId
+  }
+}
+
+/**
+ * Build the function that starts the message recovery cron.
+ *
+ * Every 10 seconds the cron selects one due message, pulls its result and
+ * cranks it. On success the stored message is deleted, on failure its next
+ * attempt is pushed back. The cron is paused while a recovery is in flight.
+ *
+ * @param {Object} deps - selectNode, fetchResult, logger, db, cron, crank and
+ * the retry settings for getResult and for the recovery itself
+ * @returns {() => Promise<void>} schedules the cron when called
+ */
+export function startMessageRecoveryCronWith ({ selectNode, fetchResult, logger, db, cron, crank, GET_RESULT_MAX_RETRIES, GET_RESULT_RETRY_DELAY, MESSAGE_RECOVERY_MAX_RETRIES, MESSAGE_RECOVERY_RETRY_DELAY }) {
+  const getResult = getResultWith({ selectNode, fetchResult, logger, GET_RESULT_MAX_RETRIES, GET_RESULT_RETRY_DELAY })
+  const selectMessage = selectMessageWith({ db })
+  const deleteMessage = deleteMessageWith({ db })
+  const updateMessageTimestamp = updateMessageTimestampWith({ db, logger, MESSAGE_RECOVERY_MAX_RETRIES, MESSAGE_RECOVERY_RETRY_DELAY })
+
+  /**
+   * sendDataItem uses crank inside a hyper-async chain, so it presumably
+   * returns an Async, which a promise chain would never run. Convert such a
+   * value to a promise and pass anything else through unchanged.
+   */
+  const settle = (pushed) => (pushed && typeof pushed.toPromise === 'function') ? pushed.toPromise() : pushed
+
+  /**
+   * Recover at most one due message. Rejects if selecting or parsing the
+   * stored message fails; failures while pushing reschedule the message.
+   */
+  async function recoverNextMessage () {
+    const row = head(await selectMessage())
+    const ctx = compose(JSON.parse, propOr('{}', 'data'))(row)
+    if (isEmpty(ctx)) return
+
+    const retries = prop('retries', row)
+    const logId = ctx.logId
+    logger({ log: `Attempting to recover message, retry ${retries} of ${MESSAGE_RECOVERY_MAX_RETRIES}` }, { logId })
+    try {
+      const { msgs, spawns, assigns, initialTxId, messageId: parentId } = await getResult(ctx)
+      await settle(crank({ msgs, spawns, assigns, initialTxId, parentId }))
+      logger({ log: 'Successfully pushed message results', end: true }, { logId })
+      await deleteMessage(logId)
+    } catch (e) {
+      const delay = MESSAGE_RECOVERY_RETRY_DELAY * (2 ** retries)
+      logger({ log: `Error recovering message - getResult, retrying in ${delay}ms: ${e}` }, ctx)
+      // Awaited so the cron only resumes once the message has been rescheduled
+      await updateMessageTimestamp(logId, retries)
+    }
+  }
+
+  return async () => {
+    let isJobRunning = false
+    const ct = cron.schedule('*/10 * * * * *', async () => {
+      if (isJobRunning) return
+      isJobRunning = true
+      ct.stop() // pause cron while recovering messages
+      try {
+        await recoverNextMessage()
+      } catch (e) {
+        logger({ log: `Error recovering message - selectMessage: ${e}` })
+      } finally {
+        isJobRunning = false
+        ct.start() // resume cron when done recovering messages
+      }
+    })
+  }
+}
diff --git a/servers/mu/src/domain/clients/cu.js b/servers/mu/src/domain/clients/cu.js
index 9ee86eef2..e9e6a2d57 100644
--- a/servers/mu/src/domain/clients/cu.js
+++ b/servers/mu/src/domain/clients/cu.js
@@ -42,7 +42,7 @@ function resultWith ({ fetch, histogram, CU_URL, logger }) {
       ).then(okRes),
       {
         maxRetries: 5,
-        delay: 500,
+        delay: 1000,
         log: logger,
         logId,
         name: `fetchResult(${JSON.stringify({
diff --git a/servers/mu/src/domain/clients/sqlite.js b/servers/mu/src/domain/clients/sqlite.js
index f6ee604c0..c0a0dff40 100644
--- a/servers/mu/src/domain/clients/sqlite.js
+++ b/servers/mu/src/domain/clients/sqlite.js
@@ -3,11 +3,22 @@ import { stat } from 'node:fs'
 import Database from 'better-sqlite3'
 import bytes from 'bytes'
 
-export const [TASKS_TABLE, TRACES_TABLE, CRON_PROCESSES_TABLE] = [
+export const [TASKS_TABLE, TRACES_TABLE, CRON_PROCESSES_TABLE, MESSAGES_TABLE] = [
   'tasks',
   'traces',
-  'cron_processes'
+  'cron_processes',
+  'messages'
 ]
+
+const createMessages = async (db) => db.prepare(
+  `CREATE TABLE IF NOT EXISTS ${MESSAGES_TABLE}(
+    id TEXT PRIMARY KEY,
+    timestamp INTEGER,
+    data TEXT,
+    retries INTEGER
+  ) WITHOUT ROWID;`
+).run()
+
 const createTasks = async (db) => db.prepare(
   `CREATE TABLE IF NOT EXISTS ${TASKS_TABLE}(
     id TEXT PRIMARY KEY,
@@ -72,6 +83,7 @@ export async function createSqliteClient ({ url, bootstrap = false, walLimit = b
       await Promise.resolve()
         .then(() => createTasks(db))
         .then(() => createCronProcesses(db))
+        .then(() => createMessages(db))
     }
     if (type === 'traces') {
       await Promise.resolve()
diff --git a/servers/mu/src/domain/index.js b/servers/mu/src/domain/index.js
index 260c33323..0aa2c1854 100644
--- a/servers/mu/src/domain/index.js
+++ b/servers/mu/src/domain/index.js
@@ -23,7 +23,7 @@ import { processMsgWith } from './api/processMsg.js'
 import { processSpawnWith } from './api/processSpawn.js'
 import { monitorProcessWith } from './api/monitorProcess.js'
 import { stopMonitorProcessWith } from './api/stopMonitorProcess.js'
-import { sendDataItemWith } from './api/sendDataItem.js'
+import { sendDataItemWith, startMessageRecoveryCronWith } from './api/sendDataItem.js'
 import { sendAssignWith } from './api/sendAssign.js'
 import { processAssignWith } from './api/processAssign.js'
 import { pushMsgWith } from './api/pushMsg.js'
@@ -214,7 +214,10 @@ export const createApis = async (ctx) => {
     isWallet: gatewayClient.isWalletWith({ fetch, histogram, ARWEAVE_URL, logger: sendDataItemLogger }),
     logger: sendDataItemLogger,
     writeDataItemArweave: uploaderClient.uploadDataItemWith({ UPLOADER_URL, logger: sendDataItemLogger, fetch, histogram }),
-    spawnPushEnabled: SPAWN_PUSH_ENABLED
+    spawnPushEnabled: SPAWN_PUSH_ENABLED,
+    db,
+    GET_RESULT_MAX_RETRIES: ctx.GET_RESULT_MAX_RETRIES,
+    GET_RESULT_RETRY_DELAY: ctx.GET_RESULT_RETRY_DELAY
   })
 
   const sendAssignLogger = logger.child('sendAssign')
@@ -305,6 +308,20 @@ export const createApis = async (ctx) => {
     ALLOW_PUSHES_AFTER
   })
 
+  const startMessageRecoveryCronLogger = logger.child('messageRecoveryCron')
+  const startMessageRecoveryCron = startMessageRecoveryCronWith({
+    selectNode: cuClient.selectNodeWith({ CU_URL, logger: startMessageRecoveryCronLogger }),
+    fetchResult: cuClient.resultWith({ fetch: fetchWithCache, histogram, CU_URL, logger: startMessageRecoveryCronLogger }),
+    logger: startMessageRecoveryCronLogger,
+    db,
+    cron,
+    crank,
+    GET_RESULT_MAX_RETRIES: ctx.GET_RESULT_MAX_RETRIES,
+    GET_RESULT_RETRY_DELAY: ctx.GET_RESULT_RETRY_DELAY,
+    MESSAGE_RECOVERY_MAX_RETRIES: ctx.MESSAGE_RECOVERY_MAX_RETRIES,
+    MESSAGE_RECOVERY_RETRY_DELAY: ctx.MESSAGE_RECOVERY_RETRY_DELAY
+  })
+
   return {
     metrics,
     sendDataItem,
@@ -317,7 +334,8 @@ export const createApis = async (ctx) => {
     initCronProcs: cronClient.initCronProcsWith({
       startMonitoredProcess: startProcessMonitor,
       getCronProcesses
-    })
+    }),
+    startMessageRecoveryCron
   }
 }
 