Skip to content

Commit

Permalink
chore(mu): optimize delete old traces
Browse files Browse the repository at this point in the history
  • Loading branch information
jfrain99 committed Aug 24, 2024
1 parent 8047a6d commit 1e157b5
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 14 deletions.
7 changes: 7 additions & 0 deletions servers/mu/src/domain/clients/sqlite.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ const createTracesIndexes = async (db) => db.prepare(
(messageId, processId);`
).run()

const createTracesTimestampIndexes = async (db) => db.prepare(
`CREATE INDEX IF NOT EXISTS idx_${TRACES_TABLE}_timestamp
ON ${TRACES_TABLE}
(timestamp);`
).run()

let internalSqliteDb
export async function createSqliteClient ({ url, bootstrap = false, walLimit = bytes.parse('100mb'), type = 'tasks' }) {
if (internalSqliteDb) return internalSqliteDb
Expand All @@ -71,6 +77,7 @@ export async function createSqliteClient ({ url, bootstrap = false, walLimit = b
await Promise.resolve()
.then(() => createTraces(db))
.then(() => createTracesIndexes(db))
.then(() => createTracesTimestampIndexes(db))
}
}

Expand Down
17 changes: 17 additions & 0 deletions servers/mu/src/domain/clients/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { cond, equals, propOr, tap } from 'ramda'
import cron from 'node-cron'

import { createTaskQueue, enqueueWith, dequeueWith, removeDequeuedTasksWith } from './taskQueue.js'
import { deleteOldTracesWith } from './tracer.js'
import { domainConfigSchema, config } from '../../config.js'
// Without this import the worker crashes
import { createResultApis } from '../../domain/index.js'
Expand Down Expand Up @@ -293,6 +294,22 @@ ct = cron.schedule('*/2 * * * * *', async () => {
}
})

let traceCt = null
let traceIsJobRunning = false
const traceDb = await createSqliteClient({ url: workerData.TRACE_DB_URL, bootstrap: false, type: 'traces' })
const deleteOldTraces = deleteOldTracesWith({ db: traceDb })
/**
* Create cron to clear out traces, each hour
*/
traceCt = cron.schedule('0 * * * *', async () => {
if (!traceIsJobRunning) {
traceIsJobRunning = true
traceCt.stop()
await deleteOldTraces()
traceCt.start()
traceIsJobRunning = false
}
})
/**
* Start the processing of results from
* the queue and expose the worker api
Expand Down
17 changes: 3 additions & 14 deletions servers/mu/src/domain/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import { randomBytes } from 'node:crypto'
import { BroadcastChannel } from 'node:worker_threads'
import cron from 'node-cron'

import { apply } from 'ramda'
import warpArBundles from 'warp-arbundles'
import { connect as schedulerUtilsConnect } from '@permaweb/ao-scheduler-utils'
Expand All @@ -16,7 +14,7 @@ import gatewayClient from './clients/gateway.js'
import * as InMemoryClient from './clients/in-memory.js'
import * as MetricsClient from './clients/metrics.js'
import * as SqliteClient from './clients/sqlite.js'
import cronClient, { deleteCronProcessWith, deleteOldTracesWith, getCronProcessCursorWith, saveCronProcessWith, updateCronProcessCursorWith } from './clients/cron.js'
import cronClient, { deleteCronProcessWith, getCronProcessCursorWith, saveCronProcessWith, updateCronProcessCursorWith } from './clients/cron.js'
import { readTracesWith } from './clients/tracer.js'

import { processMsgWith } from './api/processMsg.js'
Expand Down Expand Up @@ -151,6 +149,7 @@ export const createApis = async (ctx) => {
id: workerId,
queueId,
DB_URL,
TRACE_DB_URL,
TASK_QUEUE_MAX_RETRIES: ctx.TASK_QUEUE_MAX_RETRIES,
TASK_QUEUE_RETRY_DELAY: ctx.TASK_QUEUE_RETRY_DELAY
}
Expand Down Expand Up @@ -208,14 +207,6 @@ export const createApis = async (ctx) => {
const deleteCronProcess = deleteCronProcessWith({ db })
const updateCronProcessCursor = updateCronProcessCursorWith({ db })
const getCronProcessCursor = getCronProcessCursorWith({ db })
const deleteOldTraces = deleteOldTracesWith({ db: traceDb })

/**
* Create cron to clear out traces, each hour
*/
cron.schedule('* * * * *', async () => {
await deleteOldTraces()
})

async function getCronProcesses () {
function createQuery () {
Expand Down Expand Up @@ -281,9 +272,7 @@ export const createApis = async (ctx) => {
traceMsgs,
initCronProcs: cronClient.initCronProcsWith({
startMonitoredProcess: startProcessMonitor,
getCronProcesses,
deleteOldTraces,
cron
getCronProcesses
})
}
}
Expand Down

0 comments on commit 1e157b5

Please sign in to comment.