Skip to content

Commit

Permalink
HTTPCORE-773: introduced customizable strategy to select i/o dispatch…
Browse files Browse the repository at this point in the history
… worker for new i/o channels
  • Loading branch information
ok2c committed Nov 30, 2024
1 parent 8ee6483 commit e66c4f5
Show file tree
Hide file tree
Showing 12 changed files with 208 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ DefaultConnectingIOReactor createIOReactor(
LoggingExceptionCallback.INSTANCE,
LoggingIOSessionListener.INSTANCE,
LoggingReactorMetricsListener.INSTANCE,
sessionShutdownCallback);
sessionShutdownCallback,
null);
}

private InetSocketAddress toSocketAddress(final HttpHost host) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ DefaultListeningIOReactor createIOReactor(
LoggingExceptionCallback.INSTANCE,
LoggingIOSessionListener.INSTANCE,
LoggingReactorMetricsListener.INSTANCE,
sessionShutdownCallback);
sessionShutdownCallback,
null);
}

public Future<ListenerEndpoint> listen(final InetSocketAddress address) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ public AsyncRequester(
final IOSessionListener sessionListener,
final Callback<IOSession> sessionShutdownCallback,
final Resolver<HttpHost, InetSocketAddress> addressResolver,
final IOReactorMetricsListener threadPoolListener
) {
final IOReactorMetricsListener threadPoolListener) {
this.ioReactor = new DefaultConnectingIOReactor(
eventHandlerFactory,
ioReactorConfig,
Expand All @@ -85,7 +84,8 @@ public AsyncRequester(
exceptionCallback,
sessionListener,
threadPoolListener,
sessionShutdownCallback);
sessionShutdownCallback,
null);
this.addressResolver = addressResolver != null ? addressResolver : DefaultAddressResolver.INSTANCE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public AsyncServer(
exceptionCallback,
sessionListener,
threadPoolListener,
sessionShutdownCallback);
sessionShutdownCallback,
null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,17 @@ public final Future<IOSession> connect(
throw new IOReactorShutdownException("I/O reactor has been shut down");
}
try {
return getWorkerSelector().next().connect(remoteEndpoint, remoteAddress, localAddress, timeout, attachment, callback);
final SingleCoreIOReactor dispatcher = selectWorker();
if (dispatcher.getStatus() == IOReactorStatus.SHUT_DOWN) {
throw new IOReactorShutdownException("I/O reactor has been shut down");
}
return dispatcher.connect(remoteEndpoint, remoteAddress, localAddress, timeout, attachment, callback);
} catch (final IOReactorShutdownException ex) {
initiateShutdown();
throw ex;
}
}

abstract IOWorkers.Selector getWorkerSelector();
abstract SingleCoreIOReactor selectWorker();

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@

import java.io.IOException;
import java.util.concurrent.ThreadFactory;
import java.util.function.Function;

