From 7ecf57584c97b9aa879445113bb1566cf9b3a645 Mon Sep 17 00:00:00 2001 From: Oleg Kalnichevski Date: Sat, 30 Nov 2024 16:26:06 +0100 Subject: [PATCH] HTTPCORE-773: introduced customizable strategy to select i/o dispatch worker for new i/o channels --- .../impl/nio/bootstrap/H2AsyncRequester.java | 25 +----- .../bootstrap/H2MultiplexingRequester.java | 7 +- .../H2MultiplexingRequesterBootstrap.java | 3 +- .../nio/bootstrap/H2RequesterBootstrap.java | 3 +- .../impl/nio/bootstrap/H2ServerBootstrap.java | 2 +- .../hc/core5/testing/nio/AsyncRequester.java | 3 +- .../hc/core5/testing/nio/AsyncServer.java | 3 +- .../http/impl/bootstrap/AsyncRequester.java | 10 ++- .../bootstrap/AsyncRequesterBootstrap.java | 3 +- .../http/impl/bootstrap/AsyncServer.java | 9 +- .../impl/bootstrap/AsyncServerBootstrap.java | 2 +- .../impl/bootstrap/HttpAsyncRequester.java | 23 ++--- .../http/impl/bootstrap/HttpAsyncServer.java | 20 +---- .../core5/reactor/AbstractIOReactorBase.java | 8 +- .../reactor/DefaultConnectingIOReactor.java | 19 ++-- .../reactor/DefaultListeningIOReactor.java | 68 +++++++------- .../hc/core5/reactor/IOWorkerSelector.java} | 18 ++-- .../hc/core5/reactor/IOWorkerSelectors.java | 75 ++++++++++++++++ .../hc/core5/reactor/IOWorkerStats.java | 50 +++++++++++ .../apache/hc/core5/reactor/IOWorkers.java | 89 ------------------- .../hc/core5/reactor/SingleCoreIOReactor.java | 19 +++- .../core5/reactor/IOWorkerSelectorsTest.java | 51 +++++++++++ 22 files changed, 293 insertions(+), 217 deletions(-) rename httpcore5/src/{test/java/org/apache/hc/core5/reactor/IOWorkersTest.java => main/java/org/apache/hc/core5/reactor/IOWorkerSelector.java} (69%) create mode 100644 httpcore5/src/main/java/org/apache/hc/core5/reactor/IOWorkerSelectors.java create mode 100644 httpcore5/src/main/java/org/apache/hc/core5/reactor/IOWorkerStats.java delete mode 100644 httpcore5/src/main/java/org/apache/hc/core5/reactor/IOWorkers.java create mode 100644 httpcore5/src/test/java/org/apache/hc/core5/reactor/IOWorkerSelectorsTest.java diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2AsyncRequester.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2AsyncRequester.java index 9fe5850283..0237a24cda 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2AsyncRequester.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2AsyncRequester.java @@ -47,6 +47,7 @@ import org.apache.hc.core5.reactor.IOReactorMetricsListener; import org.apache.hc.core5.reactor.IOSession; import org.apache.hc.core5.reactor.IOSessionListener; +import org.apache.hc.core5.reactor.IOWorkerSelector; import org.apache.hc.core5.reactor.ProtocolIOSession; import org.apache.hc.core5.reactor.ssl.TlsDetails; import org.apache.hc.core5.util.Timeout; @@ -65,25 +66,6 @@ public class H2AsyncRequester extends HttpAsyncRequester { * Use {@link H2RequesterBootstrap} to create instances of this class. */ @Internal - public H2AsyncRequester( - final HttpVersionPolicy versionPolicy, - final IOReactorConfig ioReactorConfig, - final IOEventHandlerFactory eventHandlerFactory, - final Decorator ioSessionDecorator, - final Callback exceptionCallback, - final IOSessionListener sessionListener, - final ManagedConnPool connPool, - final IOReactorMetricsListener threadPoolListener) { - super(ioReactorConfig, eventHandlerFactory, ioSessionDecorator, exceptionCallback, sessionListener, connPool, threadPoolListener); - this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE; - } - - /** - * Use {@link H2RequesterBootstrap} to create instances of this class. - * - * @since 5.2 - */ - @Internal public H2AsyncRequester( final HttpVersionPolicy versionPolicy, final IOReactorConfig ioReactorConfig, @@ -94,9 +76,10 @@ public H2AsyncRequester( final ManagedConnPool connPool, final TlsStrategy tlsStrategy, final Timeout handshakeTimeout, - final IOReactorMetricsListener threadPoolListener) { + final IOReactorMetricsListener threadPoolListener, + final IOWorkerSelector workerSelector) { super(ioReactorConfig, eventHandlerFactory, ioSessionDecorator, exceptionCallback, sessionListener, connPool, - tlsStrategy, handshakeTimeout, threadPoolListener); + tlsStrategy, handshakeTimeout, threadPoolListener, workerSelector); this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE; } diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java index bac51ad67d..88e2fb6d34 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java @@ -73,6 +73,7 @@ import org.apache.hc.core5.reactor.IOReactorMetricsListener; import org.apache.hc.core5.reactor.IOSession; import org.apache.hc.core5.reactor.IOSessionListener; +import org.apache.hc.core5.reactor.IOWorkerSelector; import org.apache.hc.core5.util.Args; import org.apache.hc.core5.util.TimeValue; import org.apache.hc.core5.util.Timeout; @@ -98,9 +99,11 @@ public H2MultiplexingRequester( final IOSessionListener sessionListener, final Resolver addressResolver, final TlsStrategy tlsStrategy, - final IOReactorMetricsListener threadPoolListener) { + final IOReactorMetricsListener threadPoolListener, + final IOWorkerSelector workerSelector) { super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener, - ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE, threadPoolListener); + ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE, + threadPoolListener, workerSelector); this.connPool = new H2ConnPool(this, addressResolver, tlsStrategy); } diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java index c1f40eb4ab..5779de8cea 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java @@ -258,7 +258,8 @@ public H2MultiplexingRequester create() { sessionListener, DefaultAddressResolver.INSTANCE, tlsStrategy != null ? tlsStrategy : new H2ClientTlsStrategy(), - threadPoolListener); + threadPoolListener, + null); } } diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java index d0b0bd496f..5a4c9609ed 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java @@ -408,7 +408,8 @@ public H2AsyncRequester create() { connPool, actualTlsStrategy, handshakeTimeout, - threadPoolListener); + threadPoolListener, + null); } } diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2ServerBootstrap.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2ServerBootstrap.java index b54c16564c..66d3f015ff 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2ServerBootstrap.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2ServerBootstrap.java @@ -535,7 +535,7 @@ public HttpAsyncServer create() { handshakeTimeout); return new HttpAsyncServer(ioEventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, - sessionListener, threadPoolListener, actualCanonicalHostName); + sessionListener, threadPoolListener, null, actualCanonicalHostName); } } diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncRequester.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncRequester.java index 7d06a08998..25e65e0f95 100644 --- a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncRequester.java +++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncRequester.java @@ -66,7 +66,8 @@ DefaultConnectingIOReactor createIOReactor( LoggingExceptionCallback.INSTANCE, LoggingIOSessionListener.INSTANCE, LoggingReactorMetricsListener.INSTANCE, - sessionShutdownCallback); + sessionShutdownCallback, + null); } private InetSocketAddress toSocketAddress(final HttpHost host) { diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncServer.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncServer.java index 3149de2ebd..b334827041 100644 --- a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncServer.java +++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncServer.java @@ -61,7 +61,8 @@ DefaultListeningIOReactor createIOReactor( LoggingExceptionCallback.INSTANCE, LoggingIOSessionListener.INSTANCE, LoggingReactorMetricsListener.INSTANCE, - sessionShutdownCallback); + sessionShutdownCallback, + null); } public Future listen(final InetSocketAddress address) { diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java index 399ca923cf..9bfbdd8ba4 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java @@ -47,11 +47,12 @@ import org.apache.hc.core5.reactor.DefaultConnectingIOReactor; import org.apache.hc.core5.reactor.IOEventHandlerFactory; import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.IOReactorMetricsListener; import org.apache.hc.core5.reactor.IOReactorService; import org.apache.hc.core5.reactor.IOReactorStatus; -import org.apache.hc.core5.reactor.IOReactorMetricsListener; import org.apache.hc.core5.reactor.IOSession; import org.apache.hc.core5.reactor.IOSessionListener; +import org.apache.hc.core5.reactor.IOWorkerSelector; import org.apache.hc.core5.util.Args; import org.apache.hc.core5.util.TimeValue; import org.apache.hc.core5.util.Timeout; @@ -75,8 +76,8 @@ public AsyncRequester( final IOSessionListener sessionListener, final Callback sessionShutdownCallback, final Resolver addressResolver, - final IOReactorMetricsListener threadPoolListener - ) { + final IOReactorMetricsListener threadPoolListener, + final IOWorkerSelector workerSelector) { this.ioReactor = new DefaultConnectingIOReactor( eventHandlerFactory, ioReactorConfig, @@ -85,7 +86,8 @@ public AsyncRequester( exceptionCallback, sessionListener, threadPoolListener, - sessionShutdownCallback); + sessionShutdownCallback, + workerSelector); this.addressResolver = addressResolver != null ? addressResolver : DefaultAddressResolver.INSTANCE; } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequesterBootstrap.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequesterBootstrap.java index 2c16641a4d..3f8c82ac32 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequesterBootstrap.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequesterBootstrap.java @@ -293,7 +293,8 @@ public HttpAsyncRequester create() { connPool, tlsStrategyCopy, handshakeTimeout, - threadPoolListener); + threadPoolListener, + null); } } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java index f4c34b45f6..2b6fc99059 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java @@ -43,11 +43,12 @@ import org.apache.hc.core5.reactor.DefaultListeningIOReactor; import org.apache.hc.core5.reactor.IOEventHandlerFactory; import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.IOReactorMetricsListener; import org.apache.hc.core5.reactor.IOReactorService; import org.apache.hc.core5.reactor.IOReactorStatus; -import org.apache.hc.core5.reactor.IOReactorMetricsListener; import org.apache.hc.core5.reactor.IOSession; import org.apache.hc.core5.reactor.IOSessionListener; +import org.apache.hc.core5.reactor.IOWorkerSelector; import org.apache.hc.core5.reactor.ListenerEndpoint; import org.apache.hc.core5.util.TimeValue; @@ -66,7 +67,8 @@ public AsyncServer( final Callback exceptionCallback, final IOSessionListener sessionListener, final IOReactorMetricsListener threadPoolListener, - final Callback sessionShutdownCallback) { + final Callback sessionShutdownCallback, + final IOWorkerSelector workerSelector) { this.ioReactor = new DefaultListeningIOReactor( eventHandlerFactory, ioReactorConfig, @@ -76,7 +78,8 @@ public AsyncServer( exceptionCallback, sessionListener, threadPoolListener, - sessionShutdownCallback); + sessionShutdownCallback, + workerSelector); } @Override diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServerBootstrap.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServerBootstrap.java index 10b821c59d..78f3df77c3 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServerBootstrap.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServerBootstrap.java @@ -517,7 +517,7 @@ public HttpAsyncServer create() { tlsStrategy, handshakeTimeout); return new HttpAsyncServer(ioEventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, - sessionListener, threadPoolListener); + sessionListener, threadPoolListener, null, null); } } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java index 94ebfde1a3..b94f051a6d 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java @@ -81,6 +81,7 @@ import org.apache.hc.core5.reactor.IOReactorMetricsListener; import org.apache.hc.core5.reactor.IOSession; import org.apache.hc.core5.reactor.IOSessionListener; +import org.apache.hc.core5.reactor.IOWorkerSelector; import org.apache.hc.core5.reactor.ProtocolIOSession; import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer; import org.apache.hc.core5.util.Args; @@ -113,30 +114,16 @@ public HttpAsyncRequester( final ManagedConnPool connPool, final TlsStrategy tlsStrategy, final Timeout handshakeTimeout, - final IOReactorMetricsListener threadPoolListener) { + final IOReactorMetricsListener threadPoolListener, + final IOWorkerSelector workerSelector) { super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener, - ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE, threadPoolListener); + ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE, threadPoolListener, + workerSelector); this.connPool = Args.notNull(connPool, "Connection pool"); this.tlsStrategy = tlsStrategy; this.handshakeTimeout = handshakeTimeout; } - /** - * Use {@link AsyncRequesterBootstrap} to create instances of this class. - */ - @Internal - public HttpAsyncRequester( - final IOReactorConfig ioReactorConfig, - final IOEventHandlerFactory eventHandlerFactory, - final Decorator ioSessionDecorator, - final Callback exceptionCallback, - final IOSessionListener sessionListener, - final ManagedConnPool connPool, - final IOReactorMetricsListener threadPoolListener) { - this(ioReactorConfig, eventHandlerFactory, ioSessionDecorator, exceptionCallback, sessionListener, connPool, - null, null, threadPoolListener); - } - @Override public PoolStats getTotalStats() { return connPool.getTotalStats(); diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncServer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncServer.java index 40b19bb65b..04241385fd 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncServer.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncServer.java @@ -35,13 +35,14 @@ import org.apache.hc.core5.function.Callback; import org.apache.hc.core5.function.Decorator; import org.apache.hc.core5.http.URIScheme; -import org.apache.hc.core5.reactor.EndpointParameters; import org.apache.hc.core5.http.nio.command.ShutdownCommand; +import org.apache.hc.core5.reactor.EndpointParameters; import org.apache.hc.core5.reactor.IOEventHandlerFactory; import org.apache.hc.core5.reactor.IOReactorConfig; import org.apache.hc.core5.reactor.IOReactorMetricsListener; import org.apache.hc.core5.reactor.IOSession; import org.apache.hc.core5.reactor.IOSessionListener; +import org.apache.hc.core5.reactor.IOWorkerSelector; import org.apache.hc.core5.reactor.ListenerEndpoint; /** @@ -66,26 +67,13 @@ public HttpAsyncServer( final Callback exceptionCallback, final IOSessionListener sessionListener, final IOReactorMetricsListener threadPoolListener, + final IOWorkerSelector workerSelector, final String canonicalName) { super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener, - threadPoolListener, ShutdownCommand.GRACEFUL_NORMAL_CALLBACK); + threadPoolListener, ShutdownCommand.GRACEFUL_NORMAL_CALLBACK, workerSelector); this.canonicalName = canonicalName; } - /** - * Use {@link AsyncServerBootstrap} to create instances of this class. - */ - @Internal - public HttpAsyncServer( - final IOEventHandlerFactory eventHandlerFactory, - final IOReactorConfig ioReactorConfig, - final Decorator ioSessionDecorator, - final Callback exceptionCallback, - final IOSessionListener sessionListener, - final IOReactorMetricsListener threadPoolListener) { - this(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener, threadPoolListener, null); - } - /** * @since 5.1 */ diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOReactorBase.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOReactorBase.java index 706b6a3c70..e3914f80e8 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOReactorBase.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOReactorBase.java @@ -50,13 +50,17 @@ public final Future connect( throw new IOReactorShutdownException("I/O reactor has been shut down"); } try { - return getWorkerSelector().next().connect(remoteEndpoint, remoteAddress, localAddress, timeout, attachment, callback); + final SingleCoreIOReactor dispatcher = selectWorker(); + if (dispatcher.getStatus() == IOReactorStatus.SHUT_DOWN) { + throw new IOReactorShutdownException("I/O reactor has been shut down"); + } + return dispatcher.connect(remoteEndpoint, remoteAddress, localAddress, timeout, attachment, callback); } catch (final IOReactorShutdownException ex) { initiateShutdown(); throw ex; } } - abstract IOWorkers.Selector getWorkerSelector(); + abstract SingleCoreIOReactor selectWorker(); } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java index 4d89dc200b..fb1ae47c8d 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.util.concurrent.ThreadFactory; +import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.concurrent.DefaultThreadFactory; import org.apache.hc.core5.function.Callback; import org.apache.hc.core5.function.Decorator; @@ -48,16 +49,16 @@ */ public class DefaultConnectingIOReactor extends AbstractIOReactorBase { - private final int workerCount; private final SingleCoreIOReactor[] workers; private final MultiCoreIOReactor ioReactor; - private final IOWorkers.Selector workerSelector; + private final IOWorkerSelector workerSelector; private final static ThreadFactory THREAD_FACTORY = new DefaultThreadFactory("I/O client dispatch", true); /** * @since 5.4 */ + @Internal public DefaultConnectingIOReactor( final IOEventHandlerFactory eventHandlerFactory, final IOReactorConfig ioReactorConfig, @@ -66,9 +67,10 @@ public DefaultConnectingIOReactor( final Callback exceptionCallback, final IOSessionListener sessionListener, final IOReactorMetricsListener threadPoolListener, - final Callback sessionShutdownCallback) { + final Callback sessionShutdownCallback, + final IOWorkerSelector workerSelector) { Args.notNull(eventHandlerFactory, "Event handler factory"); - this.workerCount = ioReactorConfig != null ? ioReactorConfig.getIoThreadCount() : IOReactorConfig.DEFAULT.getIoThreadCount(); + final int workerCount = ioReactorConfig != null ? ioReactorConfig.getIoThreadCount() : IOReactorConfig.DEFAULT.getIoThreadCount(); this.workers = new SingleCoreIOReactor[workerCount]; final Thread[] threads = new Thread[workerCount]; for (int i = 0; i < this.workers.length; i++) { @@ -84,7 +86,7 @@ public DefaultConnectingIOReactor( threads[i] = (threadFactory != null ? threadFactory : THREAD_FACTORY).newThread(new IOReactorWorker(dispatcher)); } this.ioReactor = new MultiCoreIOReactor(this.workers, threads); - this.workerSelector = IOWorkers.newSelector(workers); + this.workerSelector = workerSelector != null ? workerSelector : IOWorkerSelectors.newSelector(workerCount); } public DefaultConnectingIOReactor( @@ -95,7 +97,8 @@ public DefaultConnectingIOReactor( final Callback exceptionCallback, final IOSessionListener sessionListener, final Callback sessionShutdownCallback) { - this(eventHandlerFactory,ioReactorConfig, threadFactory,ioSessionDecorator, exceptionCallback, sessionListener, null, sessionShutdownCallback); + this(eventHandlerFactory,ioReactorConfig, threadFactory,ioSessionDecorator, exceptionCallback, sessionListener, + null, sessionShutdownCallback, null); } public DefaultConnectingIOReactor( @@ -125,8 +128,8 @@ public IOReactorStatus getStatus() { } @Override - IOWorkers.Selector getWorkerSelector() { - return workerSelector; + SingleCoreIOReactor selectWorker() { + return workers[workerSelector.select(workers)]; } @Override diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java index feb6b662a7..ce99cd5924 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java @@ -33,6 +33,7 @@ import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; +import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.concurrent.DefaultThreadFactory; import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.function.Callback; @@ -55,37 +56,15 @@ public class DefaultListeningIOReactor extends AbstractIOReactorBase implements private final static ThreadFactory DISPATCH_THREAD_FACTORY = new DefaultThreadFactory("I/O server dispatch", true); private final static ThreadFactory LISTENER_THREAD_FACTORY = new DefaultThreadFactory("I/O listener", true); - private final int workerCount; private final SingleCoreIOReactor[] workers; private final SingleCoreListeningIOReactor listener; private final MultiCoreIOReactor ioReactor; - private final IOWorkers.Selector workerSelector; - - /** - * Creates an instance of DefaultListeningIOReactor with the given configuration. - * - * @param eventHandlerFactory the factory to create I/O event handlers. - * @param ioReactorConfig I/O reactor configuration. - * @param listenerThreadFactory the factory to create listener thread. - * Can be {@code null}. - * - * @since 5.0 - */ - public DefaultListeningIOReactor( - final IOEventHandlerFactory eventHandlerFactory, - final IOReactorConfig ioReactorConfig, - final ThreadFactory dispatchThreadFactory, - final ThreadFactory listenerThreadFactory, - final Decorator ioSessionDecorator, - final Callback exceptionCallback, - final IOSessionListener sessionListener, - final Callback sessionShutdownCallback) { - this(eventHandlerFactory, ioReactorConfig, dispatchThreadFactory, listenerThreadFactory, ioSessionDecorator, exceptionCallback, sessionListener, null, sessionShutdownCallback); - } + private final IOWorkerSelector workerSelector; /** * @since 5.4 */ + @Internal public DefaultListeningIOReactor( final IOEventHandlerFactory eventHandlerFactory, final IOReactorConfig ioReactorConfig, @@ -95,9 +74,10 @@ public DefaultListeningIOReactor( final Callback exceptionCallback, final IOSessionListener sessionListener, final IOReactorMetricsListener threadPoolListener, - final Callback sessionShutdownCallback) { + final Callback sessionShutdownCallback, + final IOWorkerSelector workerSelector) { Args.notNull(eventHandlerFactory, "Event handler factory"); - this.workerCount = ioReactorConfig != null ? ioReactorConfig.getIoThreadCount() : IOReactorConfig.DEFAULT.getIoThreadCount(); + final int workerCount = ioReactorConfig != null ? ioReactorConfig.getIoThreadCount() : IOReactorConfig.DEFAULT.getIoThreadCount(); this.workers = new SingleCoreIOReactor[workerCount]; final Thread[] threads = new Thread[workerCount + 1]; for (int i = 0; i < this.workers.length; i++) { @@ -112,15 +92,36 @@ public DefaultListeningIOReactor( this.workers[i] = dispatcher; threads[i + 1] = (dispatchThreadFactory != null ? dispatchThreadFactory : DISPATCH_THREAD_FACTORY).newThread(new IOReactorWorker(dispatcher)); } - final IOReactor[] ioReactors = new IOReactor[this.workerCount + 1]; - System.arraycopy(this.workers, 0, ioReactors, 1, this.workerCount); + final IOReactor[] ioReactors = new IOReactor[workerCount + 1]; + System.arraycopy(this.workers, 0, ioReactors, 1, workerCount); this.listener = new SingleCoreListeningIOReactor(exceptionCallback, ioReactorConfig, this::enqueueChannel); ioReactors[0] = this.listener; threads[0] = (listenerThreadFactory != null ? listenerThreadFactory : LISTENER_THREAD_FACTORY).newThread(new IOReactorWorker(listener)); - this.ioReactor = new MultiCoreIOReactor(ioReactors, threads); + this.workerSelector = workerSelector != null ? workerSelector : IOWorkerSelectors.newSelector(workerCount); + } - workerSelector = IOWorkers.newSelector(workers); + /** + * Creates an instance of DefaultListeningIOReactor with the given configuration. + * + * @param eventHandlerFactory the factory to create I/O event handlers. + * @param ioReactorConfig I/O reactor configuration. + * @param listenerThreadFactory the factory to create listener thread. + * Can be {@code null}. + * + * @since 5.0 + */ + public DefaultListeningIOReactor( + final IOEventHandlerFactory eventHandlerFactory, + final IOReactorConfig ioReactorConfig, + final ThreadFactory dispatchThreadFactory, + final ThreadFactory listenerThreadFactory, + final Decorator ioSessionDecorator, + final Callback exceptionCallback, + final IOSessionListener sessionListener, + final Callback sessionShutdownCallback) { + this(eventHandlerFactory, ioReactorConfig, dispatchThreadFactory, listenerThreadFactory, ioSessionDecorator, + exceptionCallback, sessionListener, null, sessionShutdownCallback, null); } /** @@ -191,19 +192,18 @@ public IOReactorStatus getStatus() { } @Override - IOWorkers.Selector getWorkerSelector() { - return workerSelector; + SingleCoreIOReactor selectWorker() { + return workers[workerSelector.select(workers)]; } private void enqueueChannel(final ChannelEntry entry) { try { - workerSelector.next().enqueueChannel(entry); + selectWorker().enqueueChannel(entry); } catch (final IOReactorShutdownException ex) { initiateShutdown(); } } - @Override public void initiateShutdown() { ioReactor.initiateShutdown(); diff --git a/httpcore5/src/test/java/org/apache/hc/core5/reactor/IOWorkersTest.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOWorkerSelector.java similarity index 69% rename from httpcore5/src/test/java/org/apache/hc/core5/reactor/IOWorkersTest.java rename to httpcore5/src/main/java/org/apache/hc/core5/reactor/IOWorkerSelector.java index c087af4320..456af59299 100644 --- a/httpcore5/src/test/java/org/apache/hc/core5/reactor/IOWorkersTest.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOWorkerSelector.java @@ -24,21 +24,15 @@ * . * */ -package org.apache.hc.core5.reactor; -import static org.mockito.Mockito.mock; +package org.apache.hc.core5.reactor; -import org.junit.jupiter.api.Test; +import org.apache.hc.core5.annotation.Internal; -class IOWorkersTest { +@Internal +@FunctionalInterface +public interface IOWorkerSelector { - @Test - void testIndexOverflow() { - final SingleCoreIOReactor reactor = new SingleCoreIOReactor(null, mock(IOEventHandlerFactory.class), IOReactorConfig.DEFAULT, null, null, null, null); - final IOWorkers.Selector selector = IOWorkers.newSelector(new SingleCoreIOReactor[]{reactor, reactor, reactor}); - for (long i = Integer.MAX_VALUE - 10; i < (long) Integer.MAX_VALUE + 10; i++) { - selector.next(); - } - } + int select(IOWorkerStats[] dispatchers); } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOWorkerSelectors.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOWorkerSelectors.java new file mode 100644 index 0000000000..9170db5e9a --- /dev/null +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOWorkerSelectors.java @@ -0,0 +1,75 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.reactor; + +import java.util.concurrent.atomic.AtomicInteger; + +final class IOWorkerSelectors { + + static IOWorkerSelector newSelector(final int workerCount, final int start) { + return isPowerOfTwo(workerCount) ? new PowerOfTwoSelector(start) : new GenericSelector(start); + } + + static IOWorkerSelector newSelector(final int workerCount) { + return newSelector(workerCount, 0); + } + + static boolean isPowerOfTwo(final int n) { + return (n & -n) == n; + } + + static final class PowerOfTwoSelector implements IOWorkerSelector { + + private final AtomicInteger idx; + + PowerOfTwoSelector(final int n) { + this.idx = new AtomicInteger(n); + } + + @Override + public int select(final IOWorkerStats[] dispatchers) { + return idx.getAndIncrement() & (dispatchers.length - 1); + } + + } + + static final class GenericSelector implements IOWorkerSelector { + + private final AtomicInteger idx; + + GenericSelector(final int n) { + this.idx = new AtomicInteger(n); + } + + @Override + public int select(final IOWorkerStats[] dispatchers) { + return (idx.getAndIncrement() & Integer.MAX_VALUE) % dispatchers.length; + } + + } + +} diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOWorkerStats.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOWorkerStats.java new file mode 100644 index 0000000000..75ec5fea72 --- /dev/null +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOWorkerStats.java @@ -0,0 +1,50 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.core5.reactor; + +import org.apache.hc.core5.annotation.Internal; + +/** + * Internal I/O dispatch stats that can be used by {@link IOWorkerSelector} + * to select the best suited worker to get new I/O channels. + * + * @since 5.4 + */ +@Internal +public interface IOWorkerStats { + + // Relatively expensive + int totalChannelCount(); + + // Relatively expensive + int pendingChannelCount(); + + // Cheap + long lastSelectMilli(); + +} diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOWorkers.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOWorkers.java deleted file mode 100644 index c7cadac330..0000000000 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOWorkers.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * ==================================================================== - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * ==================================================================== - * - * This software consists of voluntary contributions made by many - * individuals on behalf of the Apache Software Foundation. For more - * information on the Apache Software Foundation, please see - * . - * - */ -package org.apache.hc.core5.reactor; - -import java.util.concurrent.atomic.AtomicInteger; - -final class IOWorkers { - - interface Selector { - - SingleCoreIOReactor next(); - - } - - static Selector newSelector(final SingleCoreIOReactor[] dispatchers) { - return isPowerOfTwo(dispatchers.length) - ? new PowerOfTwoSelector(dispatchers) - : new GenericSelector(dispatchers); - } - - private static boolean isPowerOfTwo(final int val) { - return (val & -val) == val; - } - - private static void validate(final SingleCoreIOReactor dispatcher) { - if (dispatcher.getStatus() == IOReactorStatus.SHUT_DOWN) { - throw new IOReactorShutdownException("I/O reactor has been shut down"); - } - } - - private static final class PowerOfTwoSelector implements Selector { - - private final AtomicInteger idx = new AtomicInteger(0); - private final SingleCoreIOReactor[] dispatchers; - - PowerOfTwoSelector(final SingleCoreIOReactor[] dispatchers) { - this.dispatchers = dispatchers; - } - - @Override - public SingleCoreIOReactor next() { - final SingleCoreIOReactor dispatcher = dispatchers[idx.getAndIncrement() & (dispatchers.length - 1)]; - validate(dispatcher); - return dispatcher; - } - } - - private static final class GenericSelector implements Selector { - - private final AtomicInteger idx = new AtomicInteger(0); - private final SingleCoreIOReactor[] dispatchers; - - GenericSelector(final SingleCoreIOReactor[] dispatchers) { - this.dispatchers = dispatchers; - } - - @Override - public SingleCoreIOReactor next() { - final SingleCoreIOReactor dispatcher = dispatchers[(idx.getAndIncrement() & Integer.MAX_VALUE) % dispatchers.length]; - validate(dispatcher); - return dispatcher; - } - } - -} diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java index 2d4f4f9035..7091e724d3 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java @@ -55,7 +55,7 @@ import org.apache.hc.core5.util.Args; import org.apache.hc.core5.util.Timeout; -class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements ConnectionInitiator { +class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements ConnectionInitiator, IOWorkerStats { private static final int MAX_CHANNEL_REQUESTS = 10000; @@ -70,6 +70,7 @@ class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements Connect private final AtomicBoolean shutdownInitiated; private final long selectTimeoutMillis; private volatile long lastTimeoutCheckMillis; + private volatile long lastSelectMillis; private final IOReactorMetricsListener threadPoolListener; // Atomic variables for tracking total wait time and count of processed requests @@ -130,6 +131,7 @@ void doExecute() throws IOException { } // Process selected I/O events + lastSelectMillis = System.currentTimeMillis(); if (readyCount > 0) { processEvents(this.selector.selectedKeys()); } @@ -465,4 +467,19 @@ private void reportStatusToThreadPoolListener() { } } + @Override + public int totalChannelCount() { + return selector.keys().size(); + } + + @Override + public int pendingChannelCount() { + return channelQueue.size() + requestQueue.size(); + } + + @Override + public long lastSelectMilli() { + return lastSelectMillis; + } + } diff --git a/httpcore5/src/test/java/org/apache/hc/core5/reactor/IOWorkerSelectorsTest.java b/httpcore5/src/test/java/org/apache/hc/core5/reactor/IOWorkerSelectorsTest.java new file mode 100644 index 0000000000..3910445627 --- /dev/null +++ b/httpcore5/src/test/java/org/apache/hc/core5/reactor/IOWorkerSelectorsTest.java @@ -0,0 +1,51 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.reactor; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mockito; + +class IOWorkerSelectorsTest { + + @ParameterizedTest(name = "worker count = {0}") + @ValueSource(ints = {1, 2, 3, 4, 5, 10, 15, 16, 32}) + void testIndexOverflow(final int workerCount) { + final long start = (long) Integer.MAX_VALUE - 10; + final long end = (long) Integer.MAX_VALUE + 10; + final IOWorkerStats[] workers = new IOWorkerStats[workerCount]; + for (int i = 0; i < workerCount; i++) { + workers[i] = Mockito.mock(IOWorkerStats.class); + } + final IOWorkerSelector selector = IOWorkerSelectors.newSelector(workerCount, (int) start); + for (long i = start; i < end; i++) { + Assertions.assertTrue(selector.select(workers) < workerCount); + } + } + +}