Skip to content

Commit

Permalink
Made pool expose active resource count
Browse files Browse the repository at this point in the history
It will now track count of active (checked out of the pool) resources
per key. Where key is a server address in the driver. Counter is
incremented when resource is acquired from the pool and decremented
when resource is released back to the pool.

Also added JSDocs in the `Pool` class.
  • Loading branch information
lutovich committed Jul 11, 2017
1 parent 094d25f commit 184f480
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 13 deletions.
84 changes: 71 additions & 13 deletions src/v1/internal/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,59 +36,117 @@ class Pool {
this._validate = validate;
this._maxIdle = maxIdle;
this._pools = {};
this._activeResourceCounts = {};
this._release = this._release.bind(this);
}

/**
* Acquire and idle resource fom the pool or create a new one.
* @param {string} key the resource key.
* @return {object} resource that is ready to use.
*/
acquire(key) {
let resource;
let pool = this._pools[key];
if (!pool) {
pool = [];
this._pools[key] = pool;
}
while (pool.length) {
resource = pool.pop();
const resource = pool.pop();

if (this._validate(resource)) {
// idle resource is valid and can be acquired
resourceAcquired(key, this._activeResourceCounts);
return resource;
} else {
this._destroy(resource);
}
}

// there exist no idle valid resources, create a new one for acquisition
resourceAcquired(key, this._activeResourceCounts);
return this._create(key, this._release);
}

/**
* Destroy all idle resources for the given key.
* @param {string} key the resource key to purge.
*/
purge(key) {
let resource;
let pool = this._pools[key] || [];
const pool = this._pools[key] || [];
while (pool.length) {
resource = pool.pop();
const resource = pool.pop();
this._destroy(resource)
}
delete this._pools[key]
}

/**
* Destroy all idle resources in this pool.
*/
purgeAll() {
Object.keys(this._pools).forEach(key => this.purge(key));
}

/**
* Check if this pool contains resources for the given key.
* @param {string} key the resource key to check.
* @return {boolean} <code>true</code> when pool contains entries for the given key, <code>false</code> otherwise.
*/
has(key) {
return (key in this._pools);
}

/**
* Get count of active (checked out of the pool) resources for the given key.
* @param {string} key the resource key to check.
* @return {number} count of resources acquired by clients.
*/
activeResourceCount(key) {
return this._activeResourceCounts[key] || 0;
}

_release(key, resource) {
let pool = this._pools[key];
if (!pool) {
const pool = this._pools[key];

if (pool) {
// there exist idle connections for the given key
if (pool.length >= this._maxIdle || !this._validate(resource)) {
this._destroy(resource);
} else {
pool.push(resource);
}
} else {
// key has been purged, don't put it back, just destroy the resource
this._destroy(resource);
return;
}
if( pool.length >= this._maxIdle || !this._validate(resource) ) {
this._destroy(resource);
} else {
pool.push(resource);
}

resourceReleased(key, this._activeResourceCounts);
}
}

/**
* Increment active (checked out of the pool) resource counter.
* @param {string} key the resource group identifier (server address for connections).
* @param {Object.<string, number>} activeResourceCounts the object holding active counts per key.
*/
function resourceAcquired(key, activeResourceCounts) {
const currentCount = activeResourceCounts[key] || 0;
activeResourceCounts[key] = currentCount + 1;
}

/**
* Decrement active (checked out of the pool) resource counter.
* @param {string} key the resource group identifier (server address for connections).
* @param {Object.<string, number>} activeResourceCounts the object holding active counts per key.
*/
function resourceReleased(key, activeResourceCounts) {
const currentCount = activeResourceCounts[key] || 0;
const nextCount = currentCount - 1;
if (nextCount > 0) {
activeResourceCounts[key] = nextCount;
} else {
delete activeResourceCounts[key];
}
}

Expand Down
80 changes: 80 additions & 0 deletions test/internal/pool.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,86 @@ describe('Pool', () => {
expect(pool.has(existingKey)).toBeTruthy();
expect(pool.has(absentKey)).toBeFalsy();
});

it('reports zero active resources when empty', () => {
const pool = new Pool((url, release) => new Resource(url, 42, release));

expect(pool.activeResourceCount('bolt://localhost:1')).toEqual(0);
expect(pool.activeResourceCount('bolt://localhost:2')).toEqual(0);
expect(pool.activeResourceCount('bolt://localhost:3')).toEqual(0);
});

