Skip to content

Commit

Permalink
Merge pull request #289 from ali-ince/1.5-max-connection-pool-size
Browse files Browse the repository at this point in the history
implemented maxConnectionPoolSize logic
  • Loading branch information
lutovich authored Sep 22, 2017
2 parents 649110d + ae5ac46 commit d7d7476
Show file tree
Hide file tree
Showing 9 changed files with 494 additions and 175 deletions.
29 changes: 18 additions & 11 deletions src/v1/driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ class Driver {
this._createConnection.bind(this),
this._destroyConnection.bind(this),
this._validateConnection.bind(this),
config.connectionPoolSize
{
maxIdleSize: config.connectionPoolSize,
maxSize: config.maxConnectionPoolSize,
acquisitionTimeout: config.connectionAcquisitionTimeout
}
);

/**
Expand Down Expand Up @@ -231,17 +235,20 @@ class _ConnectionStreamObserver extends StreamObserver {
* @private
*/
function sanitizeConfig(config) {
const maxConnectionLifetime = config.maxConnectionLifetime;
if (maxConnectionLifetime) {
const sanitizedMaxConnectionLifetime = parseInt(maxConnectionLifetime, 10);
if (sanitizedMaxConnectionLifetime && sanitizedMaxConnectionLifetime > 0) {
config.maxConnectionLifetime = sanitizedMaxConnectionLifetime;
} else {
config.maxConnectionLifetime = null;
}
} else {
config.maxConnectionLifetime = null;
config.maxConnectionLifetime = sanitizeIntValue(config.maxConnectionLifetime);
config.maxConnectionPoolSize = sanitizeIntValue(config.maxConnectionPoolSize);
config.connectionAcquisitionTimeout = sanitizeIntValue(config.connectionAcquisitionTimeout, 60000);
}

function sanitizeIntValue(value, defaultValue=null) {
if (value) {
const sanitizedValue = parseInt(value, 10);
if (sanitizedValue && sanitizedValue > 0) {
return sanitizedValue;
}
}

return defaultValue;
}

export {Driver, READ, WRITE}
Expand Down
9 changes: 9 additions & 0 deletions src/v1/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ const USER_AGENT = "neo4j-javascript/" + VERSION;
* // Connection will be destroyed if this threshold is exceeded.
* connectionPoolSize: 50,
*
* // The maximum total number of connections allowed to be managed by the connection pool, per host.
* // This includes both in-use and idle connections. No maximum connection pool size is imposed
* // by default.
* maxConnectionPoolSize: 100,
*
* // The maximum allowed lifetime for a pooled connection in milliseconds. Pooled connections older than this
* // threshold will be closed and removed from the pool. Such discarding happens during connection acquisition
* // so that new session is never backed by an old connection. Setting this option to a low value will cause
Expand All @@ -121,6 +126,10 @@ const USER_AGENT = "neo4j-javascript/" + VERSION;
* // and negative values result in lifetime not being checked.
* maxConnectionLifetime: 30 * 60 * 1000, // 30 minutes
*
* // The maximum amount of time to wait to acquire a connection from the pool (to either create a new
* // connection or borrow an existing one.
* connectionAcquisitionTimeout: 60000, // 1 minute
*
* // Specify the maximum time in milliseconds transactions are allowed to retry via
* // {@link Session#readTransaction()} and {@link Session#writeTransaction()} functions. These functions
* // will retry the given unit of work on `ServiceUnavailable`, `SessionExpired` and transient errors with
Expand Down
21 changes: 11 additions & 10 deletions src/v1/internal/connection-providers.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ export class DirectConnectionProvider extends ConnectionProvider {
}

acquireConnection(mode) {
const connection = this._connectionPool.acquire(this._address);
const connectionPromise = Promise.resolve(connection);
const connectionPromise = this._connectionPool.acquire(this._address);
return this._withAdditionalOnErrorCallback(connectionPromise, this._driverOnErrorCallback);
}
}
Expand Down Expand Up @@ -193,19 +192,21 @@ export class LoadBalancer extends ConnectionProvider {
}

// try next router
const session = this._createSessionForRediscovery(currentRouter);
return this._rediscovery.lookupRoutingTableOnRouter(session, currentRouter);
return this._createSessionForRediscovery(currentRouter).then(session => {
return this._rediscovery.lookupRoutingTableOnRouter(session, currentRouter)
});
});
}, Promise.resolve(null));
}

_createSessionForRediscovery(routerAddress) {
const connection = this._connectionPool.acquire(routerAddress);
// initialized connection is required for routing procedure call
// server version needs to be known to decide which routing procedure to use
const initializedConnectionPromise = connection.initializationCompleted();
const connectionProvider = new SingleConnectionProvider(initializedConnectionPromise);
return new Session(READ, connectionProvider);
return this._connectionPool.acquire(routerAddress).then(connection => {
// initialized connection is required for routing procedure call
// server version needs to be known to decide which routing procedure to use
const initializedConnectionPromise = connection.initializationCompleted();
const connectionProvider = new SingleConnectionProvider(initializedConnectionPromise);
return new Session(READ, connectionProvider);
});
}

