Skip to content

Commit 28cc588

Browse files
committed
Watcher: Increase HttpClient parallel sent requests (#30130)
The HTTPClient used in watcher is based on the apache http client. The current client is using a lot of defaults - which are not always optimal. Two of those defaults are the maximum number of total connections and the maximum number of connections to a single route. If one of those limits is reached, the HTTPClient waits for a connection to be finished thus acting in a blocking fashion. In order to prevent this when many requests are being executed, we increase the limit of total connections as well as the connections per route (a route is basically an endpoint, which also contains proxy information, not containing an URL, just hosts). On top of that an additional option has been set to evict long running connections, which can potentially be reused after some time. As this requires an additional background thread, this required some changes to ensure that the httpclient is closed properly. Also the timeout for this can be configured.
1 parent ae5ac44 commit 28cc588

File tree

7 files changed

+132
-88
lines changed

7 files changed

+132
-88
lines changed

docs/CHANGELOG.asciidoc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,14 @@ these instead of the multi-argument versions. ({pull}29623[#29623])
320320

321321
Recovery::
322322
* Require translogUUID when reading global checkpoint {pull}28587[#28587] (issue: {issue}28435[#28435])
323+
Watcher HTTP client used in watches now allows more parallel connections to the
324+
same endpoint and evicts long running connections. ({pull}30130[#30130])
325+
326+
The cluster state listener to decide if watcher should be
327+
stopped/started/paused now runs far less code in an executor but is more
328+
synchronous and predictable. Also the trigger engine thread is only started on
329+
data nodes. And the Execute Watch API can be triggered regardless is watcher is
330+
started or stopped. ({pull}30118[#30118])
323331

324332
Added put index template API to the high level rest client ({pull}30400[#30400])
325333

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.common.util.PageCacheRecycler;
2929
import org.elasticsearch.common.util.concurrent.ThreadContext;
3030
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
31+
import org.elasticsearch.core.internal.io.IOUtils;
3132
import org.elasticsearch.env.Environment;
3233
import org.elasticsearch.env.NodeEnvironment;
3334
import org.elasticsearch.http.HttpServerTransport;
@@ -38,6 +39,7 @@
3839
import org.elasticsearch.ingest.Processor;
3940
import org.elasticsearch.license.LicenseService;
4041
import org.elasticsearch.license.XPackLicenseState;
42+
import org.elasticsearch.persistent.PersistentTasksExecutor;
4143
import org.elasticsearch.plugins.ActionPlugin;
4244
import org.elasticsearch.plugins.AnalysisPlugin;
4345
import org.elasticsearch.plugins.ClusterPlugin;
@@ -57,9 +59,9 @@
5759
import org.elasticsearch.transport.Transport;
5860
import org.elasticsearch.transport.TransportInterceptor;
5961
import org.elasticsearch.watcher.ResourceWatcherService;
60-
import org.elasticsearch.persistent.PersistentTasksExecutor;
6162
import org.elasticsearch.xpack.core.ssl.SSLService;
6263

64+
import java.io.IOException;
6365
import java.nio.file.Path;
6466
import java.util.ArrayList;
6567
import java.util.Collection;
@@ -391,6 +393,11 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
391393
.collect(toList());
392394
}
393395

396+
@Override
397+
public void close() throws IOException {
398+
IOUtils.close(plugins);
399+
}
400+
394401
private <T> List<T> filterPlugins(Class<T> type) {
395402
return plugins.stream().filter(x -> type.isAssignableFrom(x.getClass())).map(p -> ((T)p))
396403
.collect(Collectors.toList());

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.common.unit.TimeValue;
3131
import org.elasticsearch.common.util.concurrent.EsExecutors;
3232
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
33+
import org.elasticsearch.core.internal.io.IOUtils;
3334
import org.elasticsearch.env.Environment;
3435
import org.elasticsearch.env.NodeEnvironment;
3536
import org.elasticsearch.index.IndexModule;
@@ -219,6 +220,7 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin {
219220

220221
private static final Logger logger = Loggers.getLogger(Watcher.class);
221222
private WatcherIndexingListener listener;
223+
private HttpClient httpClient;
222224

223225
// public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
224226
// List<NamedWriteableRegistry.Entry> entries = new ArrayList<>();
@@ -285,7 +287,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
285287
// TODO: add more auth types, or remove this indirection
286288
HttpAuthRegistry httpAuthRegistry = new HttpAuthRegistry(httpAuthFactories);
287289
HttpRequestTemplate.Parser httpTemplateParser = new HttpRequestTemplate.Parser(httpAuthRegistry);
288-
final HttpClient httpClient = new HttpClient(settings, httpAuthRegistry, getSslService());
290+
httpClient = new HttpClient(settings, httpAuthRegistry, getSslService());
289291

290292
// notification
291293
EmailService emailService = new EmailService(settings, cryptoService, clusterService.getClusterSettings());
@@ -630,4 +632,9 @@ public List<BootstrapCheck> getBootstrapChecks() {
630632
public List<ScriptContext> getContexts() {
631633
return Arrays.asList(Watcher.SCRIPT_SEARCH_CONTEXT, Watcher.SCRIPT_EXECUTABLE_CONTEXT, Watcher.SCRIPT_TEMPLATE_CONTEXT);
632634
}
635+
636+
@Override
637+
public void close() throws IOException {
638+
IOUtils.closeWhileHandlingException(httpClient);
639+
}
633640
}

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/common/http/HttpClient.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646

4747
import javax.net.ssl.HostnameVerifier;
4848
import java.io.ByteArrayOutputStream;
49+
import java.io.Closeable;
4950
import java.io.IOException;
5051
import java.io.InputStream;
5152
import java.net.URI;
@@ -56,9 +57,13 @@
5657
import java.util.List;
5758
import java.util.Map;
5859

59-
public class HttpClient extends AbstractComponent {
60+
public class HttpClient extends AbstractComponent implements Closeable {
6061

6162
private static final String SETTINGS_SSL_PREFIX = "xpack.http.ssl.";
63+
// picking a reasonable high value here to allow for setups with lots of watch executions or many http inputs/actions
64+
// this is also used as the value per route, if you are connecting to the same endpoint a lot, which is likely, when
65+
// you are querying a remote Elasticsearch cluster
66+
private static final int MAX_CONNECTIONS = 500;
6267

6368
private final HttpAuthRegistry httpAuthRegistry;
6469
private final CloseableHttpClient client;
@@ -84,6 +89,10 @@ public HttpClient(Settings settings, HttpAuthRegistry httpAuthRegistry, SSLServi
8489
SSLConnectionSocketFactory factory = new SSLConnectionSocketFactory(sslService.sslSocketFactory(sslSettings), verifier);
8590
clientBuilder.setSSLSocketFactory(factory);
8691

92+
clientBuilder.evictExpiredConnections();
93+
clientBuilder.setMaxConnPerRoute(MAX_CONNECTIONS);
94+
clientBuilder.setMaxConnTotal(MAX_CONNECTIONS);
95+
8796
client = clientBuilder.build();
8897
}
8998

@@ -251,6 +260,11 @@ private URI createURI(HttpRequest request) {
251260
}
252261
}
253262

263+
@Override
264+
public void close() throws IOException {
265+
client.close();
266+
}
267+
254268
/**
255269
* Helper class to have all HTTP methods except HEAD allow for an body, including GET
256270
*/

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/webhook/WebhookActionTests.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import org.junit.Before;
4545

4646
import javax.mail.internet.AddressException;
47-
4847
import java.io.IOException;
4948
import java.util.Map;
5049

@@ -219,10 +218,9 @@ private WebhookActionFactory webhookFactory(HttpClient client) {
219218

220219
public void testThatSelectingProxyWorks() throws Exception {
221220
Environment environment = TestEnvironment.newEnvironment(Settings.builder().put("path.home", createTempDir()).build());
222-
HttpClient httpClient = new HttpClient(Settings.EMPTY, authRegistry,
223-
new SSLService(environment.settings(), environment));
224221

225-
try (MockWebServer proxyServer = new MockWebServer()) {
222+
try (HttpClient httpClient = new HttpClient(Settings.EMPTY, authRegistry,
223+
new SSLService(environment.settings(), environment)); MockWebServer proxyServer = new MockWebServer()) {
226224
proxyServer.start();
227225
proxyServer.enqueue(new MockResponse().setResponseCode(200).setBody("fullProxiedContent"));
228226

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/common/http/HttpClientTests.java

Lines changed: 56 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ public void init() throws Exception {
7777
@After
7878
public void shutdown() throws Exception {
7979
webServer.close();
80+
httpClient.close();
8081
}
8182

8283
public void testBasics() throws Exception {
@@ -184,17 +185,18 @@ public void testHttps() throws Exception {
184185
.setSecureSettings(secureSettings)
185186
.build();
186187
}
187-
httpClient = new HttpClient(settings, authRegistry, new SSLService(settings, environment));
188-
secureSettings = new MockSecureSettings();
189-
// We can't use the client created above for the server since it is only a truststore
190-
secureSettings.setString("xpack.ssl.keystore.secure_password", "testnode");
191-
Settings settings2 = Settings.builder()
192-
.put("xpack.ssl.keystore.path", getDataPath("/org/elasticsearch/xpack/security/keystore/testnode.jks"))
193-
.setSecureSettings(secureSettings)
194-
.build();
188+
try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(settings, environment))) {
189+
secureSettings = new MockSecureSettings();
190+
// We can't use the client created above for the server since it is only a truststore
191+
secureSettings.setString("xpack.ssl.keystore.secure_password", "testnode");
192+
Settings settings2 = Settings.builder()
193+
.put("xpack.ssl.keystore.path", getDataPath("/org/elasticsearch/xpack/security/keystore/testnode.jks"))
194+
.setSecureSettings(secureSettings)
195+
.build();
195196

196-
TestsSSLService sslService = new TestsSSLService(settings2, environment);
197-
testSslMockWebserver(sslService.sslContext(), false);
197+
TestsSSLService sslService = new TestsSSLService(settings2, environment);
198+
testSslMockWebserver(client, sslService.sslContext(), false);
199+
}
198200
}
199201

200202
public void testHttpsDisableHostnameVerification() throws Exception {
@@ -217,18 +219,19 @@ public void testHttpsDisableHostnameVerification() throws Exception {
217219
.setSecureSettings(secureSettings)
218220
.build();
219221
}
220-
httpClient = new HttpClient(settings, authRegistry, new SSLService(settings, environment));
221-
MockSecureSettings secureSettings = new MockSecureSettings();
222-
// We can't use the client created above for the server since it only defines a truststore
223-
secureSettings.setString("xpack.ssl.keystore.secure_password", "testnode-no-subjaltname");
224-
Settings settings2 = Settings.builder()
225-
.put("xpack.ssl.keystore.path",
226-
getDataPath("/org/elasticsearch/xpack/security/transport/ssl/certs/simple/testnode-no-subjaltname.jks"))
227-
.setSecureSettings(secureSettings)
228-
.build();
222+
try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(settings, environment))) {
223+
MockSecureSettings secureSettings = new MockSecureSettings();
224+
// We can't use the client created above for the server since it only defines a truststore
225+
secureSettings.setString("xpack.ssl.keystore.secure_password", "testnode-no-subjaltname");
226+
Settings settings2 = Settings.builder()
227+
.put("xpack.ssl.keystore.path",
228+
getDataPath("/org/elasticsearch/xpack/security/transport/ssl/certs/simple/testnode-no-subjaltname.jks"))
229+
.setSecureSettings(secureSettings)
230+
.build();
229231

230-
TestsSSLService sslService = new TestsSSLService(settings2, environment);
231-
testSslMockWebserver(sslService.sslContext(), false);
232+
TestsSSLService sslService = new TestsSSLService(settings2, environment);
233+
testSslMockWebserver(client, sslService.sslContext(), false);
234+
}
232235
}
233236

234237
public void testHttpsClientAuth() throws Exception {
@@ -241,19 +244,20 @@ public void testHttpsClientAuth() throws Exception {
241244
.build();
242245

243246
TestsSSLService sslService = new TestsSSLService(settings, environment);
244-
httpClient = new HttpClient(settings, authRegistry, sslService);
245-
testSslMockWebserver(sslService.sslContext(), true);
247+
try (HttpClient client = new HttpClient(settings, authRegistry, sslService)) {
248+
testSslMockWebserver(client, sslService.sslContext(), true);
249+
}
246250
}
247251

248-
private void testSslMockWebserver(SSLContext sslContext, boolean needClientAuth) throws IOException {
252+
private void testSslMockWebserver(HttpClient client, SSLContext sslContext, boolean needClientAuth) throws IOException {
249253
try (MockWebServer mockWebServer = new MockWebServer(sslContext, needClientAuth)) {
250254
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody("body"));
251255
mockWebServer.start();
252256

253257
HttpRequest.Builder request = HttpRequest.builder("localhost", mockWebServer.getPort())
254258
.scheme(Scheme.HTTPS)
255259
.path("/test");
256-
HttpResponse response = httpClient.execute(request.build());
260+
HttpResponse response = client.execute(request.build());
257261
assertThat(response.status(), equalTo(200));
258262
assertThat(response.body().utf8ToString(), equalTo("body"));
259263

@@ -288,14 +292,14 @@ public void testHttpResponseWithAnyStatusCodeCanReturnBody() throws Exception {
288292

289293
@Network
290294
public void testHttpsWithoutTruststore() throws Exception {
291-
HttpClient httpClient = new HttpClient(Settings.EMPTY, authRegistry, new SSLService(Settings.EMPTY, environment));
292-
293-
// Known server with a valid cert from a commercial CA
294-
HttpRequest.Builder request = HttpRequest.builder("www.elastic.co", 443).scheme(Scheme.HTTPS);
295-
HttpResponse response = httpClient.execute(request.build());
296-
assertThat(response.status(), equalTo(200));
297-
assertThat(response.hasContent(), is(true));
298-
assertThat(response.body(), notNullValue());
295+
try (HttpClient client = new HttpClient(Settings.EMPTY, authRegistry, new SSLService(Settings.EMPTY, environment))) {
296+
// Known server with a valid cert from a commercial CA
297+
HttpRequest.Builder request = HttpRequest.builder("www.elastic.co", 443).scheme(Scheme.HTTPS);
298+
HttpResponse response = client.execute(request.build());
299+
assertThat(response.status(), equalTo(200));
300+
assertThat(response.hasContent(), is(true));
301+
assertThat(response.body(), notNullValue());
302+
}
299303
}
300304

301305
public void testThatProxyCanBeConfigured() throws Exception {
@@ -307,15 +311,16 @@ public void testThatProxyCanBeConfigured() throws Exception {
307311
.put(HttpSettings.PROXY_HOST.getKey(), "localhost")
308312
.put(HttpSettings.PROXY_PORT.getKey(), proxyServer.getPort())
309313
.build();
310-
HttpClient httpClient = new HttpClient(settings, authRegistry, new SSLService(settings, environment));
311314

312315
HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webServer.getPort())
313316
.method(HttpMethod.GET)
314317
.path("/");
315318

316-
HttpResponse response = httpClient.execute(requestBuilder.build());
317-
assertThat(response.status(), equalTo(200));
318-
assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent"));
319+
try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(settings, environment))) {
320+
HttpResponse response = client.execute(requestBuilder.build());
321+
assertThat(response.status(), equalTo(200));
322+
assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent"));
323+
}
319324

320325
// ensure we hit the proxyServer and not the webserver
321326
assertThat(webServer.requests(), hasSize(0));
@@ -386,16 +391,16 @@ public void testProxyCanHaveDifferentSchemeThanRequest() throws Exception {
386391
.setSecureSettings(secureSettings)
387392
.build();
388393

389-
HttpClient httpClient = new HttpClient(settings, authRegistry, new SSLService(settings, environment));
390-
391394
HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webServer.getPort())
392395
.method(HttpMethod.GET)
393396
.scheme(Scheme.HTTP)
394397
.path("/");
395398

396-
HttpResponse response = httpClient.execute(requestBuilder.build());
397-
assertThat(response.status(), equalTo(200));
398-
assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent"));
399+
try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(settings, environment))) {
400+
HttpResponse response = client.execute(requestBuilder.build());
401+
assertThat(response.status(), equalTo(200));
402+
assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent"));
403+
}
399404

400405
// ensure we hit the proxyServer and not the webserver
401406
assertThat(webServer.requests(), hasSize(0));
@@ -413,16 +418,17 @@ public void testThatProxyCanBeOverriddenByRequest() throws Exception {
413418
.put(HttpSettings.PROXY_PORT.getKey(), proxyServer.getPort() + 1)
414419
.put(HttpSettings.PROXY_HOST.getKey(), "https")
415420
.build();
416-
HttpClient httpClient = new HttpClient(settings, authRegistry, new SSLService(settings, environment));
417421

418422
HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webServer.getPort())
419423
.method(HttpMethod.GET)
420424
.proxy(new HttpProxy("localhost", proxyServer.getPort(), Scheme.HTTP))
421425
.path("/");
422426

423-
HttpResponse response = httpClient.execute(requestBuilder.build());
424-
assertThat(response.status(), equalTo(200));
425-
assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent"));
427+
try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(settings, environment))) {
428+
HttpResponse response = client.execute(requestBuilder.build());
429+
assertThat(response.status(), equalTo(200));
430+
assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent"));
431+
}
426432

427433
// ensure we hit the proxyServer and not the webserver
428434
assertThat(webServer.requests(), hasSize(0));
@@ -535,12 +541,13 @@ public void testMaxHttpResponseSize() throws Exception {
535541
Settings settings = Settings.builder()
536542
.put(HttpSettings.MAX_HTTP_RESPONSE_SIZE.getKey(), new ByteSizeValue(randomBytesLength - 1, ByteSizeUnit.BYTES))
537543
.build();
538-
HttpClient httpClient = new HttpClient(settings, authRegistry, new SSLService(environment.settings(), environment));
539544

540545
HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webServer.getPort()).method(HttpMethod.GET).path("/");
541546

542-
IOException e = expectThrows(IOException.class, () -> httpClient.execute(requestBuilder.build()));
543-
assertThat(e.getMessage(), startsWith("Maximum limit of"));
547+
try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(environment.settings(), environment))) {
548+
IOException e = expectThrows(IOException.class, () -> client.execute(requestBuilder.build()));
549+
assertThat(e.getMessage(), startsWith("Maximum limit of"));
550+
}
544551
}
545552

546553
public void testThatGetRedirectIsFollowed() throws Exception {

0 commit comments

Comments
 (0)