it('reports active resources', () => {
const key = 'bolt://localhost:7687';
const pool = new Pool((url, release) => new Resource(url, 42, release));

expect(pool.acquire(key)).toBeDefined();
expect(pool.acquire(key)).toBeDefined();
expect(pool.acquire(key)).toBeDefined();

expect(pool.activeResourceCount(key)).toEqual(3);
});

it('reports active resources when they are created', () => {
const key = 'bolt://localhost:7687';
const pool = new Pool((url, release) => new Resource(url, 42, release));

// three new resources are created
expect(pool.acquire(key)).toBeDefined();
expect(pool.acquire(key)).toBeDefined();
expect(pool.acquire(key)).toBeDefined();

expect(pool.activeResourceCount(key)).toEqual(3);
});

it('reports active resources when they are acquired', () => {
const key = 'bolt://localhost:7687';
const pool = new Pool((url, release) => new Resource(url, 42, release));

// three new resources are created and returned to the pool
const r0 = pool.acquire(key);
const r1 = pool.acquire(key);
const r2 = pool.acquire(key);
r0.close();
r1.close();
r2.close();

// three idle resources are acquired from the pool
const r3 = pool.acquire(key);
const r4 = pool.acquire(key);
const r5 = pool.acquire(key);
expect(r3).toBe(r2);
expect(r4).toBe(r1);
expect(r5).toBe(r0);

expect(pool.activeResourceCount(key)).toEqual(3);
});

it('does not report resources that are returned to the pool', () => {
const key = 'bolt://localhost:7687';
const pool = new Pool((url, release) => new Resource(url, 42, release));

const r0 = pool.acquire(key);
const r1 = pool.acquire(key);
const r2 = pool.acquire(key);
expect(pool.activeResourceCount(key)).toEqual(3);

r0.close();
expect(pool.activeResourceCount(key)).toEqual(2);

r1.close();
expect(pool.activeResourceCount(key)).toEqual(1);

r2.close();
expect(pool.activeResourceCount(key)).toEqual(0);

const r3 = pool.acquire(key);
expect(pool.activeResourceCount(key)).toEqual(1);

r3.close();
expect(pool.activeResourceCount(key)).toEqual(0);
});

});

class Resource {
Expand Down
69 changes: 69 additions & 0 deletions test/v1/session.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -965,6 +965,70 @@ describe('session', () => {
});
});

it('should acquire connection for transaction', done => {
expect(session.beginTransaction()).toBeDefined();

const otherSession1 = driver.session();
expect(otherSession1.beginTransaction()).toBeDefined();

const otherSession2 = driver.session();
expect(otherSession2.beginTransaction()).toBeDefined();

const otherSession3 = driver.session();
expect(otherSession3.beginTransaction()).toBeDefined();

expect(numberOfAcquiredConnectionsFromPool()).toEqual(4);

session.close(() => {
otherSession1.close(() => {
otherSession2.close(() => {
otherSession3.close(() => {
done();
});
});
});
});
});

it('should acquire connection for query execution', done => {
session.run('RETURN 42 AS answer').subscribe({
onNext: record => {
expect(record.get('answer').toInt()).toEqual(42);
expect(numberOfAcquiredConnectionsFromPool()).toEqual(1);
},
onCompleted: () => {
session.close(() => {
done();
});
},
onError: error => {
console.log(error);
}
});
});

it('should acquire separate connections for transaction and query execution in different sessions', done => {
const otherSession = driver.session();
expect(otherSession.beginTransaction()).toBeDefined();

session.run('RETURN 42 AS answer').subscribe({
onNext: record => {
expect(record.get('answer').toInt()).toEqual(42);
expect(numberOfAcquiredConnectionsFromPool()).toEqual(2);
},
onCompleted: () => {
otherSession.close(() => {
session.close(() => {
done();
});
});
},
onError: error => {
console.log(error);
}
});
});

function serverIs31OrLater(done) {
// lazy way of checking the version number
// if server has been set we know it is at least 3.1
Expand Down Expand Up @@ -1043,4 +1107,9 @@ describe('session', () => {
});
}

function numberOfAcquiredConnectionsFromPool() {
const pool = driver._pool;
return pool.activeResourceCount('localhost');
}

});

0 comments on commit 184f480

Please sign in to comment.