Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Least connected load balancing strategy #267

Merged
merged 4 commits into from
Jul 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/v1/driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ class Driver {
* @constructor
* @param {string} url
* @param {string} userAgent
* @param {Object} token
* @param {Object} config
* @access private
* @param {object} token
* @param {object} config
* @protected
*/
constructor(url, userAgent, token = {}, config = {}) {
this._url = url;
Expand Down
6 changes: 6 additions & 0 deletions src/v1/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ const USER_AGENT = "neo4j-javascript/" + VERSION;
* // will retry the given unit of work on `ServiceUnavailable`, `SessionExpired` and transient errors with
* // exponential backoff using initial delay of 1 second. Default value is 30000 which is 30 seconds.
* maxTransactionRetryTime: 30000,
*
* // Provide an alternative load balancing strategy for the routing driver to use.
* // Driver uses "least_connected" by default.
* // <b>Note:</b> We are experimenting with different strategies. This could be removed in the next minor
* // version.
* loadBalancingStrategy: "least_connected" | "round_robin",
* }
*
* @param {string} url The URL for the Neo4j database, for instance "bolt://localhost"
Expand Down
5 changes: 2 additions & 3 deletions src/v1/internal/connection-providers.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import Rediscovery from './rediscovery';
import hasFeature from './features';
import {DnsHostNameResolver, DummyHostNameResolver} from './host-name-resolvers';
import RoutingUtil from './routing-util';
import RoundRobinLoadBalancingStrategy from './round-robin-load-balancing-strategy';

class ConnectionProvider {

Expand Down Expand Up @@ -62,15 +61,15 @@ export class DirectConnectionProvider extends ConnectionProvider {

export class LoadBalancer extends ConnectionProvider {

constructor(address, routingContext, connectionPool, driverOnErrorCallback) {
constructor(address, routingContext, connectionPool, loadBalancingStrategy, driverOnErrorCallback) {
super();
this._seedRouter = address;
this._routingTable = new RoutingTable([this._seedRouter]);
this._rediscovery = new Rediscovery(new RoutingUtil(routingContext));
this._connectionPool = connectionPool;
this._driverOnErrorCallback = driverOnErrorCallback;
this._hostNameResolver = LoadBalancer._createHostNameResolver();
this._loadBalancingStrategy = new RoundRobinLoadBalancingStrategy();
this._loadBalancingStrategy = loadBalancingStrategy;
this._useSeedRouter = false;
}

Expand Down
85 changes: 85 additions & 0 deletions src/v1/internal/least-connected-load-balancing-strategy.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Copyright (c) 2002-2017 "Neo Technology,","
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import RoundRobinArrayIndex from './round-robin-array-index';
import LoadBalancingStrategy from './load-balancing-strategy';

export const LEAST_CONNECTED_STRATEGY_NAME = 'least_connected';

export default class LeastConnectedLoadBalancingStrategy extends LoadBalancingStrategy {

/**
* @constructor
* @param {Pool} connectionPool the connection pool of this driver.
*/
constructor(connectionPool) {
super();
this._readersIndex = new RoundRobinArrayIndex();
this._writersIndex = new RoundRobinArrayIndex();
this._connectionPool = connectionPool;
}

/**
* @inheritDoc
*/
selectReader(knownReaders) {
return this._select(knownReaders, this._readersIndex);
}

/**
* @inheritDoc
*/
selectWriter(knownWriters) {
return this._select(knownWriters, this._writersIndex);
}

_select(addresses, roundRobinIndex) {
const length = addresses.length;
if (length === 0) {
return null;
}

// choose start index for iteration in round-rodin fashion
const startIndex = roundRobinIndex.next(length);
let index = startIndex;

let leastConnectedAddress = null;
let leastActiveConnections = Number.MAX_SAFE_INTEGER;

// iterate over the array to find least connected address
do {
const address = addresses[index];
const activeConnections = this._connectionPool.activeResourceCount(address);

if (activeConnections < leastActiveConnections) {
leastConnectedAddress = address;
leastActiveConnections = activeConnections;
}

// loop over to the start of the array when end is reached
if (index === length - 1) {
index = 0;
} else {
index++;
}
}
while (index !== startIndex);

return leastConnectedAddress;
}
}
1 change: 0 additions & 1 deletion src/v1/internal/load-balancing-strategy.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
* limitations under the License.
*/


/**
* A facility to select most appropriate reader or writer among the given addresses for request processing.
*/
Expand Down
84 changes: 71 additions & 13 deletions src/v1/internal/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,59 +36,117 @@ class Pool {
this._validate = validate;
this._maxIdle = maxIdle;
this._pools = {};
this._activeResourceCounts = {};
this._release = this._release.bind(this);
}

/**
* Acquire and idle resource fom the pool or create a new one.
* @param {string} key the resource key.
* @return {object} resource that is ready to use.
*/
acquire(key) {
let resource;
let pool = this._pools[key];
if (!pool) {
pool = [];
this._pools[key] = pool;
}
while (pool.length) {
resource = pool.pop();
const resource = pool.pop();

if (this._validate(resource)) {
// idle resource is valid and can be acquired
resourceAcquired(key, this._activeResourceCounts);
return resource;
} else {
this._destroy(resource);
}
}

// there exist no idle valid resources, create a new one for acquisition
resourceAcquired(key, this._activeResourceCounts);
return this._create(key, this._release);
}

/**
* Destroy all idle resources for the given key.
* @param {string} key the resource key to purge.
*/
purge(key) {
let resource;
let pool = this._pools[key] || [];
const pool = this._pools[key] || [];
while (pool.length) {
resource = pool.pop();
const resource = pool.pop();
this._destroy(resource)
}
delete this._pools[key]
}

/**
* Destroy all idle resources in this pool.
*/
purgeAll() {
Object.keys(this._pools).forEach(key => this.purge(key));
}

/**
* Check if this pool contains resources for the given key.
* @param {string} key the resource key to check.
* @return {boolean} <code>true</code> when pool contains entries for the given key, <code>false</code> otherwise.
*/
has(key) {
return (key in this._pools);
}

/**
* Get count of active (checked out of the pool) resources for the given key.
* @param {string} key the resource key to check.
* @return {number} count of resources acquired by clients.
*/
activeResourceCount(key) {
return this._activeResourceCounts[key] || 0;
}

_release(key, resource) {
let pool = this._pools[key];
if (!pool) {
const pool = this._pools[key];

if (pool) {
// there exist idle connections for the given key
if (pool.length >= this._maxIdle || !this._validate(resource)) {
this._destroy(resource);
} else {
pool.push(resource);
}
} else {
// key has been purged, don't put it back, just destroy the resource
this._destroy(resource);
return;
}
if( pool.length >= this._maxIdle || !this._validate(resource) ) {
this._destroy(resource);
} else {
pool.push(resource);
}

resourceReleased(key, this._activeResourceCounts);
}
}

/**
* Increment active (checked out of the pool) resource counter.
* @param {string} key the resource group identifier (server address for connections).
* @param {Object.<string, number>} activeResourceCounts the object holding active counts per key.
*/
function resourceAcquired(key, activeResourceCounts) {
const currentCount = activeResourceCounts[key] || 0;
activeResourceCounts[key] = currentCount + 1;
}

/**
* Decrement active (checked out of the pool) resource counter.
* @param {string} key the resource group identifier (server address for connections).
* @param {Object.<string, number>} activeResourceCounts the object holding active counts per key.
*/
function resourceReleased(key, activeResourceCounts) {
const currentCount = activeResourceCounts[key] || 0;
const nextCount = currentCount - 1;
if (nextCount > 0) {
activeResourceCounts[key] = nextCount;
} else {
delete activeResourceCounts[key];
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/v1/internal/round-robin-load-balancing-strategy.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import RoundRobinArrayIndex from './round-robin-array-index';
import LoadBalancingStrategy from './load-balancing-strategy';

export const ROUND_ROBIN_STRATEGY_NAME = 'round_robin';

export default class RoundRobinLoadBalancingStrategy extends LoadBalancingStrategy {

constructor() {
Expand Down
22 changes: 21 additions & 1 deletion src/v1/routing-driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import Session from './session';
import {Driver} from './driver';
import {newError, SESSION_EXPIRED} from './error';
import {LoadBalancer} from './internal/connection-providers';
import LeastConnectedLoadBalancingStrategy, {LEAST_CONNECTED_STRATEGY_NAME} from './internal/least-connected-load-balancing-strategy';
import RoundRobinLoadBalancingStrategy, {ROUND_ROBIN_STRATEGY_NAME} from './internal/round-robin-load-balancing-strategy';

/**
* A driver that supports routing in a core-edge cluster.
Expand All @@ -34,7 +36,8 @@ class RoutingDriver extends Driver {
}

_createConnectionProvider(address, connectionPool, driverOnErrorCallback) {
return new LoadBalancer(address, this._routingContext, connectionPool, driverOnErrorCallback);
const loadBalancingStrategy = RoutingDriver._createLoadBalancingStrategy(this._config, connectionPool);
return new LoadBalancer(address, this._routingContext, connectionPool, loadBalancingStrategy, driverOnErrorCallback);
}

_createSession(mode, connectionProvider, bookmark, config) {
Expand Down Expand Up @@ -80,6 +83,23 @@ class RoutingDriver extends Driver {
return error.code === 'Neo.ClientError.Cluster.NotALeader' ||
error.code === 'Neo.ClientError.General.ForbiddenOnReadOnlyDatabase';
}

/**
* Create new load balancing strategy based on the config.
* @param {object} config the user provided config.
* @param {Pool} connectionPool the connection pool for this driver.
* @return {LoadBalancingStrategy} new strategy.
*/
static _createLoadBalancingStrategy(config, connectionPool) {
const configuredValue = config.loadBalancingStrategy;
if (!configuredValue || configuredValue === LEAST_CONNECTED_STRATEGY_NAME) {
return new LeastConnectedLoadBalancingStrategy(connectionPool);
} else if (configuredValue === ROUND_ROBIN_STRATEGY_NAME) {
return new RoundRobinLoadBalancingStrategy();
} else {
throw newError('Unknown load balancing strategy: ' + configuredValue);
}
}
}

class RoutingSession extends Session {
Expand Down
9 changes: 7 additions & 2 deletions test/internal/connection-providers.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {SERVICE_UNAVAILABLE, SESSION_EXPIRED} from '../../src/v1/error';
import RoutingTable from '../../src/v1/internal/routing-table';
import {DirectConnectionProvider, LoadBalancer} from '../../src/v1/internal/connection-providers';
import Pool from '../../src/v1/internal/pool';
import LeastConnectedLoadBalancingStrategy from '../../src/v1/internal/least-connected-load-balancing-strategy';

const NO_OP_DRIVER_CALLBACK = () => {
};
Expand Down Expand Up @@ -134,7 +135,9 @@ describe('LoadBalancer', () => {
});

it('initializes routing table with the given router', () => {
const loadBalancer = new LoadBalancer('server-ABC', {}, newPool(), NO_OP_DRIVER_CALLBACK);
const connectionPool = newPool();
const loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy(connectionPool);
const loadBalancer = new LoadBalancer('server-ABC', {}, connectionPool, loadBalancingStrategy, NO_OP_DRIVER_CALLBACK);

expectRoutingTable(loadBalancer,
['server-ABC'],
Expand Down Expand Up @@ -1068,7 +1071,9 @@ function newLoadBalancerWithSeedRouter(seedRouter, seedRouterResolved,
expirationTime = Integer.MAX_VALUE,
routerToRoutingTable = {},
connectionPool = null) {
const loadBalancer = new LoadBalancer(seedRouter, {}, connectionPool || newPool(), NO_OP_DRIVER_CALLBACK);
const pool = connectionPool || newPool();
const loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy(pool);
const loadBalancer = new LoadBalancer(seedRouter, {}, pool, loadBalancingStrategy, NO_OP_DRIVER_CALLBACK);
loadBalancer._routingTable = new RoutingTable(routers, readers, writers, expirationTime);
loadBalancer._rediscovery = new FakeRediscovery(routerToRoutingTable);
loadBalancer._hostNameResolver = new FakeDnsResolver(seedRouterResolved);
Expand Down
Loading