_applyRoutingTableIfPossible(newRoutingTable) {
Expand Down
108 changes: 89 additions & 19 deletions src/v1/internal/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
* limitations under the License.
*/

import { newError } from "../error";
import { promiseOrTimeout } from "./util";

class Pool {
/**
* @param create an allocation function that creates a new resource. It's given
Expand All @@ -30,12 +33,15 @@ class Pool {
* @param maxIdle the max number of resources that are allowed idle in the pool at
* any time. If this threshold is exceeded, resources will be evicted.
*/
constructor(create, destroy=(()=>true), validate=(()=>true), maxIdle=50) {
constructor(create, destroy=(()=>true), validate=(()=>true), config={}) {
this._create = create;
this._destroy = destroy;
this._validate = validate;
this._maxIdle = maxIdle;
this._maxIdleSize = config.maxIdleSize;
this._maxSize = config.maxSize;
this._acquisitionTimeout = config.acquisitionTimeout;
this._pools = {};
this._acquireRequests = {};
this._activeResourceCounts = {};
this._release = this._release.bind(this);
}
Expand All @@ -46,26 +52,35 @@ class Pool {
* @return {object} resource that is ready to use.
*/
acquire(key) {
let pool = this._pools[key];
if (!pool) {
pool = [];
this._pools[key] = pool;
const resource = this._acquire(key);

if (resource) {
resourceAcquired(key, this._activeResourceCounts);

return Promise.resolve(resource);
}
while (pool.length) {
const resource = pool.pop();

if (this._validate(resource)) {
// idle resource is valid and can be acquired
resourceAcquired(key, this._activeResourceCounts);
return resource;
} else {
this._destroy(resource);
}
// We're out of resources and will try to acquire later on when an existing resource is released.
const allRequests = this._acquireRequests;
const requests = allRequests[key];
if (!requests) {
allRequests[key] = [];
}

// there exist no idle valid resources, create a new one for acquisition
resourceAcquired(key, this._activeResourceCounts);
return this._create(key, this._release);
let request;

return promiseOrTimeout(
this._acquisitionTimeout,
new Promise(
(resolve, reject) => {
request = new PendingRequest(resolve);

allRequests[key].push(request);
}
), () => {
allRequests[key] = allRequests[key].filter(item => item !== request);
}
);
}

/**
Expand Down Expand Up @@ -106,12 +121,37 @@ class Pool {
return this._activeResourceCounts[key] || 0;
}

_acquire(key) {
let pool = this._pools[key];
if (!pool) {
pool = [];
this._pools[key] = pool;
}
while (pool.length) {
const resource = pool.pop();

if (this._validate(resource)) {
// idle resource is valid and can be acquired
return resource;
} else {
this._destroy(resource);
}
}

if (this._maxSize && this.activeResourceCount(key) >= this._maxSize) {
return null;
}

// there exist no idle valid resources, create a new one for acquisition
return this._create(key, this._release);
}

_release(key, resource) {
const pool = this._pools[key];

if (pool) {
// there exist idle connections for the given key
if (pool.length >= this._maxIdle || !this._validate(resource)) {
if (pool.length >= this._maxIdleSize || !this._validate(resource)) {
this._destroy(resource);
} else {
pool.push(resource);
Expand All @@ -121,6 +161,23 @@ class Pool {
this._destroy(resource);
}

// check if there are any pending requests
const requests = this._acquireRequests[key];
if (requests) {
var pending = requests.shift();

if (pending) {
var resource = this._acquire(key);
if (resource) {
pending.resolve(resource);

return;
}
} else {
delete this._acquireRequests[key];
}
}

resourceReleased(key, this._activeResourceCounts);
}
}
Expand All @@ -143,11 +200,24 @@ function resourceAcquired(key, activeResourceCounts) {
function resourceReleased(key, activeResourceCounts) {
const currentCount = activeResourceCounts[key] || 0;
const nextCount = currentCount - 1;

if (nextCount > 0) {
activeResourceCounts[key] = nextCount;
} else {
delete activeResourceCounts[key];
}
}

class PendingRequest {

constructor(resolve) {
this._resolve = resolve;
}

resolve(resource) {
this._resolve(resource);
}

}

export default Pool
19 changes: 19 additions & 0 deletions src/v1/internal/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
* limitations under the License.
*/

import { newError } from "../error";

const ENCRYPTION_ON = "ENCRYPTION_ON";
const ENCRYPTION_OFF = "ENCRYPTION_OFF";

Expand Down Expand Up @@ -122,6 +124,22 @@ function trimAndVerify(string, name, url) {
return result;
}

function promiseOrTimeout(timeout, otherPromise, onTimeout) {
const timeoutPromise = new Promise((resolve, reject) => {
const id = setTimeout(() => {
if (onTimeout && typeof onTimeout === 'function') {
onTimeout();
}

reject(newError(`Operation timed out in ${timeout} ms.`));
}, timeout);

otherPromise.then(() => clearTimeout(id), () => clearTimeout(id));
});

return Promise.race([ otherPromise, timeoutPromise ]);
}

export {
isEmptyObjectOrNull,
isString,
Expand All @@ -132,6 +150,7 @@ export {
parseHost,
parsePort,
parseRoutingContext,
promiseOrTimeout,
ENCRYPTION_ON,
ENCRYPTION_OFF
}
Loading

0 comments on commit d7d7476

Please sign in to comment.