Skip to content

Commit

Permalink
Merge pull request #1021 from permaweb/jfrain99/delete-old-tasks
Browse files Browse the repository at this point in the history
fix(mu): delete old tasks that leak into db
  • Loading branch information
jfrain99 authored Oct 2, 2024
2 parents 3c6fe76 + 0eaa1f1 commit 47ecd04
Showing 1 changed file with 16 additions and 1 deletion.
17 changes: 16 additions & 1 deletion servers/mu/src/domain/clients/taskQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@ import { TASKS_TABLE } from './sqlite.js'
*/
export async function createTaskQueue ({ queueId, logger, db }) {
logger({ log: `Initializing queue for queue index ${queueId}` })
function createDeleteQuery () {
return {
sql: `
DELETE FROM ${TASKS_TABLE}
WHERE queueId = ? AND timestamp < (strftime('%s', 'now') - 3600) * 1000;
`,
parameters: [
queueId
]
}
}

function createQuery () {
return {
sql: `
Expand All @@ -24,6 +36,8 @@ export async function createTaskQueue ({ queueId, logger, db }) {
}
}

await db.run(createDeleteQuery())
await db.run({ sql: 'VACUUM;', parameters: [] })
const queryResults = (await db.query(createQuery())).map((row) => ({ ...JSON.parse(row.data), dbId: row.id }))
const taskQueue = queryResults || []
return taskQueue
Expand Down Expand Up @@ -92,7 +106,7 @@ export function removeDequeuedTasksWith ({ dequeuedTasks, queueId, db }) {
return {
sql: `
DELETE FROM ${TASKS_TABLE}
WHERE id IN (${Array.from(dequeuedTasks).map(() => '?').join(',')})
WHERE id IN (${Array.from(dequeuedTasks).map(() => '?').join(',')}) OR timestamp < (strftime('%s', 'now') - 3600) * 1000;
`,
parameters: Array.from(dequeuedTasks)
}
Expand All @@ -104,6 +118,7 @@ export function removeDequeuedTasksWith ({ dequeuedTasks, queueId, db }) {
const query = createQuery(taskCopy)

db.run(query)
db.run({ sql: 'VACUUM;', parameters: [] })
taskCopy.forEach((task) => {
dequeuedTasks.delete(task)
})
Expand Down

0 comments on commit 47ecd04

Please sign in to comment.