Skip to content

Commit

Permalink
Merge pull request #1035 from bluewave-labs/feat/be/job-queue-tests
Browse files Browse the repository at this point in the history
Feat/be/job queue tests
  • Loading branch information
ajhollid authored Oct 24, 2024
2 parents 8f17cfb + 47946cb commit d680593
Show file tree
Hide file tree
Showing 8 changed files with 860 additions and 98 deletions.
17 changes: 16 additions & 1 deletion Client/src/App.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import { useSelector } from "react-redux";
import { CssBaseline } from "@mui/material";
import { useEffect } from "react";
import { useDispatch } from "react-redux";
import { getAppSettings } from "./Features/Settings/settingsSlice";
import { getAppSettings, updateAppSettings } from "./Features/Settings/settingsSlice";
import { logger } from "./Utils/Logger"; // Import the logger
import { networkService } from "./main";
function App() {
Expand Down Expand Up @@ -66,6 +66,21 @@ function App() {
};
}, []);

useEffect(() => {
const thing = async () => {
const action = await dispatch(
updateAppSettings({ authToken, settings: { apiBaseUrl: "test" } })
);

if (action.payload.success) {
console.log(action.payload.data);
} else {
console.log(action);
}
};
thing();
}, [dispatch, authToken]);

return (
<ThemeProvider theme={mode === "light" ? lightTheme : darkTheme}>
<CssBaseline />
Expand Down
1 change: 1 addition & 0 deletions Client/src/Features/Settings/settingsSlice.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ export const updateAppSettings = createAsyncThunk(
systemEmailAddress: settings.systemEmailAddress,
systemEmailPassword: settings.systemEmailPassword,
};
console.log(parsedSettings);
const res = await networkService.updateAppSettings({
settings: parsedSettings,
authToken,
Expand Down
85 changes: 43 additions & 42 deletions Client/src/Utils/NetworkService.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class NetworkService {
this.setBaseUrl(baseURL);
this.unsubscribe = store.subscribe(() => {
const state = store.getState();
console.log(state.settings.apiBaseUrl);
if (BASE_URL !== undefined) {
baseURL = BASE_URL;
} else if (state?.settings?.apiBaseUrl ?? null) {
Expand Down Expand Up @@ -87,48 +88,48 @@ class NetworkService {
},
});
}
/**
*
* ************************************
* Check the endpoint resolution
* ************************************
*
* @async
* @param {Object} config - The configuration object.
* @param {string} config.authToken - The authorization token to be used in the request header.
* @param {Object} config.monitorURL - The monitor url to be sent in the request body.
* @returns {Promise<AxiosResponse>} The response from the axios POST request.
*/
async checkEndpointResolution(config) {
const { authToken, monitorURL } = config;
const params = new URLSearchParams();
if (monitorURL) params.append("monitorURL", monitorURL);

return this.axiosInstance.get(`/monitors/resolution/url?${params.toString()}`, {
headers: {
Authorization: `Bearer ${authToken}`,
"Content-Type": "application/json",
}
})
}

/**
*
* ************************************
* Gets monitors and summary of stats by TeamID
* ************************************
*
* @async
* @param {Object} config - The configuration object.
* @param {string} config.authToken - The authorization token to be used in the request header.
* @param {string} config.teamId - Team ID
* @param {Array<string>} config.types - Array of monitor types
* @returns {Promise<AxiosResponse>} The response from the axios POST request.
*/
async getMonitorsAndSummaryByTeamId(config) {
const params = new URLSearchParams();

/**
*
* ************************************
* Check the endpoint resolution
* ************************************
*
* @async
* @param {Object} config - The configuration object.
* @param {string} config.authToken - The authorization token to be used in the request header.
* @param {Object} config.monitorURL - The monitor url to be sent in the request body.
* @returns {Promise<AxiosResponse>} The response from the axios POST request.
*/
async checkEndpointResolution(config) {
const { authToken, monitorURL } = config;
const params = new URLSearchParams();

if (monitorURL) params.append("monitorURL", monitorURL);

return this.axiosInstance.get(`/monitors/resolution/url?${params.toString()}`, {
headers: {
Authorization: `Bearer ${authToken}`,
"Content-Type": "application/json",
},
});
}

/**
*
* ************************************
* Gets monitors and summary of stats by TeamID
* ************************************
*
* @async
* @param {Object} config - The configuration object.
* @param {string} config.authToken - The authorization token to be used in the request header.
* @param {string} config.teamId - Team ID
* @param {Array<string>} config.types - Array of monitor types
* @returns {Promise<AxiosResponse>} The response from the axios POST request.
*/
async getMonitorsAndSummaryByTeamId(config) {
const params = new URLSearchParams();

if (config.types) {
config.types.forEach((type) => {
Expand Down
12 changes: 11 additions & 1 deletion Server/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ import { fileURLToPath } from "url";

import { connectDbAndRunServer } from "./configs/db.js";
import queueRouter from "./routes/queueRoute.js";

//JobQueue service and dependencies
import JobQueue from "./service/jobQueue.js";
import { Queue, Worker } from "bullmq";

//Network service and dependencies
import NetworkService from "./service/networkService.js";
Expand Down Expand Up @@ -157,7 +160,14 @@ const startApp = async () => {
logger
);
const networkService = new NetworkService(db, emailService, axios, ping, logger, http);
const jobQueue = await JobQueue.createJobQueue(db, networkService, settingsService);
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
settingsService,
logger,
Queue,
Worker
);

const cleanup = async () => {
if (cleaningUp) {
Expand Down
67 changes: 36 additions & 31 deletions Server/service/jobQueue.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import { Queue, Worker, Job } from "bullmq";
const QUEUE_NAME = "monitors";

const JOBS_PER_WORKER = 5;
import logger from "../utils/logger.js";
import { errorMessages, successMessages } from "../utils/messages.js";
const SERVICE_NAME = "JobQueue";
/**
Expand All @@ -19,11 +16,13 @@ class JobQueue {
* @param {SettingsService} settingsService - The settings service
* @throws {Error}
*/
constructor(settingsService) {
const { redisHost, redisPort } = settingsService.getSettings();
constructor(settingsService, logger, Queue, Worker) {
const settings = settingsService.getSettings() || {};

const { redisHost = "127.0.0.1", redisPort = 6379 } = settings;
const connection = {
host: redisHost || "127.0.0.1",
port: redisPort || 6379,
host: redisHost,
port: redisPort,
};
this.connection = connection;
this.queue = new Queue(QUEUE_NAME, {
Expand All @@ -33,6 +32,8 @@ class JobQueue {
this.db = null;
this.networkService = null;
this.settingsService = settingsService;
this.logger = logger;
this.Worker = Worker;
}

/**
Expand All @@ -42,8 +43,15 @@ class JobQueue {
* @returns {Promise<JobQueue>} - Returns a new JobQueue
*
*/
static async createJobQueue(db, networkService, settingsService) {
const queue = new JobQueue(settingsService);
static async createJobQueue(
db,
networkService,
settingsService,
logger,
Queue,
Worker
) {
const queue = new JobQueue(settingsService, logger, Queue, Worker);
try {
queue.db = db;
queue.networkService = networkService;
Expand All @@ -69,7 +77,7 @@ class JobQueue {
* @returns {Worker} The newly created worker
*/
createWorker() {
const worker = new Worker(
const worker = new this.Worker(
QUEUE_NAME,
async (job) => {
try {
Expand All @@ -96,17 +104,16 @@ class JobQueue {
}
return acc;
}, false);

if (!maintenanceWindowActive) {
await this.networkService.getStatus(job);
} else {
logger.info(`Monitor ${monitorId} is in maintenance window`, {
this.logger.info(`Monitor ${monitorId} is in maintenance window`, {
service: SERVICE_NAME,
monitorId,
});
}
} catch (error) {
logger.error(`Error processing job ${job.id}: ${error.message}`, {
this.logger.error(`Error processing job ${job.id}: ${error.message}`, {
service: SERVICE_NAME,
jobId: job.id,
error: error,
Expand Down Expand Up @@ -169,11 +176,9 @@ class JobQueue {
}
return true;
}

if (workerStats.load > JOBS_PER_WORKER) {
// Find out how many more jobs we have than current workers can handle
const excessJobs = workerStats.jobs.length - this.workers.length * JOBS_PER_WORKER;

// Divide by jobs/worker to find out how many workers to add
const workersToAdd = Math.ceil(excessJobs / JOBS_PER_WORKER);
for (let i = 0; i < workersToAdd; i++) {
Expand All @@ -188,18 +193,17 @@ class JobQueue {
const workerCapacity = this.workers.length * JOBS_PER_WORKER;
const excessCapacity = workerCapacity - workerStats.jobs.length;
// Calculate how many workers to remove
const workersToRemove = Math.floor(excessCapacity / JOBS_PER_WORKER);
if (this.workers.length > 5) {
for (let i = 0; i < workersToRemove; i++) {
const worker = this.workers.pop();
try {
await worker.close();
} catch (error) {
// Catch the error instead of throwing it
logger.error(errorMessages.JOB_QUEUE_WORKER_CLOSE, {
service: SERVICE_NAME,
});
}
let workersToRemove = Math.floor(excessCapacity / JOBS_PER_WORKER); // Make sure there are always at least 5
while (workersToRemove > 0 && this.workers.length > 5) {
const worker = this.workers.pop();
workersToRemove--;
try {
await worker.close();
} catch (error) {
// Catch the error instead of throwing it
this.logger.error(errorMessages.JOB_QUEUE_WORKER_CLOSE, {
service: SERVICE_NAME,
});
}
}
return true;
Expand Down Expand Up @@ -282,14 +286,14 @@ class JobQueue {
every: monitor.interval,
});
if (deleted) {
logger.info(successMessages.JOB_QUEUE_DELETE_JOB, {
this.logger.info(successMessages.JOB_QUEUE_DELETE_JOB, {
service: SERVICE_NAME,
jobId: monitor.id,
});
const workerStats = await this.getWorkerStats();
await this.scaleWorkers(workerStats);
} else {
logger.error(errorMessages.JOB_QUEUE_DELETE_JOB, {
this.logger.error(errorMessages.JOB_QUEUE_DELETE_JOB, {
service: SERVICE_NAME,
jobId: monitor.id,
});
Expand All @@ -311,9 +315,10 @@ class JobQueue {
delayed: await this.queue.getDelayedCount(),
repeatableJobs: (await this.queue.getRepeatableJobs()).length,
};
console.log(metrics);
return metrics;
} catch (error) {
logger.error("Failed to retrieve job queue metrics", {
this.logger.error("Failed to retrieve job queue metrics", {
service: SERVICE_NAME,
errorMsg: error.message,
});
Expand Down Expand Up @@ -344,7 +349,7 @@ class JobQueue {
await this.queue.obliterate();
metrics = await this.getMetrics();
console.log(metrics);
logger.info(successMessages.JOB_QUEUE_OBLITERATE, {
this.logger.info(successMessages.JOB_QUEUE_OBLITERATE, {
service: SERVICE_NAME,
});
return true;
Expand Down
Loading

0 comments on commit d680593

Please sign in to comment.