Skip to content

Commit

Permalink
Remote: Limit max number of gRPC connections by --remote_max_connecti…
Browse files Browse the repository at this point in the history
…ons.

`--remote_max_connections` is only applied to HTTP remote cache. This PR makes it apply to gRPC cache/executor as well.

Note that `--remote_max_connections` limits the number of concurrent connections. For HTTP remote cache, one connection could handle one request at one time. For gRPC remote cache/executor, one connection could handle 100+ concurrent requests. So the default value `100` means we could make up to `100` concurrent requests for HTTP remote cache or `10000+` concurrent requests for gRPC remote cache/executor.

Fixes: #14178.

Closes #14202.

PiperOrigin-RevId: 410249542
  • Loading branch information
coeuvre authored and Wyverald committed Nov 16, 2021
1 parent 2794d85 commit 8d5973d
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,13 @@ public ReferenceCounted touch(Object o) {
private final AtomicReference<String> authorityRef = new AtomicReference<>();

public ReferenceCountedChannel(ChannelConnectionFactory connectionFactory) {
this(connectionFactory, /*maxConnections=*/ 0);
}

public ReferenceCountedChannel(ChannelConnectionFactory connectionFactory, int maxConnections) {
this.dynamicConnectionPool =
new DynamicConnectionPool(connectionFactory, connectionFactory.maxConcurrency());
new DynamicConnectionPool(
connectionFactory, connectionFactory.maxConcurrency(), maxConnections);
}

public boolean isShutdown() {
Expand All @@ -87,12 +92,12 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
responseListener) {
@Override
public void onClose(Status status, Metadata trailers) {
super.onClose(status, trailers);

try {
connection.close();
} catch (IOException e) {
throw new AssertionError(e.getMessage(), e);
} finally {
super.onClose(status, trailers);
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,10 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
// based on the resolved IPs of that server. We assume servers normally have 2 IPs. So the
// max concurrency per connection is 100.
int maxConcurrencyPerConnection = 100;
int maxConnections = 0;
if (remoteOptions.remoteMaxConnections > 0) {
maxConnections = remoteOptions.remoteMaxConnections;
}

if (enableRemoteExecution) {
ImmutableList.Builder<ClientInterceptor> interceptors = ImmutableList.builder();
Expand All @@ -367,7 +371,8 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
remoteOptions.remoteProxy,
authAndTlsOptions,
interceptors.build(),
maxConcurrencyPerConnection));
maxConcurrencyPerConnection),
maxConnections);

// Create a separate channel if --remote_executor and --remote_cache point to different
// endpoints.
Expand All @@ -390,7 +395,8 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
remoteOptions.remoteProxy,
authAndTlsOptions,
interceptors.build(),
maxConcurrencyPerConnection));
maxConcurrencyPerConnection),
maxConnections);
}

if (enableRemoteDownloader) {
Expand All @@ -411,7 +417,8 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
remoteOptions.remoteProxy,
authAndTlsOptions,
interceptors.build(),
maxConcurrencyPerConnection));
maxConcurrencyPerConnection),
maxConnections);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
public class DynamicConnectionPool implements ConnectionPool {
private final ConnectionFactory connectionFactory;
private final int maxConcurrencyPerConnection;
private final int maxConnections;
private final AtomicBoolean closed = new AtomicBoolean(false);

@GuardedBy("this")
Expand All @@ -40,8 +41,14 @@ public class DynamicConnectionPool implements ConnectionPool {

public DynamicConnectionPool(
ConnectionFactory connectionFactory, int maxConcurrencyPerConnection) {
this(connectionFactory, maxConcurrencyPerConnection, /*maxConnections=*/ 0);
}

public DynamicConnectionPool(
ConnectionFactory connectionFactory, int maxConcurrencyPerConnection, int maxConnections) {
this.connectionFactory = connectionFactory;
this.maxConcurrencyPerConnection = maxConcurrencyPerConnection;
this.maxConnections = maxConnections;
this.factories = new ArrayList<>();
}

Expand All @@ -61,12 +68,19 @@ public void close() throws IOException {
}
}

@GuardedBy("this")
private SharedConnectionFactory nextFactory() {
int index = Math.abs(indexTicker % factories.size());
indexTicker += 1;
return factories.get(index);
}

/**
* Performs a simple round robin on the list of {@link SharedConnectionFactory} and return one
* having available connections at this moment.
* Performs a simple round robin on the list of {@link SharedConnectionFactory}.
*
* <p>If no factory has available connections, it will create a new {@link
* SharedConnectionFactory}.
* <p>This will try to find a factory that has available connections at this moment. If no factory
* has available connections, and the number of factories is less than {@link #maxConnections}, it
* will create a new {@link SharedConnectionFactory}.
*/
private SharedConnectionFactory nextAvailableFactory() {
if (closed.get()) {
Expand All @@ -75,19 +89,20 @@ private SharedConnectionFactory nextAvailableFactory() {

synchronized (this) {
for (int times = 0; times < factories.size(); ++times) {
int index = Math.abs(indexTicker % factories.size());
indexTicker += 1;

SharedConnectionFactory factory = factories.get(index);
SharedConnectionFactory factory = nextFactory();
if (factory.numAvailableConnections() > 0) {
return factory;
}
}

SharedConnectionFactory factory =
new SharedConnectionFactory(connectionFactory, maxConcurrencyPerConnection);
factories.add(factory);
return factory;
if (maxConnections <= 0 || factories.size() < maxConnections) {
SharedConnectionFactory factory =
new SharedConnectionFactory(connectionFactory, maxConcurrencyPerConnection);
factories.add(factory);
return factory;
} else {
return nextFactory();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,13 @@ public final class RemoteOptions extends OptionsBase {
documentationCategory = OptionDocumentationCategory.REMOTE,
effectTags = {OptionEffectTag.HOST_MACHINE_RESOURCE_OPTIMIZATIONS},
help =
"The max. number of concurrent network connections to the remote cache/executor. By "
+ "default Bazel limits the number of TCP connections to 100. Setting this flag to "
+ "0 will make Bazel choose the number of connections automatically.")
"Limit the max number of concurrent connections to remote cache/executor. By default the"
+ " value is 100. Setting this to 0 means no limitation.\n"
+ "For HTTP remote cache, one TCP connection could handle one request at one time, so"
+ " Bazel could make up to --remote_max_connections concurrent requests.\n"
+ "For gRPC remote cache/executor, one gRPC channel could usually handle 100+"
+ " concurrent requests, so Bazel could make around `--remote_max_connections * 100`"
+ " concurrent requests.")
public int remoteMaxConnections;

@Option(
Expand Down

0 comments on commit 8d5973d

Please sign in to comment.