Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remote: Limit max number of gRPC connections by --remote_max_connections. #14202

Closed
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,12 @@ public ReferenceCounted touch(Object o) {
private final AtomicReference<String> authorityRef = new AtomicReference<>();

public ReferenceCountedChannel(ChannelConnectionFactory connectionFactory) {
this(connectionFactory, /*maxConnections=*/ 0);
}

public ReferenceCountedChannel(ChannelConnectionFactory connectionFactory, int maxConnections) {
this.dynamicConnectionPool =
new DynamicConnectionPool(connectionFactory, connectionFactory.maxConcurrency());
new DynamicConnectionPool(connectionFactory, connectionFactory.maxConcurrency(), maxConnections);
}

public boolean isShutdown() {
Expand All @@ -87,12 +91,12 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
responseListener) {
@Override
public void onClose(Status status, Metadata trailers) {
super.onClose(status, trailers);

try {
connection.close();
} catch (IOException e) {
throw new AssertionError(e.getMessage(), e);
} finally {
super.onClose(status, trailers);
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,17 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
// based on the resolved IPs of that server. We assume servers normally have 2 IPs. So the
// max concurrency per connection is 100.
int maxConcurrencyPerConnection = 100;
int maxConnections = 0;
if (remoteOptions.incompatibleRemoteMaxConnectionsGrpc
&& remoteOptions.remoteMaxConnections > 0) {
maxConnections =
(int)
Math.ceil(
(double) remoteOptions.remoteMaxConnections
/ (double) maxConcurrencyPerConnection);
maxConcurrencyPerConnection =
(int) Math.ceil((double) remoteOptions.remoteMaxConnections / (double) maxConnections);
}

if (enableRemoteExecution) {
ImmutableList.Builder<ClientInterceptor> interceptors = ImmutableList.builder();
Expand All @@ -367,7 +378,8 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
remoteOptions.remoteProxy,
authAndTlsOptions,
interceptors.build(),
maxConcurrencyPerConnection));
maxConcurrencyPerConnection),
maxConnections);

// Create a separate channel if --remote_executor and --remote_cache point to different
// endpoints.
Expand All @@ -390,7 +402,8 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
remoteOptions.remoteProxy,
authAndTlsOptions,
interceptors.build(),
maxConcurrencyPerConnection));
maxConcurrencyPerConnection),
maxConnections);
}

if (enableRemoteDownloader) {
Expand All @@ -411,7 +424,8 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
remoteOptions.remoteProxy,
authAndTlsOptions,
interceptors.build(),
maxConcurrencyPerConnection));
maxConcurrencyPerConnection),
maxConnections);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
public class DynamicConnectionPool implements ConnectionPool {
private final ConnectionFactory connectionFactory;
private final int maxConcurrencyPerConnection;
private final int maxConnections;
private final AtomicBoolean closed = new AtomicBoolean(false);

@GuardedBy("this")
Expand All @@ -40,8 +41,14 @@ public class DynamicConnectionPool implements ConnectionPool {

public DynamicConnectionPool(
ConnectionFactory connectionFactory, int maxConcurrencyPerConnection) {
this(connectionFactory, maxConcurrencyPerConnection, /*maxConnections=*/ 0);
}

public DynamicConnectionPool(
ConnectionFactory connectionFactory, int maxConcurrencyPerConnection, int maxConnections) {
this.connectionFactory = connectionFactory;
this.maxConcurrencyPerConnection = maxConcurrencyPerConnection;
this.maxConnections = maxConnections;
this.factories = new ArrayList<>();
}

Expand All @@ -61,12 +68,19 @@ public void close() throws IOException {
}
}

@GuardedBy("this")
private SharedConnectionFactory nextFactory() {
int index = Math.abs(indexTicker % factories.size());
indexTicker += 1;
return factories.get(index);
}

/**
* Performs a simple round robin on the list of {@link SharedConnectionFactory} and return one
* having available connections at this moment.
* Performs a simple round robin on the list of {@link SharedConnectionFactory}.
*
* <p>If no factory has available connections, it will create a new {@link
* SharedConnectionFactory}.
* <p>This will try to find a factory that has available connections at this moment. If no factory
* has available connections, and the number of factories is less than {@link #maxConnections}, it
* will create a new {@link SharedConnectionFactory}.
*/
private SharedConnectionFactory nextAvailableFactory() {
if (closed.get()) {
Expand All @@ -75,19 +89,20 @@ private SharedConnectionFactory nextAvailableFactory() {

synchronized (this) {
for (int times = 0; times < factories.size(); ++times) {
int index = Math.abs(indexTicker % factories.size());
indexTicker += 1;

SharedConnectionFactory factory = factories.get(index);
SharedConnectionFactory factory = nextFactory();
if (factory.numAvailableConnections() > 0) {
return factory;
}
}

SharedConnectionFactory factory =
new SharedConnectionFactory(connectionFactory, maxConcurrencyPerConnection);
factories.add(factory);
return factory;
if (maxConnections <= 0 || factories.size() < maxConnections) {
SharedConnectionFactory factory =
new SharedConnectionFactory(connectionFactory, maxConcurrencyPerConnection);
factories.add(factory);
return factory;
} else {
return nextFactory();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,26 @@ public final class RemoteOptions extends OptionsBase {
documentationCategory = OptionDocumentationCategory.REMOTE,
effectTags = {OptionEffectTag.HOST_MACHINE_RESOURCE_OPTIMIZATIONS},
help =
"The max. number of concurrent network connections to the remote cache/executor. By "
+ "default Bazel limits the number of TCP connections to 100. Setting this flag to "
+ "0 will make Bazel choose the number of connections automatically.")
"Limit the max number of concurrent requests to remote cache/executor. By default the"
+ " value is 100. Setting this to 0 means no limitation.\n"
+ "For HTTP remote cache, one TCP connection could handle one request at one time, so"
+ " it could open up to --remote_max_connections TCP connections.\n"
+ "If --incompatible_remote_max_connections_grpc is set, this also applies to gRPC"
+ " remote cache/executor. One gRPC channel could usually handle 100+ concurrent"
+ " requests. We assume the number is 100, so it could open up to"
+ " `--remote_max_connections / 100` gRPC channels. Each gRPC channel could open 1 or"
+ " more TCP connections depending on the number of resolved server IPs.")
public int remoteMaxConnections;

@Option(
name = "incompatible_remote_max_connections_grpc",
defaultValue = "false",
documentationCategory = OptionDocumentationCategory.REMOTE,
effectTags = {OptionEffectTag.UNKNOWN},
metadataTags = {OptionMetadataTag.INCOMPATIBLE_CHANGE},
help = "If set to true, --remote_max_connections also applies to gRPC cache/executor.")
public boolean incompatibleRemoteMaxConnectionsGrpc;

@Option(
name = "remote_executor",
defaultValue = "null",
Expand Down