Skip to content

Commit 49416db

Browse files
authored
Watcher: Increase HttpClient parallel sent requests (#31859)
The HTTPClient used in watcher is based on the apache http client. The current client is using a lot of defaults - which are not always optimal. Two of those defaults are the maximum number of total connections and the maximum number of connections to a single route. If one of those limits is reached, the HTTPClient waits for a connection to be finished thus acting in a blocking fashion. In order to prevent this when many requests are being executed, we increase the limit of total connections as well as the connections per route (a route is basically an endpoint, which also contains proxy information, not containing an URL, just hosts). On top of that an additional option has been set to evict long running connections, which can potentially be reused after some time. As this requires an additional background thread, this required some changes to ensure that the httpclient is closed properly. This is a backport of #30130
1 parent 8cc0dcb commit 49416db

File tree

6 files changed

+124
-88
lines changed

6 files changed

+124
-88
lines changed

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.common.util.PageCacheRecycler;
2929
import org.elasticsearch.common.util.concurrent.ThreadContext;
3030
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
31+
import org.elasticsearch.core.internal.io.IOUtils;
3132
import org.elasticsearch.env.Environment;
3233
import org.elasticsearch.env.NodeEnvironment;
3334
import org.elasticsearch.http.HttpServerTransport;
@@ -38,6 +39,7 @@
3839
import org.elasticsearch.ingest.Processor;
3940
import org.elasticsearch.license.LicenseService;
4041
import org.elasticsearch.license.XPackLicenseState;
42+
import org.elasticsearch.persistent.PersistentTasksExecutor;
4143
import org.elasticsearch.plugins.ActionPlugin;
4244
import org.elasticsearch.plugins.AnalysisPlugin;
4345
import org.elasticsearch.plugins.ClusterPlugin;
@@ -57,9 +59,9 @@
5759
import org.elasticsearch.transport.Transport;
5860
import org.elasticsearch.transport.TransportInterceptor;
5961
import org.elasticsearch.watcher.ResourceWatcherService;
60-
import org.elasticsearch.persistent.PersistentTasksExecutor;
6162
import org.elasticsearch.xpack.core.ssl.SSLService;
6263

64+
import java.io.IOException;
6365
import java.nio.file.Path;
6466
import java.util.ArrayList;
6567
import java.util.Collection;
@@ -391,6 +393,11 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
391393
.collect(toList());
392394
}
393395

