From 99205ec1e6dbaecb5e4b4494341d64c0a7b479eb Mon Sep 17 00:00:00 2001 From: Subhi Al Hasan Date: Thu, 14 Jul 2022 13:32:04 +0300 Subject: [PATCH] feat: use weighted round robin in balancedPool (#1069) * fixes * more fixes * add test * remove console.log * rename startingWeightPerServer to maxWeightPerServer * add another test --- lib/balanced-pool.js | 85 +++++++++++- test/balanced-pool.js | 316 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 390 insertions(+), 11 deletions(-) diff --git a/lib/balanced-pool.js b/lib/balanced-pool.js index bb5788a8c1a..47468ec0460 100644 --- a/lib/balanced-pool.js +++ b/lib/balanced-pool.js @@ -18,6 +18,17 @@ const { parseOrigin } = require('./core/util') const kFactory = Symbol('factory') const kOptions = Symbol('options') +const kGreatestCommonDivisor = Symbol('kGreatestCommonDivisor') +const kCurrentWeight = Symbol('kCurrentWeight') +const kIndex = Symbol('kIndex') +const kWeight = Symbol('kWeight') +const kMaxWeightPerServer = Symbol('kMaxWeightPerServer') +const kErrorPenalty = Symbol('kErrorPenalty') + +function getGreatestCommonDivisor (a, b) { + if (b === 0) return a + return getGreatestCommonDivisor(b, a % b) +} function defaultFactory (origin, opts) { return new Pool(origin, opts) @@ -28,6 +39,11 @@ class BalancedPool extends PoolBase { super() this[kOptions] = opts + this[kIndex] = -1 + this[kCurrentWeight] = 0 + + this[kMaxWeightPerServer] = this[kOptions].maxWeightPerServer || 100 + this[kErrorPenalty] = this[kOptions].errorPenalty || 15 if (!Array.isArray(upstreams)) { upstreams = [upstreams] @@ -42,6 +58,7 @@ class BalancedPool extends PoolBase { for (const upstream of upstreams) { this.addUpstream(upstream) } + this._updateBalancedPoolStats() } addUpstream (upstream) { @@ -54,12 +71,40 @@ class BalancedPool extends PoolBase { ))) { return this } + const pool = this[kFactory](upstreamOrigin, Object.assign({}, this[kOptions])) + + this[kAddClient](pool) + pool.on('connect', () => { + pool[kWeight] = Math.min(this[kMaxWeightPerServer], pool[kWeight] + this[kErrorPenalty]) + }) + + pool.on('connectionError', () => { + pool[kWeight] = Math.max(1, pool[kWeight] - this[kErrorPenalty]) + this._updateBalancedPoolStats() + }) + + pool.on('disconnect', (...args) => { + const err = args[2] + if (err && err.code === 'UND_ERR_SOCKET') { + // decrease the weight of the pool. + pool[kWeight] = Math.max(1, pool[kWeight] - this[kErrorPenalty]) + this._updateBalancedPoolStats() + } + }) + + for (const client of this[kClients]) { + client[kWeight] = this[kMaxWeightPerServer] + } - this[kAddClient](this[kFactory](upstreamOrigin, Object.assign({}, this[kOptions]))) + this._updateBalancedPoolStats() return this } + _updateBalancedPoolStats () { + this[kGreatestCommonDivisor] = this[kClients].map(p => p[kWeight]).reduce(getGreatestCommonDivisor, 0) + } + removeUpstream (upstream) { const upstreamOrigin = parseOrigin(upstream).origin @@ -100,10 +145,42 @@ class BalancedPool extends PoolBase { return } - this[kClients].splice(this[kClients].indexOf(dispatcher), 1) - this[kClients].push(dispatcher) + const allClientsBusy = this[kClients].map(pool => pool[kNeedDrain]).reduce((a, b) => a && b, true) + + if (allClientsBusy) { + return + } + + let counter = 0 + + let maxWeightIndex = this[kClients].findIndex(pool => !pool[kNeedDrain]) + + while (counter++ < this[kClients].length) { + this[kIndex] = (this[kIndex] + 1) % this[kClients].length + const pool = this[kClients][this[kIndex]] + + // find pool index with the largest weight + if (pool[kWeight] > this[kClients][maxWeightIndex][kWeight] && !pool[kNeedDrain]) { + maxWeightIndex = this[kIndex] + } + + // decrease the current weight every `this[kClients].length`. + if (this[kIndex] === 0) { + // Set the current weight to the next lower weight. + this[kCurrentWeight] = this[kCurrentWeight] - this[kGreatestCommonDivisor] + + if (this[kCurrentWeight] <= 0) { + this[kCurrentWeight] = this[kMaxWeightPerServer] + } + } + if (pool[kWeight] >= this[kCurrentWeight] && (!pool[kNeedDrain])) { + return pool + } + } - return dispatcher + this[kCurrentWeight] = this[kClients][maxWeightIndex][kWeight] + this[kIndex] = maxWeightIndex + return this[kClients][maxWeightIndex] } } diff --git a/test/balanced-pool.js b/test/balanced-pool.js index b677a68af1b..7e28da7c72d 100644 --- a/test/balanced-pool.js +++ b/test/balanced-pool.js @@ -1,14 +1,9 @@ 'use strict' +const { test } = require('tap') +const { BalancedPool, Pool, Client, errors } = require('..') const { createServer } = require('http') const { promisify } = require('util') -const { test } = require('tap') -const { - BalancedPool, - Client, - errors, - Pool -} = require('..') test('throws when factory is not a function', (t) => { t.plan(2) @@ -250,3 +245,310 @@ test('throws when upstream is missing', async (t) => { t.equal(e.message, 'No upstream has been added to the BalancedPool') } }) + +class TestServer { + constructor ({ config: { server, socketHangup, downOnRequests, socketHangupOnRequests }, onRequest }) { + this.config = { + downOnRequests: downOnRequests || [], + socketHangupOnRequests: socketHangupOnRequests || [], + socketHangup + } + this.name = server + // start a server listening to any port available on the host + this.port = 0 + this.iteration = 0 + this.requestsCount = 0 + this.onRequest = onRequest + this.server = null + } + + _shouldHangupOnClient () { + if (this.config.socketHangup) { + return true + } + if (this.config.socketHangupOnRequests.includes(this.requestsCount)) { + return true + } + + return false + } + + _shouldStopServer () { + if (this.config.upstreamDown === true || this.config.downOnRequests.includes(this.requestsCount)) { + return true + } + return false + } + + async prepareForIteration (iteration) { + // set current iteration + this.iteration = iteration + + if (this._shouldStopServer()) { + await this.stop() + } else if (!this.isRunning()) { + await this.start() + } + } + + start () { + this.server = createServer((req, res) => { + if (this._shouldHangupOnClient()) { + req.destroy(new Error('(ツ)')) + return + } + this.requestsCount++ + res.end('server is running!') + + this.onRequest(this) + }).listen(this.port) + + this.server.keepAliveTimeout = 2000 + + return new Promise((resolve) => { + this.server.on('listening', () => { + // store the used port to use it again if the server was stopped as part of test and then started again + this.port = this.server.address().port + + return resolve() + }) + }) + } + + isRunning () { + return !!this.server.address() + } + + stop () { + if (!this.isRunning()) { + return + } + + return new Promise(resolve => { + this.server.close(() => resolve()) + }) + } +} + +const cases = [ + + // 0 + + { + iterations: 100, + maxWeightPerServer: 100, + errorPenalty: 7, + config: [{ server: 'A' }, { server: 'B' }, { server: 'C' }], + expected: ['A', 'B', 'C', 'A', 'B', 'C', 'A', 'B', 'C', 'A', 'B', 'C'], + expectedConnectionRefusedErrors: 0, + expectedSocketErrors: 0, + expectedRatios: [0.34, 0.33, 0.33] + }, + + // 1 + + { + iterations: 100, + maxWeightPerServer: 100, + errorPenalty: 15, + config: [{ server: 'A', downOnRequests: [0] }, { server: 'B' }, { server: 'C' }], + expected: ['A/connectionRefused', 'B', 'C', 'B', 'C', 'B', 'C', 'A', 'B', 'C', 'A'], + expectedConnectionRefusedErrors: 1, + expectedSocketErrors: 0, + expectedRatios: [0.32, 0.34, 0.34] + }, + + // 2 + + { + iterations: 100, + maxWeightPerServer: 100, + errorPenalty: 15, + config: [{ server: 'A' }, { server: 'B', downOnRequests: [0] }, { server: 'C' }], + expected: ['A', 'B/connectionRefused', 'C', 'A', 'C', 'A', 'C', 'A', 'B', 'C'], + expectedConnectionRefusedErrors: 1, + expectedSocketErrors: 0, + expectedRatios: [0.34, 0.32, 0.34] + }, + + // 3 + + { + iterations: 100, + maxWeightPerServer: 100, + errorPenalty: 15, + config: [{ server: 'A' }, { server: 'B', downOnRequests: [0] }, { server: 'C', downOnRequests: [0] }], + expected: ['A', 'B/connectionRefused', 'C/connectionRefused', 'A', 'A', 'A', 'B', 'C'], + expectedConnectionRefusedErrors: 2, + expectedSocketErrors: 0, + expectedRatios: [0.35, 0.33, 0.32] + }, + + // 4 + + { + iterations: 100, + maxWeightPerServer: 100, + errorPenalty: 15, + config: [{ server: 'A', downOnRequests: [0] }, { server: 'B', downOnRequests: [0] }, { server: 'C', downOnRequests: [0] }], + expected: ['A/connectionRefused', 'B/connectionRefused', 'C/connectionRefused', 'A', 'B', 'C', 'A', 'B', 'C'], + expectedConnectionRefusedErrors: 3, + expectedSocketErrors: 0, + expectedRatios: [0.34, 0.33, 0.33] + }, + + // 5 + + { + iterations: 100, + maxWeightPerServer: 100, + errorPenalty: 15, + config: [{ server: 'A', downOnRequests: [0, 1, 2] }, { server: 'B', downOnRequests: [0, 1, 2] }, { server: 'C', downOnRequests: [0, 1, 2] }], + expected: ['A/connectionRefused', 'B/connectionRefused', 'C/connectionRefused', 'A/connectionRefused', 'B/connectionRefused', 'C/connectionRefused', 'A/connectionRefused', 'B/connectionRefused', 'C/connectionRefused', 'A', 'B', 'C', 'A', 'B', 'C'], + expectedConnectionRefusedErrors: 9, + expectedSocketErrors: 0, + expectedRatios: [0.34, 0.33, 0.33] + }, + + // 6 + + { + iterations: 100, + maxWeightPerServer: 100, + errorPenalty: 15, + config: [{ server: 'A', downOnRequests: [0] }, { server: 'B', downOnRequests: [0, 1] }, { server: 'C', downOnRequests: [0] }], + expected: ['A/connectionRefused', 'B/connectionRefused', 'C/connectionRefused', 'A', 'B/connectionRefused', 'C', 'A', 'B', 'C', 'A', 'B', 'C', 'A', 'C', 'A', 'C', 'A', 'C', 'A', 'B'], + expectedConnectionRefusedErrors: 4, + expectedSocketErrors: 0, + expectedRatios: [0.36, 0.29, 0.35] + }, + + // 7 + + { + iterations: 100, + maxWeightPerServer: 100, + errorPenalty: 15, + config: [{ server: 'A' }, { server: 'B' }, { server: 'C', downOnRequests: [1] }], + expected: ['A', 'B', 'C', 'A', 'B', 'C/connectionRefused', 'A', 'B', 'A', 'B', 'A', 'B', 'C', 'A', 'B', 'C'], + expectedConnectionRefusedErrors: 1, + expectedSocketErrors: 0, + expectedRatios: [0.34, 0.34, 0.32] + }, + + // 8 + + { + iterations: 100, + maxWeightPerServer: 100, + errorPenalty: 15, + config: [{ server: 'A', socketHangupOnRequests: [1] }, { server: 'B' }, { server: 'C' }], + expected: ['A', 'B', 'C', 'A/socketError', 'B', 'C', 'B', 'C', 'B', 'C', 'A'], + expectedConnectionRefusedErrors: 0, + expectedSocketErrors: 1, + expectedRatios: [0.32, 0.34, 0.34] + }, + + // 9 + + { + iterations: 100, + maxWeightPerServer: 100, + errorPenalty: 7, + config: [{ server: 'A' }, { server: 'B' }, { server: 'C' }, { server: 'D' }, { server: 'E' }], + expected: ['A', 'B', 'C', 'D', 'E', 'A', 'B', 'C', 'D', 'E'], + expectedConnectionRefusedErrors: 0, + expectedSocketErrors: 0, + expectedRatios: [0.2, 0.2, 0.2, 0.2, 0.2] + }, + + // 10 + { + iterations: 100, + maxWeightPerServer: 100, + errorPenalty: 15, + config: [{ server: 'A', downOnRequests: [0, 1, 2, 3] }, { server: 'B' }, { server: 'C' }], + expected: ['A/connectionRefused', 'B', 'C', 'B', 'C', 'B', 'C', 'A/connectionRefused', 'B', 'C', 'B', 'C', 'A/connectionRefused', 'B', 'C', 'B', 'C', 'A/connectionRefused', 'B', 'C', 'A', 'B', 'C', 'A', 'B', 'C'], + expectedConnectionRefusedErrors: 4, + expectedSocketErrors: 0, + expectedRatios: [0.18, 0.41, 0.41] + } + +] + +for (const [index, { config, expected, expectedRatios, iterations = 9, expectedConnectionRefusedErrors = 0, expectedSocketErrors = 0, maxWeightPerServer, errorPenalty = 10 }] of cases.entries()) { + test(`weighted round robin - case ${index}`, async (t) => { + // cerate an array to store succesfull reqeusts + const requestLog = [] + + // create instances of the test servers according to the config + const servers = config.map((serverConfig) => new TestServer({ + config: serverConfig, + onRequest: (server) => { + requestLog.push(server.name) + } + })) + t.teardown(() => servers.map(server => server.stop())) + + // start all servers to get a port so that we can build the upstream urls to supply them to undici + await Promise.all(servers.map(server => server.start())) + + // build upstream urls + const urls = servers.map(server => `http://localhost:${server.port}`) + + // add upstreams + const client = new BalancedPool(urls[0], { maxWeightPerServer, errorPenalty }) + urls.slice(1).map(url => client.addUpstream(url)) + + let connectionRefusedErrors = 0 + let socketErrors = 0 + for (let i = 0; i < iterations; i++) { + // setup test servers for the next iteration + + await Promise.all(servers.map(server => server.prepareForIteration(i))) + + // send a request using undinci + try { + await client.request({ path: '/', method: 'GET' }) + } catch (e) { + const serverWithError = servers.find(server => server.port === e.port) || servers.find(server => server.port === e.socket.remotePort) + + serverWithError.requestsCount++ + + if (e.code === 'ECONNREFUSED') { + requestLog.push(`${serverWithError.name}/connectionRefused`) + connectionRefusedErrors++ + } + if (e.code === 'UND_ERR_SOCKET') { + requestLog.push(`${serverWithError.name}/socketError`) + + socketErrors++ + } + } + } + const totalRequests = servers.reduce((acc, server) => { + return acc + server.requestsCount + }, 0) + + t.equal(totalRequests, iterations) + + t.equal(connectionRefusedErrors, expectedConnectionRefusedErrors) + t.equal(socketErrors, expectedSocketErrors) + + if (expectedRatios) { + const ratios = servers.reduce((acc, el) => { + acc[el.name] = 0 + return acc + }, {}) + requestLog.map(el => ratios[el[0]]++) + + t.match(Object.keys(ratios).map(k => ratios[k] / iterations), expectedRatios) + } + + if (expected) { + t.match(requestLog.slice(0, expected.length), expected) + } + + await client.close() + }) +}