Skip to content

Commit

Permalink
HTTPCORE-773: introduced customizable strategy to select i/o dispatch…
Browse files Browse the repository at this point in the history
… worker for new i/o channels
  • Loading branch information
ok2c committed Dec 1, 2024
1 parent 8ee6483 commit 7ecf575
Show file tree
Hide file tree
Showing 22 changed files with 293 additions and 217 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.hc.core5.reactor.IOReactorMetricsListener;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.reactor.IOSessionListener;
import org.apache.hc.core5.reactor.IOWorkerSelector;
import org.apache.hc.core5.reactor.ProtocolIOSession;
import org.apache.hc.core5.reactor.ssl.TlsDetails;
import org.apache.hc.core5.util.Timeout;
Expand All @@ -65,25 +66,6 @@ public class H2AsyncRequester extends HttpAsyncRequester {
* Use {@link H2RequesterBootstrap} to create instances of this class.
*/
@Internal
public H2AsyncRequester(
final HttpVersionPolicy versionPolicy,
final IOReactorConfig ioReactorConfig,
final IOEventHandlerFactory eventHandlerFactory,
final Decorator<IOSession> ioSessionDecorator,
final Callback<Exception> exceptionCallback,
final IOSessionListener sessionListener,
final ManagedConnPool<HttpHost, IOSession> connPool,
final IOReactorMetricsListener threadPoolListener) {
super(ioReactorConfig, eventHandlerFactory, ioSessionDecorator, exceptionCallback, sessionListener, connPool, threadPoolListener);
this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
}

/**
* Use {@link H2RequesterBootstrap} to create instances of this class.
*
* @since 5.2
*/
@Internal
public H2AsyncRequester(
final HttpVersionPolicy versionPolicy,
final IOReactorConfig ioReactorConfig,
Expand All @@ -94,9 +76,10 @@ public H2AsyncRequester(
final ManagedConnPool<HttpHost, IOSession> connPool,
final TlsStrategy tlsStrategy,
final Timeout handshakeTimeout,
final IOReactorMetricsListener threadPoolListener) {
final IOReactorMetricsListener threadPoolListener,
final IOWorkerSelector workerSelector) {
super(ioReactorConfig, eventHandlerFactory, ioSessionDecorator, exceptionCallback, sessionListener, connPool,
tlsStrategy, handshakeTimeout, threadPoolListener);
tlsStrategy, handshakeTimeout, threadPoolListener, workerSelector);
this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.apache.hc.core5.reactor.IOReactorMetricsListener;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.reactor.IOSessionListener;
import org.apache.hc.core5.reactor.IOWorkerSelector;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
Expand All @@ -98,9 +99,11 @@ public H2MultiplexingRequester(
final IOSessionListener sessionListener,
final Resolver<HttpHost, InetSocketAddress> addressResolver,
final TlsStrategy tlsStrategy,
final IOReactorMetricsListener threadPoolListener) {
final IOReactorMetricsListener threadPoolListener,
final IOWorkerSelector workerSelector) {
super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener,
ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE, threadPoolListener);
ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE,
threadPoolListener, workerSelector);
this.connPool = new H2ConnPool(this, addressResolver, tlsStrategy);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,8 @@ public H2MultiplexingRequester create() {
sessionListener,
DefaultAddressResolver.INSTANCE,
tlsStrategy != null ? tlsStrategy : new H2ClientTlsStrategy(),
threadPoolListener);
threadPoolListener,
null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,8 @@ public H2AsyncRequester create() {
connPool,
actualTlsStrategy,
handshakeTimeout,
threadPoolListener);
threadPoolListener,
null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ public HttpAsyncServer create() {
handshakeTimeout);

return new HttpAsyncServer(ioEventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback,
sessionListener, threadPoolListener, actualCanonicalHostName);
sessionListener, threadPoolListener, null, actualCanonicalHostName);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ DefaultConnectingIOReactor createIOReactor(
LoggingExceptionCallback.INSTANCE,
LoggingIOSessionListener.INSTANCE,
LoggingReactorMetricsListener.INSTANCE,
sessionShutdownCallback);
sessionShutdownCallback,
null);
}

private InetSocketAddress toSocketAddress(final HttpHost host) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ DefaultListeningIOReactor createIOReactor(
LoggingExceptionCallback.INSTANCE,
LoggingIOSessionListener.INSTANCE,
LoggingReactorMetricsListener.INSTANCE,
sessionShutdownCallback);
sessionShutdownCallback,
null);
}