import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.function.Decorator;
Expand All @@ -48,16 +50,16 @@
*/
public class DefaultConnectingIOReactor extends AbstractIOReactorBase {

private final int workerCount;
private final SingleCoreIOReactor[] workers;
private final MultiCoreIOReactor ioReactor;
private final IOWorkers.Selector workerSelector;
private final Function<IOWorkerStats[], Integer> workerSelector;

private final static ThreadFactory THREAD_FACTORY = new DefaultThreadFactory("I/O client dispatch", true);

/**
* @since 5.4
*/
@Internal
public DefaultConnectingIOReactor(
final IOEventHandlerFactory eventHandlerFactory,
final IOReactorConfig ioReactorConfig,
Expand All @@ -66,9 +68,10 @@ public DefaultConnectingIOReactor(
final Callback<Exception> exceptionCallback,
final IOSessionListener sessionListener,
final IOReactorMetricsListener threadPoolListener,
final Callback<IOSession> sessionShutdownCallback) {
final Callback<IOSession> sessionShutdownCallback,
final Function<IOWorkerStats[], Integer> workerSelector) {
Args.notNull(eventHandlerFactory, "Event handler factory");
this.workerCount = ioReactorConfig != null ? ioReactorConfig.getIoThreadCount() : IOReactorConfig.DEFAULT.getIoThreadCount();
final int workerCount = ioReactorConfig != null ? ioReactorConfig.getIoThreadCount() : IOReactorConfig.DEFAULT.getIoThreadCount();
this.workers = new SingleCoreIOReactor[workerCount];
final Thread[] threads = new Thread[workerCount];
for (int i = 0; i < this.workers.length; i++) {
Expand All @@ -84,7 +87,7 @@ public DefaultConnectingIOReactor(
threads[i] = (threadFactory != null ? threadFactory : THREAD_FACTORY).newThread(new IOReactorWorker(dispatcher));
}
this.ioReactor = new MultiCoreIOReactor(this.workers, threads);
this.workerSelector = IOWorkers.newSelector(workers);
this.workerSelector = workerSelector != null ? workerSelector : IOWorkerSelectors.newSelector(workerCount);
}

public DefaultConnectingIOReactor(
Expand All @@ -95,7 +98,8 @@ public DefaultConnectingIOReactor(
final Callback<Exception> exceptionCallback,
final IOSessionListener sessionListener,
final Callback<IOSession> sessionShutdownCallback) {
this(eventHandlerFactory,ioReactorConfig, threadFactory,ioSessionDecorator, exceptionCallback, sessionListener, null, sessionShutdownCallback);
this(eventHandlerFactory,ioReactorConfig, threadFactory,ioSessionDecorator, exceptionCallback, sessionListener,
null, sessionShutdownCallback, null);
}

public DefaultConnectingIOReactor(
Expand Down Expand Up @@ -125,8 +129,8 @@ public IOReactorStatus getStatus() {
}

@Override
IOWorkers.Selector getWorkerSelector() {
return workerSelector;
SingleCoreIOReactor selectWorker() {
return workers[workerSelector.apply(workers)];
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.function.Function;

import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Callback;
Expand All @@ -55,37 +57,15 @@ public class DefaultListeningIOReactor extends AbstractIOReactorBase implements
private final static ThreadFactory DISPATCH_THREAD_FACTORY = new DefaultThreadFactory("I/O server dispatch", true);
private final static ThreadFactory LISTENER_THREAD_FACTORY = new DefaultThreadFactory("I/O listener", true);

private final int workerCount;
private final SingleCoreIOReactor[] workers;
private final SingleCoreListeningIOReactor listener;
private final MultiCoreIOReactor ioReactor;
private final IOWorkers.Selector workerSelector;

/**
* Creates an instance of DefaultListeningIOReactor with the given configuration.
*
* @param eventHandlerFactory the factory to create I/O event handlers.
* @param ioReactorConfig I/O reactor configuration.
* @param listenerThreadFactory the factory to create listener thread.
* Can be {@code null}.
*
* @since 5.0
*/
public DefaultListeningIOReactor(
final IOEventHandlerFactory eventHandlerFactory,
final IOReactorConfig ioReactorConfig,
final ThreadFactory dispatchThreadFactory,
final ThreadFactory listenerThreadFactory,
final Decorator<IOSession> ioSessionDecorator,
final Callback<Exception> exceptionCallback,
final IOSessionListener sessionListener,
final Callback<IOSession> sessionShutdownCallback) {
this(eventHandlerFactory, ioReactorConfig, dispatchThreadFactory, listenerThreadFactory, ioSessionDecorator, exceptionCallback, sessionListener, null, sessionShutdownCallback);
}
private final Function<IOWorkerStats[], Integer> workerSelector;

/**
* @since 5.4
*/
@Internal
public DefaultListeningIOReactor(
final IOEventHandlerFactory eventHandlerFactory,
final IOReactorConfig ioReactorConfig,
Expand All @@ -95,9 +75,10 @@ public DefaultListeningIOReactor(
final Callback<Exception> exceptionCallback,
final IOSessionListener sessionListener,
final IOReactorMetricsListener threadPoolListener,
final Callback<IOSession> sessionShutdownCallback) {
final Callback<IOSession> sessionShutdownCallback,
final Function<IOWorkerStats[], Integer> workerSelector) {
Args.notNull(eventHandlerFactory, "Event handler factory");
this.workerCount = ioReactorConfig != null ? ioReactorConfig.getIoThreadCount() : IOReactorConfig.DEFAULT.getIoThreadCount();
final int workerCount = ioReactorConfig != null ? ioReactorConfig.getIoThreadCount() : IOReactorConfig.DEFAULT.getIoThreadCount();
this.workers = new SingleCoreIOReactor[workerCount];
final Thread[] threads = new Thread[workerCount + 1];
for (int i = 0; i < this.workers.length; i++) {
Expand All @@ -112,15 +93,36 @@ public DefaultListeningIOReactor(
this.workers[i] = dispatcher;
threads[i + 1] = (dispatchThreadFactory != null ? dispatchThreadFactory : DISPATCH_THREAD_FACTORY).newThread(new IOReactorWorker(dispatcher));
}
final IOReactor[] ioReactors = new IOReactor[this.workerCount + 1];
System.arraycopy(this.workers, 0, ioReactors, 1, this.workerCount);
final IOReactor[] ioReactors = new IOReactor[workerCount + 1];
System.arraycopy(this.workers, 0, ioReactors, 1, workerCount);
this.listener = new SingleCoreListeningIOReactor(exceptionCallback, ioReactorConfig, this::enqueueChannel);
ioReactors[0] = this.listener;
threads[0] = (listenerThreadFactory != null ? listenerThreadFactory : LISTENER_THREAD_FACTORY).newThread(new IOReactorWorker(listener));

this.ioReactor = new MultiCoreIOReactor(ioReactors, threads);
this.workerSelector = workerSelector != null ? workerSelector : IOWorkerSelectors.newSelector(workerCount);
}

workerSelector = IOWorkers.newSelector(workers);
/**
* Creates an instance of DefaultListeningIOReactor with the given configuration.
*
* @param eventHandlerFactory the factory to create I/O event handlers.
* @param ioReactorConfig I/O reactor configuration.
* @param listenerThreadFactory the factory to create listener thread.
* Can be {@code null}.
*
* @since 5.0
*/
public DefaultListeningIOReactor(
final IOEventHandlerFactory eventHandlerFactory,
final IOReactorConfig ioReactorConfig,
final ThreadFactory dispatchThreadFactory,
final ThreadFactory listenerThreadFactory,
final Decorator<IOSession> ioSessionDecorator,
final Callback<Exception> exceptionCallback,
final IOSessionListener sessionListener,
final Callback<IOSession> sessionShutdownCallback) {
this(eventHandlerFactory, ioReactorConfig, dispatchThreadFactory, listenerThreadFactory, ioSessionDecorator,
exceptionCallback, sessionListener, null, sessionShutdownCallback, null);
}

/**
Expand Down Expand Up @@ -191,19 +193,18 @@ public IOReactorStatus getStatus() {
}

@Override
IOWorkers.Selector getWorkerSelector() {
return workerSelector;
SingleCoreIOReactor selectWorker() {
return workers[workerSelector.apply(workers)];
}

private void enqueueChannel(final ChannelEntry entry) {
try {
workerSelector.next().enqueueChannel(entry);
selectWorker().enqueueChannel(entry);
} catch (final IOReactorShutdownException ex) {
initiateShutdown();
}
}


@Override
public void initiateShutdown() {
ioReactor.initiateShutdown();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.core5.reactor;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class IOWorkerSelectors {

static Function<IOWorkerStats[], Integer> newSelector(final int workerCount, final int start) {
return isPowerOfTwo(workerCount) ? new PowerOfTwoSelector(start) : new GenericSelector(start);
}

static Function<IOWorkerStats[], Integer> newSelector(final int workerCount) {
return newSelector(workerCount, 0);
}

static boolean isPowerOfTwo(final int n) {
return (n & -n) == n;
}

static final class PowerOfTwoSelector implements Function<IOWorkerStats[], Integer> {

private final AtomicInteger idx;

PowerOfTwoSelector(final int n) {
this.idx = new AtomicInteger(n);
}

@Override
public Integer apply(final IOWorkerStats[] dispatchers) {
return idx.getAndIncrement() & (dispatchers.length - 1);
}

}

static final class GenericSelector implements Function<IOWorkerStats[], Integer> {

private final AtomicInteger idx;

GenericSelector(final int n) {
this.idx = new AtomicInteger(n);
}

@Override
public Integer apply(final IOWorkerStats[] dispatchers) {
return (idx.getAndIncrement() & Integer.MAX_VALUE) % dispatchers.length;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,16 @@
* <http://www.apache.org/>.
*
*/

package org.apache.hc.core5.reactor;

import static org.mockito.Mockito.mock;
import org.apache.hc.core5.annotation.Internal;

import org.junit.jupiter.api.Test;
@Internal
public interface IOWorkerStats {

class IOWorkersTest {
int totalChannelCount();

@Test
void testIndexOverflow() {
final SingleCoreIOReactor reactor = new SingleCoreIOReactor(null, mock(IOEventHandlerFactory.class), IOReactorConfig.DEFAULT, null, null, null, null);
final IOWorkers.Selector selector = IOWorkers.newSelector(new SingleCoreIOReactor[]{reactor, reactor, reactor});
for (long i = Integer.MAX_VALUE - 10; i < (long) Integer.MAX_VALUE + 10; i++) {
selector.next();
}
}
int pendingChannelCount();

}
Loading

0 comments on commit e66c4f5

Please sign in to comment.