Skip to content

Commit

Permalink
chore(mu): add tests for cron, worker #747
Browse files Browse the repository at this point in the history
  • Loading branch information
jfrain99 committed Aug 26, 2024
1 parent c2b1802 commit 531d5e9
Show file tree
Hide file tree
Showing 7 changed files with 691 additions and 255 deletions.
64 changes: 17 additions & 47 deletions servers/mu/src/domain/clients/cron.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import cron from 'node-cron'
import { withTimerMetricsFetch } from '../lib/with-timer-metrics-fetch.js'
import { CRON_PROCESSES_TABLE, TRACES_TABLE } from './sqlite.js'
import { CRON_PROCESSES_TABLE } from './sqlite.js'
/**
* cronsRunning stores the node cron response
* which can be used to stop a cron that is running
Expand Down Expand Up @@ -52,6 +51,11 @@ export function deleteCronProcessWith ({ db }) {
}
}

/**
* Given a processId, retrieve its cursor
* @param {{ processId: string }} processId - the processId to retrieve the cursor of
* @returns the cron process's cursor
*/
export function getCronProcessCursorWith ({ db }) {
return async ({ processId }) => {
function createQuery ({ processId }) {
Expand All @@ -69,6 +73,11 @@ export function getCronProcessCursorWith ({ db }) {
}
}

/**
* Updates a cron process's cursor in the database
* @param {{ processId: string }} processId - the processId to update the cursor of
* @param {{ cursor: string }} cursor - the new cursor of the cron process
*/
export function updateCronProcessCursorWith ({ db }) {
return async ({ processId, cursor }) => {
function createQuery ({ processId, cursor }) {
Expand All @@ -88,49 +97,11 @@ export function updateCronProcessCursorWith ({ db }) {
}
}

export function deleteOldTracesWith ({ db, logger }) {
return async () => {
function createQuery ({ overflow }) {
/**
* Check if the database has grown to greater than 1GB.
* If it has, delete the least recent 10% of traces.
*/
return {
sql: `
WITH delete_entries AS (
SELECT COUNT(*) AS total_rows, CEILING(COUNT(*) * ?) AS rows_to_delete
FROM ${TRACES_TABLE}
)
DELETE FROM ${TRACES_TABLE}
WHERE timestamp IN (
SELECT timestamp
FROM ${TRACES_TABLE}
ORDER BY timestamp ASC
LIMIT (SELECT rows_to_delete FROM delete_entries)
)
`,
parameters: [overflow]
}
}
const pageSize = await db.pragma('page_size', { simple: true })
const pageCount = await db.pragma('page_count', { simple: true })
/**
* Calculate if we are over the maximum amount of bytes allocated
* to the trace database. If we are, we will remove the oldest traces
* such that we will return to below the maximum amount.
*/
const totalBytes = pageSize * pageCount
const maxBytes = 1024 * 1024 * 1024
const overflow = maxBytes / totalBytes
if (overflow >= 1) return
logger(({ log: `Deleting old traces, overflow of ${1 - overflow}` }))
await db.run(createQuery({ overflow: 1 - overflow }))
await db.run({ sql: 'VACUUM;', parameters: [] })
logger({ log: 'Deleted old traces.' })
}
}

function initCronProcsWith ({ startMonitoredProcess, getCronProcesses }) {
/**
* Run upon server initialization.
* Checks the database for cron processes. If found, start those processes.
*/
return async () => {
/**
* If no cron processes are found, continue
Expand All @@ -139,8 +110,7 @@ function initCronProcsWith ({ startMonitoredProcess, getCronProcesses }) {
if (!cronProcesses) return

/*
* start new os procs when the server starts because
* the server has either restarted or been redeployed.
* Iterate through new processes when the server starts
*/
for (const { processId } of cronProcesses) {
try {
Expand All @@ -152,7 +122,7 @@ function initCronProcsWith ({ startMonitoredProcess, getCronProcesses }) {
}
}

function startMonitoredProcessWith ({ fetch, histogram, logger, CU_URL, fetchCron, crank, monitorGauge, saveCronProcess, getCronProcessCursor, updateCronProcessCursor }) {
function startMonitoredProcessWith ({ fetch, cron, histogram, logger, CU_URL, fetchCron, crank, monitorGauge, saveCronProcess, getCronProcessCursor, updateCronProcessCursor }) {
const getCursorFetch = withTimerMetricsFetch({
fetch,
timer: histogram,
Expand Down
142 changes: 111 additions & 31 deletions servers/mu/src/domain/clients/cron.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,17 @@ const logger = () => undefined
logger.tap = () => (args) => {
return args
}
logger.child = () => {
const tempLogger = () => undefined
tempLogger.tap = () => (args) => {
return args
}
return tempLogger
}

describe('cron', () => {
describe('initCronProcsWith', () => {
test('should not start process if proc file could not be read', async () => {
test('should not start process if db could not be read', async () => {
await cron.initCronProcsWith({
getCronProcesses: () => undefined,
startMonitoredProcess: async () => {
Expand All @@ -19,54 +26,127 @@ describe('cron', () => {
})()
})

describe('processing proc file runs successfully when not initial run and skips subsequent runs', async () => {
describe('processing proc file runs startMonitor for each process read from the db', async () => {
let startMonitoredProcessCalls = 0
const initCronProcs = cron.initCronProcsWith({
getCronProcesses: () => [{ processId: '1', status: 'running' }],
getCronProcesses: () => [{ processId: '1', status: 'running' }, { processId: '2', status: 'running' }],
startMonitoredProcess: async () => {
console.log('here ')
startMonitoredProcessCalls++
return Promise.resolve()
}
})

test('should start process by reading proc file and saving procs', async () => {
await initCronProcs()
assert.equal(startMonitoredProcessCalls, 1)
})
})

describe('dumps proc file into sqlite if file is present', async () => {
let saveCronProcessCalls = 0
const initCronProcs = cron.initCronProcsWith({
getCronProcesses: () => [{ processId: 'pid-123', status: 'running' }, { processId: 'pid-456', status: 'running' }],
startMonitoredProcess: async () => { saveCronProcessCalls++ }
})

test('make sure crons are being saved to sqlite', async () => {
test('should start process by reading proc db and starting processes', async () => {
await initCronProcs()
assert.equal(saveCronProcessCalls, 2)
assert.equal(startMonitoredProcessCalls, 2)
})
})
})

describe('startMonitoredProcessWith', () => {
describe('startMonitoredProcessWith', async () => {
let monitorGaugeValue = 0
const startMonitoredProcess = cron.startMonitoredProcessWith({
fetch: async () => undefined,
cron: {
schedule: (expression, _callback) => {
assert.equal(expression, '*/10 * * * * *')
return {
start: () => console.log('start'),
stop: () => console.log('stop')
}
}
},
histogram: { startTimer: async () => undefined },
logger,
saveCronProcess: async ({ processId }) => assert.ok(['foo', 'bar'].includes(processId)),
monitorGauge: {
inc: () => {
monitorGaugeValue++
}
}
})

test('start initial monitor of process foo', async () => {
await startMonitoredProcess({ processId: 'foo' })
assert.equal(monitorGaugeValue, 1)
})
test('start initial monitor of process bar', async () => {
await startMonitoredProcess({ processId: 'bar' })
assert.equal(monitorGaugeValue, 2)
})
test('double monitor of process foo', async () => {
let errorCaught = false
try {
await startMonitoredProcess({ processId: 'foo' })
} catch (e) {
errorCaught = true
assert.equal(e.message, 'Process already being monitored')
}
assert.equal(monitorGaugeValue, 2)
assert.ok(errorCaught)
})
})

describe('killMonitoredProcessWith', () => {
let monitorGaugeValue = 0
const startMonitoredProcess = cron.startMonitoredProcessWith({
fetch: async () => undefined,
cron: {
schedule: (expression, _callback) => {
assert.equal(expression, '*/10 * * * * *')
return {
start: () => console.log('start'),
stop: () => console.log('stop')
}
}
},
histogram: { startTimer: async () => undefined },
logger,
saveCronProcess: async ({ processId }) => assert.ok(['process-123', 'process-456'].includes(processId)),
monitorGauge: {
inc: () => {
monitorGaugeValue++
}
}
})
const killMonitoredProcess = cron.killMonitoredProcessWith({
logger,
monitorGauge: {
dec: () => { monitorGaugeValue-- }
},
deleteCronProcess: async ({ processId }) => assert.ok(['process-123', 'process-456'].includes(processId))
})

test('should not kill process if process is not a running cron', async () => {
const killMonitoredProcess = cron.killMonitoredProcessWith({
PROC_FILE_PATH: 'test',
logger,
monitorGauge: () => ({
dec: () => {}
}),
deleteCronProcess: async () => Promise.resolve()
})
await killMonitoredProcess({ processId: 1 }).catch(() => {
assert.ok('should catch error')
})
let errorCaught = false
try {
await killMonitoredProcess({ processId: 'process-123' })
} catch (e) {
errorCaught = true
assert.equal(e.message, 'Process monitor not found')
}
assert.ok(errorCaught)
})

test('should kill process monitors when they exist', async () => {
await startMonitoredProcess({ processId: 'process-123' })
assert.equal(monitorGaugeValue, 1)
await startMonitoredProcess({ processId: 'process-456' })
assert.equal(monitorGaugeValue, 2)
await killMonitoredProcess({ processId: 'process-456' })
assert.equal(monitorGaugeValue, 1)
await killMonitoredProcess({ processId: 'process-123' })
assert.equal(monitorGaugeValue, 0)

// Ensure processes were actually removed
let errorCaught = false
try {
await killMonitoredProcess({ processId: 'process-123' })
} catch (e) {
errorCaught = true
assert.equal(e.message, 'Process monitor not found')
}
assert.ok(errorCaught)
})
})
})
51 changes: 51 additions & 0 deletions servers/mu/src/domain/clients/tracer.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,57 @@ import { propOr, tap } from 'ramda'
import { randomBytes } from 'crypto'
import { TRACES_TABLE } from './sqlite.js'

/**
* Each minute, a cron runs to check if the trace db is overflowing.
* If it is, delete a calculated amount of rows to clear up room.
* @param {{ overflow: string }} overflow - the percentage the database needs to reduce by to return below the memory limit
*/
export function deleteOldTracesWith ({ db }) {
return async () => {
function createQuery ({ overflow }) {
/**
* Check if the database has grown to greater than 1GB.
* If it has, delete the least recent 10% of traces.
*/
return {
sql: `
WITH delete_entries AS (
SELECT COUNT(*) AS total_rows, CEILING(COUNT(*) * ?) AS rows_to_delete
FROM ${TRACES_TABLE}
)
DELETE FROM ${TRACES_TABLE}
WHERE timestamp IN (
SELECT timestamp
FROM ${TRACES_TABLE}
ORDER BY timestamp ASC
LIMIT (SELECT rows_to_delete FROM delete_entries)
)
`,
parameters: [overflow]
}
}
const pageSize = await db.pragma('page_size', { simple: true })
const pageCount = await db.pragma('page_count', { simple: true })
/**
* Calculate if we are over the maximum amount of bytes allocated
* to the trace database. If we are, we will remove the oldest traces
* such that we will return to below the maximum amount.
*/
const totalBytes = pageSize * pageCount
const maxBytes = 1024 * 1024 * 1024 // 1 GB
const overflow = maxBytes / totalBytes
/**
* If we have less bytes used then available, return
*/
if (overflow >= 1) return
await db.run(createQuery({ overflow: 1 - overflow }))
/**
* VACUUM allows sqlite to release the freed memory
*/
db.run({ sql: 'VACUUM;', parameters: [] })
}
}

function createTraceWith ({ db }) {
/**
* Persist the logs to the storage system for traces
Expand Down
Loading

0 comments on commit 531d5e9

Please sign in to comment.