public Future<ListenerEndpoint> listen(final InetSocketAddress address) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,12 @@
import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.IOReactorMetricsListener;
import org.apache.hc.core5.reactor.IOReactorService;
import org.apache.hc.core5.reactor.IOReactorStatus;
import org.apache.hc.core5.reactor.IOReactorMetricsListener;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.reactor.IOSessionListener;
import org.apache.hc.core5.reactor.IOWorkerSelector;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
Expand All @@ -75,8 +76,8 @@ public AsyncRequester(
final IOSessionListener sessionListener,
final Callback<IOSession> sessionShutdownCallback,
final Resolver<HttpHost, InetSocketAddress> addressResolver,
final IOReactorMetricsListener threadPoolListener
) {
final IOReactorMetricsListener threadPoolListener,
final IOWorkerSelector workerSelector) {
this.ioReactor = new DefaultConnectingIOReactor(
eventHandlerFactory,
ioReactorConfig,
Expand All @@ -85,7 +86,8 @@ public AsyncRequester(
exceptionCallback,
sessionListener,
threadPoolListener,
sessionShutdownCallback);
sessionShutdownCallback,
workerSelector);
this.addressResolver = addressResolver != null ? addressResolver : DefaultAddressResolver.INSTANCE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,8 @@ public HttpAsyncRequester create() {
connPool,
tlsStrategyCopy,
handshakeTimeout,
threadPoolListener);
threadPoolListener,
null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@
import org.apache.hc.core5.reactor.DefaultListeningIOReactor;
import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.IOReactorMetricsListener;
import org.apache.hc.core5.reactor.IOReactorService;
import org.apache.hc.core5.reactor.IOReactorStatus;
import org.apache.hc.core5.reactor.IOReactorMetricsListener;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.reactor.IOSessionListener;
import org.apache.hc.core5.reactor.IOWorkerSelector;
import org.apache.hc.core5.reactor.ListenerEndpoint;
import org.apache.hc.core5.util.TimeValue;

Expand All @@ -66,7 +67,8 @@ public AsyncServer(
final Callback<Exception> exceptionCallback,
final IOSessionListener sessionListener,
final IOReactorMetricsListener threadPoolListener,
final Callback<IOSession> sessionShutdownCallback) {
final Callback<IOSession> sessionShutdownCallback,
final IOWorkerSelector workerSelector) {
this.ioReactor = new DefaultListeningIOReactor(
eventHandlerFactory,
ioReactorConfig,
Expand All @@ -76,7 +78,8 @@ public AsyncServer(
exceptionCallback,
sessionListener,
threadPoolListener,
sessionShutdownCallback);
sessionShutdownCallback,
workerSelector);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ public HttpAsyncServer create() {
tlsStrategy,
handshakeTimeout);
return new HttpAsyncServer(ioEventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback,
sessionListener, threadPoolListener);
sessionListener, threadPoolListener, null, null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.apache.hc.core5.reactor.IOReactorMetricsListener;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.reactor.IOSessionListener;
import org.apache.hc.core5.reactor.IOWorkerSelector;
import org.apache.hc.core5.reactor.ProtocolIOSession;
import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
import org.apache.hc.core5.util.Args;
Expand Down Expand Up @@ -113,30 +114,16 @@ public HttpAsyncRequester(
final ManagedConnPool<HttpHost, IOSession> connPool,
final TlsStrategy tlsStrategy,
final Timeout handshakeTimeout,
final IOReactorMetricsListener threadPoolListener) {
final IOReactorMetricsListener threadPoolListener,
final IOWorkerSelector workerSelector) {
super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener,
ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE, threadPoolListener);
ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE, threadPoolListener,
workerSelector);
this.connPool = Args.notNull(connPool, "Connection pool");
this.tlsStrategy = tlsStrategy;
this.handshakeTimeout = handshakeTimeout;
}

/**
* Use {@link AsyncRequesterBootstrap} to create instances of this class.
*/
@Internal
public HttpAsyncRequester(
final IOReactorConfig ioReactorConfig,
final IOEventHandlerFactory eventHandlerFactory,
final Decorator<IOSession> ioSessionDecorator,
final Callback<Exception> exceptionCallback,
final IOSessionListener sessionListener,
final ManagedConnPool<HttpHost, IOSession> connPool,
final IOReactorMetricsListener threadPoolListener) {
this(ioReactorConfig, eventHandlerFactory, ioSessionDecorator, exceptionCallback, sessionListener, connPool,
null, null, threadPoolListener);
}

