Skip to content

Commit

Permalink
dev: add worker creation after pool thread count drops below what is …
Browse files Browse the repository at this point in the history
…configured
  • Loading branch information
jlong145 committed Oct 16, 2024
1 parent 6810a86 commit fa242f5
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 3 deletions.
13 changes: 12 additions & 1 deletion src/Pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ export class Pool {
private _waitLock: Promise<void> | undefined;
// deno-lint-ignore no-explicit-any
private _waitLockResolver: any | undefined;
private _definition: string | undefined;

constructor(args: PoolArgs) {
this._args = args;
}

init = async (definition: string): Promise<void> => {
this._definition = definition;

for (let i = 0; i < this._args.workerCount; i++) {
this.threads.push(
new WorkerHandler(definition, { taskCount: this._args.taskCount }),
Expand Down Expand Up @@ -47,7 +50,15 @@ export class Pool {
return t.isReady();
});
if (!thread) {
task.reject(new Error("Max pool queue reached"));
if (this.threads.length < this._args.workerCount) {
this.threads.push(
new WorkerHandler(this._definition!, {
taskCount: this._args.taskCount,
}),
);
} else {
task.reject(new Error("Max pool queue reached"));
}
}

this.tasks.push(task);
Expand Down
28 changes: 27 additions & 1 deletion tests/instance_wrapper_runtime_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ class TestExample extends WorkerDefinition {
_args: Record<string, any>,
): SharedArrayBuffer => {
while (true) {}
return buffer;
};
}

Expand Down Expand Up @@ -165,3 +164,30 @@ Deno.test("Timeout should kill worker if there is no response", async () => {

assertEquals(inst.pool!.threads.length, 4);
});



Deno.test("Timeout should kill worker and Pool should create new worker on next execution call", async () => {
const inst = new TestExample();
const wrapper = new InstanceWrapper<TestExample>(inst, {
workerCount: 1,
taskCount: 1,
});

await wrapper.start();
try {
const task = inst.execute("testInfiniteLoop", {});
task.timeout(100);
await task;
} catch(e) {
// catch the timeout error
}

assertEquals(inst.pool!.threads.length, 0);

const task = inst.execute("foo", {});
const buffer = await task;

assertEquals(inst.pool!.threads.length, 1);
});

2 changes: 1 addition & 1 deletion tests/pool_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Pool } from "../src/Pool.ts";

Deno.test("Pool should initalize with correct worker count", async () => {
const poolSize = 10;
const pool = new Pool({ workerCount: poolSize });
const pool = new Pool({ workerCount: poolSize, taskCount: 10 });
await pool.init("console.log('hello world from pool test');");
assertEquals(pool.threads.length, poolSize);

Expand Down

0 comments on commit fa242f5

Please sign in to comment.