396+
@Override
397+
public void close() throws IOException {
398+
IOUtils.close(plugins);
399+
}
400+
394401
private <T> List<T> filterPlugins(Class<T> type) {
395402
return plugins.stream().filter(x -> type.isAssignableFrom(x.getClass())).map(p -> ((T)p))
396403
.collect(Collectors.toList());

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.common.unit.TimeValue;
3131
import org.elasticsearch.common.util.concurrent.EsExecutors;
3232
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
33+
import org.elasticsearch.core.internal.io.IOUtils;
3334
import org.elasticsearch.env.Environment;
3435
import org.elasticsearch.env.NodeEnvironment;
3536
import org.elasticsearch.index.IndexModule;
@@ -221,6 +222,7 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin {
221222

222223
private static final Logger logger = Loggers.getLogger(Watcher.class);
223224
private WatcherIndexingListener listener;
225+
private HttpClient httpClient;
224226

225227
// public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
226228
// List<NamedWriteableRegistry.Entry> entries = new ArrayList<>();
@@ -287,7 +289,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
287289
// TODO: add more auth types, or remove this indirection
288290
HttpAuthRegistry httpAuthRegistry = new HttpAuthRegistry(httpAuthFactories);
289291
HttpRequestTemplate.Parser httpTemplateParser = new HttpRequestTemplate.Parser(httpAuthRegistry);
290-
final HttpClient httpClient = new HttpClient(settings, httpAuthRegistry, getSslService());
292+
httpClient = new HttpClient(settings, httpAuthRegistry, getSslService());
291293

292294
// notification
293295
EmailService emailService = new EmailService(settings, cryptoService, clusterService.getClusterSettings());
@@ -632,4 +634,9 @@ public List<BootstrapCheck> getBootstrapChecks() {
632634
public List<ScriptContext> getContexts() {
633635
return Arrays.asList(Watcher.SCRIPT_SEARCH_CONTEXT, Watcher.SCRIPT_EXECUTABLE_CONTEXT, Watcher.SCRIPT_TEMPLATE_CONTEXT);
634636
}
637+
638+
@Override
639+
public void close() throws IOException {
640+
IOUtils.closeWhileHandlingException(httpClient);
641+
}
635642
}

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/common/http/HttpClient.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646

4747
import javax.net.ssl.HostnameVerifier;
4848
import java.io.ByteArrayOutputStream;
49+
import java.io.Closeable;
4950
import java.io.IOException;
5051
import java.io.InputStream;
5152
import java.net.URI;
@@ -56,9 +57,13 @@
5657
import java.util.List;
5758
import java.util.Map;
5859

59-
public class HttpClient extends AbstractComponent {
60+
public class HttpClient extends AbstractComponent implements Closeable {
6061

6162
private static final String SETTINGS_SSL_PREFIX = "xpack.http.ssl.";
63+
// picking a reasonable high value here to allow for setups with lots of watch executions or many http inputs/actions
64+
// this is also used as the value per route, if you are connecting to the same endpoint a lot, which is likely, when
65+
// you are querying a remote Elasticsearch cluster
66+
private static final int MAX_CONNECTIONS = 500;
6267

6368
private final HttpAuthRegistry httpAuthRegistry;
6469
private final CloseableHttpClient client;
@@ -84,6 +89,10 @@ public HttpClient(Settings settings, HttpAuthRegistry httpAuthRegistry, SSLServi
8489
SSLConnectionSocketFactory factory = new SSLConnectionSocketFactory(sslService.sslSocketFactory(sslSettings), verifier);
8590
clientBuilder.setSSLSocketFactory(factory);
8691

92+
clientBuilder.evictExpiredConnections();
93+
clientBuilder.setMaxConnPerRoute(MAX_CONNECTIONS);
94+
clientBuilder.setMaxConnTotal(MAX_CONNECTIONS);
95+
8796
client = clientBuilder.build();
8897
}
8998

@@ -251,6 +260,11 @@ private URI createURI(HttpRequest request) {
251260
}
252261
}
253262

263+
@Override
264+
public void close() throws IOException {
265+
client.close();
266+
}
267+
254268
/**
255269
* Helper class to have all HTTP methods except HEAD allow for an body, including GET
256270
*/

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/webhook/WebhookActionTests.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import org.junit.Before;
4545

4646
import javax.mail.internet.AddressException;
47-
4847
import java.io.IOException;
4948
import java.util.Map;
5049

@@ -219,10 +218,9 @@ private WebhookActionFactory webhookFactory(HttpClient client) {
219218

220219
public void testThatSelectingProxyWorks() throws Exception {
221220
Environment environment = TestEnvironment.newEnvironment(Settings.builder().put("path.home", createTempDir()).build());
222-
HttpClient httpClient = new HttpClient(Settings.EMPTY, authRegistry,
223-
new SSLService(environment.settings(), environment));
224221

225-
try (MockWebServer proxyServer = new MockWebServer()) {
222+
try (HttpClient httpClient = new HttpClient(Settings.EMPTY, authRegistry,
223+
new SSLService(environment.settings(), environment)); MockWebServer proxyServer = new MockWebServer()) {
226224
proxyServer.start();
227225
proxyServer.enqueue(new MockResponse().setResponseCode(200).setBody("fullProxiedContent"));
228226

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/common/http/HttpClientTests.java

Lines changed: 56 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ public void init() throws Exception {
7777
@After
7878
public void shutdown() throws Exception {
7979
webServer.close();
80+
httpClient.close();
8081
}
8182

8283
public void testBasics() throws Exception {
@@ -184,17 +185,18 @@ public void testHttps() throws Exception {
184185
.setSecureSettings(secureSettings)
185186
.build();
186187
}
187-
httpClient = new HttpClient(settings, authRegistry, new SSLService(settings, environment));
188-
secureSettings = new MockSecureSettings();
189-
// We can't use the client created above for the server since it is only a truststore
190-
secureSettings.setString("xpack.ssl.keystore.secure_password", "testnode");
191-
Settings settings2 = Settings.builder()
192-
.put("xpack.ssl.keystore.path", getDataPath("/org/elasticsearch/xpack/security/keystore/testnode.jks"))
193-
.setSecureSettings(secureSettings)
194-
.build();
188+
try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(settings, environment))) {
189+
secureSettings = new MockSecureSettings();
190+
// We can't use the client created above for the server since it is only a truststore
191+
secureSettings.setString("xpack.ssl.keystore.secure_password", "testnode");
192+
Settings settings2 = Settings.builder()
193+
.put("xpack.ssl.keystore.path", getDataPath("/org/elasticsearch/xpack/security/keystore/testnode.jks"))
194+
.setSecureSettings(secureSettings)
195+
.build();
195196

196-
TestsSSLService sslService = new TestsSSLService(settings2, environment);
197-
testSslMockWebserver(sslService.sslContext(), false);
197+
TestsSSLService sslService = new TestsSSLService(settings2, environment);
198+
testSslMockWebserver(client, sslService.sslContext(), false);
199+
}
198200
}
199201

200202
public void testHttpsDisableHostnameVerification() throws Exception {
@@ -217,18 +219,19 @@ public void testHttpsDisableHostnameVerification() throws Exception {
217219
.setSecureSettings(secureSettings)
218220
.build();
219221
}
220-
httpClient = new HttpClient(settings, authRegistry, new SSLService(settings, environment));
221-
MockSecureSettings secureSettings = new MockSecureSettings();
222-
// We can't use the client created above for the server since it only defines a truststore
223-
secureSettings.setString("xpack.ssl.keystore.secure_password", "testnode-no-subjaltname");
224-
Settings settings2 = Settings.builder()
225-
.put("xpack.ssl.keystore.path",
226-
getDataPath("/org/elasticsearch/xpack/security/transport/ssl/certs/simple/testnode-no-subjaltname.jks"))
227-
.setSecureSettings(secureSettings)
228-
.build();
222+
try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(settings, environment))) {
223+
MockSecureSettings secureSettings = new MockSecureSettings();
224+
// We can't use the client created above for the server since it only defines a truststore
225+
secureSettings.setString("xpack.ssl.keystore.secure_password", "testnode-no-subjaltname");
226+
Settings settings2 = Settings.builder()
227+
.put("xpack.ssl.keystore.path",
228+
getDataPath("/org/elasticsearch/xpack/security/transport/ssl/certs/simple/testnode-no-subjaltname.jks"))
229+
.setSecureSettings(secureSettings)
230+
.build();
229231

230-
TestsSSLService sslService = new TestsSSLService(settings2, environment);
231-
testSslMockWebserver(sslService.sslContext(), false);
232+
TestsSSLService sslService = new TestsSSLService(settings2, environment);
233+
testSslMockWebserver(client, sslService.sslContext(), false);
234+
}
232235
}
233236

234237
public void testHttpsClientAuth() throws Exception {
@@ -241,19 +244,20 @@ public void testHttpsClientAuth() throws Exception {
241244
.build();
242245

243246
TestsSSLService sslService = new TestsSSLService(settings, environment);
244-
httpClient = new HttpClient(settings, authRegistry, sslService);
245-
testSslMockWebserver(sslService.sslContext(), true);
247+
try (HttpClient client = new HttpClient(settings, authRegistry, sslService)) {
248+
testSslMockWebserver(client, sslService.sslContext(), true);
249+
}
246250
}
247251

248-
private void testSslMockWebserver(SSLContext sslContext, boolean needClientAuth) throws IOException {
252+
private void testSslMockWebserver(HttpClient client, SSLContext sslContext, boolean needClientAuth) throws IOException {
249253
try (MockWebServer mockWebServer = new MockWebServer(sslContext, needClientAuth)) {
250254
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody("body"));
251255
mockWebServer.start();
252256

253257
HttpRequest.Builder request = HttpRequest.builder("localhost", mockWebServer.getPort())
254258
.scheme(Scheme.HTTPS)
255259
.path("/test");
256-
HttpResponse response = httpClient.execute(request.build());
260+
HttpResponse response = client.execute(request.build());
257261
assertThat(response.status(), equalTo(200));
258262
assertThat(response.body().utf8ToString(), equalTo("body"));
259263

@@ -288,14 +292,14 @@ public void testHttpResponseWithAnyStatusCodeCanReturnBody() throws Exception {
288292

289293
@Network
290294
public void testHttpsWithoutTruststore() throws Exception {
291-
HttpClient httpClient = new HttpClient(Settings.EMPTY, authRegistry, new SSLService(Settings.EMPTY, environment));
292-
293-
// Known server with a valid cert from a commercial CA
294-
HttpRequest.Builder request = HttpRequest.builder("www.elastic.co", 443).scheme(Scheme.HTTPS);
295-
HttpResponse response = httpClient.execute(request.build());
296-
assertThat(response.status(), equalTo(200));
297-
assertThat(response.hasContent(), is(true));
298-
assertThat(response.body(), notNullValue());
295+
try (HttpClient client = new HttpClient(Settings.EMPTY, authRegistry, new SSLService(Settings.EMPTY, environment))) {
296+
// Known server with a valid cert from a commercial CA
297+
HttpRequest.Builder request = HttpRequest.builder("www.elastic.co", 443).scheme(Scheme.HTTPS);
298+
HttpResponse response = client.execute(request.build());
299+
assertThat(response.status(), equalTo(200));
300+
assertThat(response.hasContent(), is(true));
301+
assertThat(response.body(), notNullValue());
302+
}
299303
}
300304

301305
public void testThatProxyCanBeConfigured() throws Exception {
@@ -307,15 +311,16 @@ public void testThatProxyCanBeConfigured() throws Exception {
307311
.put(HttpSettings.PROXY_HOST.getKey(), "localhost")
308312
.put(HttpSettings.PROXY_PORT.getKey(), proxyServer.getPort())
309313
.build();
310-
HttpClient httpClient = new HttpClient(settings, authRegistry, new SSLService(settings, environment));
311314

312315
HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webServer.getPort())
313316
.method(HttpMethod.GET)
314317
.path("/");
315318

316-
HttpResponse response = httpClient.execute(requestBuilder.build());
317-
assertThat(response.status(), equalTo(200));
318-
assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent"));
319+
try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(settings, environment))) {
320+
HttpResponse response = client.execute(requestBuilder.build());
321+
assertThat(response.status(), equalTo(200));
322+
assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent"));
323+
}
319324

320325
// ensure we hit the proxyServer and not the webserver
321326
assertThat(webServer.requests(), hasSize(0));
@@ -386,16 +391,16 @@ public void testProxyCanHaveDifferentSchemeThanRequest() throws Exception {
386391
.setSecureSettings(secureSettings)
387392
.build();
388393

389-
HttpClient httpClient = new HttpClient(settings, authRegistry, new SSLService(settings, environment));
390-
391394
HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webServer.getPort())
392395
.method(HttpMethod.GET)
393396
.scheme(Scheme.HTTP)
394397
.path("/");
395398

396-
HttpResponse response = httpClient.execute(requestBuilder.build());
397-
assertThat(response.status(), equalTo(200));
398-
assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent"));
399+
try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(settings, environment))) {
400+
HttpResponse response = client.execute(requestBuilder.build());
401+
assertThat(response.status(), equalTo(200));
402+
assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent"));
403+
}
399404

400405
// ensure we hit the proxyServer and not the webserver
401406
assertThat(webServer.requests(), hasSize(0));
@@ -413,16 +418,17 @@ public void testThatProxyCanBeOverriddenByRequest() throws Exception {
413418
.put(HttpSettings.PROXY_PORT.getKey(), proxyServer.getPort() + 1)
414419
.put(HttpSettings.PROXY_HOST.getKey(), "https")
415420
.build();
416-
HttpClient httpClient = new HttpClient(settings, authRegistry, new SSLService(settings, environment));
417421

418422
HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webServer.getPort())
419423
.method(HttpMethod.GET)
420424
.proxy(new HttpProxy("localhost", proxyServer.getPort(), Scheme.HTTP))
421425
.path("/");
422426

423-
HttpResponse response = httpClient.execute(requestBuilder.build());
424-
assertThat(response.status(), equalTo(200));
425-
assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent"));
427+
try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(settings, environment))) {
428+
HttpResponse response = client.execute(requestBuilder.build());
429+
assertThat(response.status(), equalTo(200));
430+
assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent"));
431+
}
426432

427433
// ensure we hit the proxyServer and not the webserver
428434
assertThat(webServer.requests(), hasSize(0));
@@ -535,12 +541,13 @@ public void testMaxHttpResponseSize() throws Exception {
535541
Settings settings = Settings.builder()
536542
.put(HttpSettings.MAX_HTTP_RESPONSE_SIZE.getKey(), new ByteSizeValue(randomBytesLength - 1, ByteSizeUnit.BYTES))
537543
.build();
538-
HttpClient httpClient = new HttpClient(settings, authRegistry, new SSLService(environment.settings(), environment));
539544

540545
HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webServer.getPort()).method(HttpMethod.GET).path("/");
541546

542-
IOException e = expectThrows(IOException.class, () -> httpClient.execute(requestBuilder.build()));
543-
assertThat(e.getMessage(), startsWith("Maximum limit of"));
547+
try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(environment.settings(), environment))) {
548+
IOException e = expectThrows(IOException.class, () -> client.execute(requestBuilder.build()));
549+
assertThat(e.getMessage(), startsWith("Maximum limit of"));
550+
}
544551
}
545552

546553
public void testThatGetRedirectIsFollowed() throws Exception {

0 commit comments

Comments
 (0)