@Override
public PoolStats getTotalStats() {
return connPool.getTotalStats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.function.Decorator;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.reactor.EndpointParameters;
import org.apache.hc.core5.http.nio.command.ShutdownCommand;
import org.apache.hc.core5.reactor.EndpointParameters;
import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.IOReactorMetricsListener;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.reactor.IOSessionListener;
import org.apache.hc.core5.reactor.IOWorkerSelector;
import org.apache.hc.core5.reactor.ListenerEndpoint;

/**
Expand All @@ -66,26 +67,13 @@ public HttpAsyncServer(
final Callback<Exception> exceptionCallback,
final IOSessionListener sessionListener,
final IOReactorMetricsListener threadPoolListener,
final IOWorkerSelector workerSelector,
final String canonicalName) {
super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener,
threadPoolListener, ShutdownCommand.GRACEFUL_NORMAL_CALLBACK);
threadPoolListener, ShutdownCommand.GRACEFUL_NORMAL_CALLBACK, workerSelector);
this.canonicalName = canonicalName;
}

/**
* Use {@link AsyncServerBootstrap} to create instances of this class.
*/
@Internal
public HttpAsyncServer(
final IOEventHandlerFactory eventHandlerFactory,
final IOReactorConfig ioReactorConfig,
final Decorator<IOSession> ioSessionDecorator,
final Callback<Exception> exceptionCallback,
final IOSessionListener sessionListener,
final IOReactorMetricsListener threadPoolListener) {
this(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener, threadPoolListener, null);
}

/**
* @since 5.1
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,17 @@ public final Future<IOSession> connect(
throw new IOReactorShutdownException("I/O reactor has been shut down");
}
try {
return getWorkerSelector().next().connect(remoteEndpoint, remoteAddress, localAddress, timeout, attachment, callback);
final SingleCoreIOReactor dispatcher = selectWorker();
if (dispatcher.getStatus() == IOReactorStatus.SHUT_DOWN) {
throw new IOReactorShutdownException("I/O reactor has been shut down");
}
return dispatcher.connect(remoteEndpoint, remoteAddress, localAddress, timeout, attachment, callback);
} catch (final IOReactorShutdownException ex) {
initiateShutdown();
throw ex;
}
}

abstract IOWorkers.Selector getWorkerSelector();
abstract SingleCoreIOReactor selectWorker();

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.io.IOException;
import java.util.concurrent.ThreadFactory;

import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.function.Decorator;
Expand All @@ -48,16 +49,16 @@
*/
public class DefaultConnectingIOReactor extends AbstractIOReactorBase {

private final int workerCount;
private final SingleCoreIOReactor[] workers;
private final MultiCoreIOReactor ioReactor;
private final IOWorkers.Selector workerSelector;
private final IOWorkerSelector workerSelector;

private final static ThreadFactory THREAD_FACTORY = new DefaultThreadFactory("I/O client dispatch", true);

/**
* @since 5.4
*/
@Internal
public DefaultConnectingIOReactor(
final IOEventHandlerFactory eventHandlerFactory,
final IOReactorConfig ioReactorConfig,
Expand All @@ -66,9 +67,10 @@ public DefaultConnectingIOReactor(
final Callback<Exception> exceptionCallback,
final IOSessionListener sessionListener,
final IOReactorMetricsListener threadPoolListener,
final Callback<IOSession> sessionShutdownCallback) {
final Callback<IOSession> sessionShutdownCallback,
final IOWorkerSelector workerSelector) {
Args.notNull(eventHandlerFactory, "Event handler factory");
this.workerCount = ioReactorConfig != null ? ioReactorConfig.getIoThreadCount() : IOReactorConfig.DEFAULT.getIoThreadCount();
final int workerCount = ioReactorConfig != null ? ioReactorConfig.getIoThreadCount() : IOReactorConfig.DEFAULT.getIoThreadCount();
this.workers = new SingleCoreIOReactor[workerCount];
final Thread[] threads = new Thread[workerCount];
for (int i = 0; i < this.workers.length; i++) {
Expand All @@ -84,7 +86,7 @@ public DefaultConnectingIOReactor(
threads[i] = (threadFactory != null ? threadFactory : THREAD_FACTORY).newThread(new IOReactorWorker(dispatcher));
}
this.ioReactor = new MultiCoreIOReactor(this.workers, threads);
this.workerSelector = IOWorkers.newSelector(workers);
this.workerSelector = workerSelector != null ? workerSelector : IOWorkerSelectors.newSelector(workerCount);
}

public DefaultConnectingIOReactor(
Expand All @@ -95,7 +97,8 @@ public DefaultConnectingIOReactor(
final Callback<Exception> exceptionCallback,
final IOSessionListener sessionListener,
final Callback<IOSession> sessionShutdownCallback) {
this(eventHandlerFactory,ioReactorConfig, threadFactory,ioSessionDecorator, exceptionCallback, sessionListener, null, sessionShutdownCallback);
this(eventHandlerFactory,ioReactorConfig, threadFactory,ioSessionDecorator, exceptionCallback, sessionListener,
null, sessionShutdownCallback, null);
}

public DefaultConnectingIOReactor(
Expand Down Expand Up @@ -125,8 +128,8 @@ public IOReactorStatus getStatus() {
}

@Override
IOWorkers.Selector getWorkerSelector() {
return workerSelector;
SingleCoreIOReactor selectWorker() {
return workers[workerSelector.select(workers)];
}

@Override
Expand Down
Loading

0 comments on commit 7ecf575

Please sign in to comment.