Skip to content

Commit

Permalink
Revert "Upgraded ElasticSearch to get rid of CVEs. (#13747)" (#13862)
Browse files Browse the repository at this point in the history
This reverts commit 1af8d3f.
  • Loading branch information
dlg99 authored Jan 20, 2022
1 parent 4924e6d commit f5c3c2e
Show file tree
Hide file tree
Showing 10 changed files with 182 additions and 206 deletions.
4 changes: 1 addition & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ flexible messaging model and an intuitive client API.</description>
<mariadb-jdbc.version>2.6.0</mariadb-jdbc.version>
<hdfs-offload-version3>3.3.1</hdfs-offload-version3>
<json-smart.version>2.4.7</json-smart.version>
<elasticsearch.version>7.16.3</elasticsearch.version>
<elasticsearch.version>7.9.1</elasticsearch.version>
<presto.version>332</presto.version>
<scala.binary.version>2.13</scala.binary.version>
<scala-library.version>2.13.6</scala-library.version>
Expand Down Expand Up @@ -1328,7 +1328,6 @@ flexible messaging model and an intuitive client API.</description>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<encoding>UTF-8</encoding>
Expand Down Expand Up @@ -1846,7 +1845,6 @@ flexible messaging model and an intuitive client API.</description>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<!-- for some reason, setting maven.compiler.release property alone doesn't work -->
Expand Down
14 changes: 1 addition & 13 deletions pulsar-io/elastic-search/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,6 @@
<artifactId>pulsar-io-elastic-search</artifactId>
<name>Pulsar IO :: ElasticSearch</name>

<properties>
<!--
Work-around for "Container exited with code 137" (OOM)
-->
<testReuseFork>false</testReuseFork>
<testForkCount>1</testForkCount>
</properties>

<dependencies>

<dependency>
Expand Down Expand Up @@ -91,11 +83,7 @@
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>elasticsearch</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>toxiproxy</artifactId>
<version>1.15.3</version>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xcontent.XContentType;

@Slf4j
public class ElasticSearchClient implements AutoCloseable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.unit.TimeValue;

public class RandomExponentialBackoffPolicy extends BackoffPolicy {
private final RandomExponentialRetry randomExponentialRetry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
public class ElasticSearchClientSslTests {

public static final String ELASTICSEARCH_IMAGE = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE"))
.orElse("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64");
.orElse("docker.elastic.co/elasticsearch/elasticsearch:7.10.2-amd64");

final static String INDEX = "myindex";

Expand All @@ -44,8 +44,9 @@ public class ElasticSearchClientSslTests {
@Test
public void testSslBasic() throws IOException {
try(ElasticsearchContainer container = new ElasticsearchContainer(ELASTICSEARCH_IMAGE)
.withCreateContainerCmdModifier(c -> c.withName("elasticsearch"))
.withFileSystemBind(sslResourceDir, configDir + "/ssl")
.withPassword("elastic")
.withEnv("ELASTIC_PASSWORD","elastic") // boostrap password
.withEnv("xpack.license.self_generated.type", "trial")
.withEnv("xpack.security.enabled", "true")
.withEnv("xpack.security.http.ssl.enabled", "true")
Expand Down Expand Up @@ -79,8 +80,9 @@ public void testSslBasic() throws IOException {
@Test
public void testSslWithHostnameVerification() throws IOException {
try(ElasticsearchContainer container = new ElasticsearchContainer(ELASTICSEARCH_IMAGE)
.withCreateContainerCmdModifier(c -> c.withName("elasticsearch"))
.withFileSystemBind(sslResourceDir, configDir + "/ssl")
.withPassword("elastic")
.withEnv("ELASTIC_PASSWORD","elastic") // boostrap password
.withEnv("xpack.license.self_generated.type", "trial")
.withEnv("xpack.security.enabled", "true")
.withEnv("xpack.security.http.ssl.enabled", "true")
Expand Down Expand Up @@ -117,8 +119,9 @@ public void testSslWithHostnameVerification() throws IOException {
@Test
public void testSslWithClientAuth() throws IOException {
try(ElasticsearchContainer container = new ElasticsearchContainer(ELASTICSEARCH_IMAGE)
.withCreateContainerCmdModifier(c -> c.withName("elasticsearch"))
.withFileSystemBind(sslResourceDir, configDir + "/ssl")
.withPassword("elastic")
.withEnv("ELASTIC_PASSWORD","elastic") // boostrap password
.withEnv("xpack.license.self_generated.type", "trial")
.withEnv("xpack.security.enabled", "true")
.withEnv("xpack.security.http.ssl.enabled", "true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,16 @@
*/
package org.apache.pulsar.io.elasticsearch;

import eu.rekawek.toxiproxy.model.ToxicDirection;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.elasticsearch.testcontainers.ElasticToxiproxiContainer;
import org.apache.pulsar.io.elasticsearch.testcontainers.ChaosContainer;
import org.awaitility.Awaitility;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.junit.AfterClass;
import org.mockito.Mockito;
import org.testcontainers.containers.Network;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand All @@ -47,22 +45,21 @@

@Slf4j
public class ElasticSearchClientTests {

public static final String ELASTICSEARCH_IMAGE = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE"))
.orElse("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64");
.orElse("docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2-amd64");

static ElasticsearchContainer container;
static Network network = Network.newNetwork();

@BeforeClass
public static final void initBeforeClass() throws IOException {
container = new ElasticsearchContainer(ELASTICSEARCH_IMAGE).withNetwork(network);
container = new ElasticsearchContainer(ELASTICSEARCH_IMAGE);
container.start();
}

@AfterClass
public static void closeAfterClass() {
container.close();
network.close();
}

static class MockRecord<T> implements Record<T> {
Expand Down Expand Up @@ -225,111 +222,101 @@ public void testMalformedDocIgnore() throws Exception {

@Test
public void testBulkRetry() throws Exception {
try (ElasticToxiproxiContainer toxiproxy = new ElasticToxiproxiContainer(container, network)) {
toxiproxy.start();

final String index = "indexbulktest-" + UUID.randomUUID();
ElasticSearchConfig config = new ElasticSearchConfig()
.setElasticSearchUrl("http://" + toxiproxy.getHttpHostAddress())
.setIndexName(index)
.setBulkEnabled(true)
.setMaxRetries(1000)
.setBulkActions(2)
.setRetryBackoffInMs(100)
// disabled, we want to have full control over flush() method
.setBulkFlushIntervalInMs(-1);

try (ElasticSearchClient client = new ElasticSearchClient(config);) {
try {
assertTrue(client.createIndexIfNeeded(index));
MockRecord<GenericObject> mockRecord = new MockRecord<>();
client.bulkIndex(mockRecord, Pair.of("1", "{\"a\":1}"));
client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":2}"));
assertEquals(mockRecord.acked, 2);
assertEquals(mockRecord.failed, 0);
assertEquals(client.totalHits(index), 2);
final String index = "indexbulktest-" + UUID.randomUUID();
ElasticSearchConfig config = new ElasticSearchConfig()
.setElasticSearchUrl("http://"+container.getHttpHostAddress())
.setIndexName(index)
.setBulkEnabled(true)
.setMaxRetries(1000)
.setBulkActions(2)
.setRetryBackoffInMs(100)
// disabled, we want to have full control over flush() method
.setBulkFlushIntervalInMs(-1);

try (ElasticSearchClient client = new ElasticSearchClient(config);) {
try {
assertTrue(client.createIndexIfNeeded(index));
MockRecord<GenericObject> mockRecord = new MockRecord<>();
client.bulkIndex(mockRecord, Pair.of("1", "{\"a\":1}"));
client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":2}"));
assertEquals(mockRecord.acked, 2);
assertEquals(mockRecord.failed, 0);
assertEquals(client.totalHits(index), 2);

log.info("starting the toxic");
toxiproxy.getProxy().setConnectionCut(false);
toxiproxy.getProxy().toxics().latency("elasticpause", ToxicDirection.DOWNSTREAM, 15000);
toxiproxy.removeToxicAfterDelay("elasticpause", 15000);
ChaosContainer<?> chaosContainer = ChaosContainer.pauseContainerForSeconds(container.getContainerName(), 15);
chaosContainer.start();

client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}"));
assertEquals(mockRecord.acked, 2);
assertEquals(mockRecord.failed, 0);
assertEquals(client.totalHits(index), 2);
client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}"));
assertEquals(mockRecord.acked, 2);
assertEquals(mockRecord.failed, 0);
assertEquals(client.totalHits(index), 2);

client.flush();
assertEquals(mockRecord.acked, 3);
assertEquals(mockRecord.failed, 0);
assertEquals(client.totalHits(index), 3);
} finally {
client.delete(index);
}
chaosContainer.stop();
client.flush();
assertEquals(mockRecord.acked, 3);
assertEquals(mockRecord.failed, 0);
assertEquals(client.totalHits(index), 3);
} finally {
client.delete(index);
}
}
}

@Test
public void testBulkBlocking() throws Exception {
try (ElasticToxiproxiContainer toxiproxy = new ElasticToxiproxiContainer(container, network)) {
toxiproxy.start();

final String index = "indexblocking-" + UUID.randomUUID();
ElasticSearchConfig config = new ElasticSearchConfig()
.setElasticSearchUrl("http://" + toxiproxy.getHttpHostAddress())
.setIndexName(index)
.setBulkEnabled(true)
.setMaxRetries(1000)
.setBulkActions(2)
.setBulkConcurrentRequests(2)
.setRetryBackoffInMs(100)
.setBulkFlushIntervalInMs(10000);
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
assertTrue(client.createIndexIfNeeded(index));
final String index = "indexblocking-" + UUID.randomUUID();
ElasticSearchConfig config = new ElasticSearchConfig()
.setElasticSearchUrl("http://"+container.getHttpHostAddress())
.setIndexName(index)
.setBulkEnabled(true)
.setMaxRetries(1000)
.setBulkActions(2)
.setBulkConcurrentRequests(2)
.setRetryBackoffInMs(100)
.setBulkFlushIntervalInMs(10000);
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
assertTrue(client.createIndexIfNeeded(index));

try {
MockRecord<GenericObject> mockRecord = new MockRecord<>();
for (int i = 1; i <= 5; i++) {
client.bulkIndex(mockRecord, Pair.of(Integer.toString(i), "{\"a\":" + i + "}"));
}

Awaitility.await().untilAsserted(() -> {
assertThat("acked record", mockRecord.acked, greaterThanOrEqualTo(4));
assertEquals(mockRecord.failed, 0);
assertThat("totalHits", client.totalHits(index), greaterThanOrEqualTo(4L));
});
client.flush();
Awaitility.await().untilAsserted(() -> {
assertEquals(mockRecord.failed, 0);
assertEquals(mockRecord.acked, 5);
assertEquals(client.totalHits(index), 5);
});

log.info("starting the toxic");
toxiproxy.getProxy().setConnectionCut(false);
toxiproxy.getProxy().toxics().latency("elasticpause", ToxicDirection.DOWNSTREAM, 35000);
toxiproxy.removeToxicAfterDelay("elasticpause", 30000);

long start = System.currentTimeMillis();

// 11th bulkIndex is blocking because we have 2 pending requests, and the 3rd request is blocked.
for (int i = 6; i <= 15; i++) {
client.bulkIndex(mockRecord, Pair.of(Integer.toString(i), "{\"a\":" + i + "}"));
log.info("{} index {}", System.currentTimeMillis(), i);
}
long elapsed = System.currentTimeMillis() - start;
log.info("elapsed = {}", elapsed);
assertTrue(elapsed > 29000); // bulkIndex was blocking while elasticsearch was down or busy

Thread.sleep(1000L);
assertEquals(mockRecord.acked, 15);
assertEquals(mockRecord.failed, 0);
assertEquals(client.records.size(), 0);
try {
MockRecord<GenericObject> mockRecord = new MockRecord<>();
for (int i = 1; i <= 5; i++) {
client.bulkIndex(mockRecord, Pair.of(Integer.toString(i), "{\"a\":" + i + "}"));
}

} finally {
client.delete(index);
Awaitility.await().untilAsserted(() -> {
assertThat("acked record", mockRecord.acked, greaterThanOrEqualTo(4));
assertEquals(mockRecord.failed, 0);
assertThat("totalHits", client.totalHits(index), greaterThanOrEqualTo(4L));
});
client.flush();
Awaitility.await().untilAsserted(() -> {
assertEquals(mockRecord.failed, 0);
assertEquals(mockRecord.acked, 5);
assertEquals(client.totalHits(index), 5);
});

ChaosContainer<?> chaosContainer = ChaosContainer.pauseContainerForSeconds(container.getContainerName(), 30);
chaosContainer.start();
Thread.sleep(1000L);

// 11th bulkIndex is blocking because we have 2 pending requests, and the 3rd request is blocked.
long start = System.currentTimeMillis();
for (int i = 6; i <= 15; i++) {
client.bulkIndex(mockRecord, Pair.of(Integer.toString(i), "{\"a\":" + i + "}"));
log.info("{} index {}", System.currentTimeMillis(), i);
}
long elapsed = System.currentTimeMillis() - start;
log.info("elapsed = {}", elapsed);
assertTrue(elapsed > 29000); // bulkIndex was blocking while elasticsearch was down or busy

Thread.sleep(1000L);
assertEquals(mockRecord.acked, 15);
assertEquals(mockRecord.failed, 0);
assertEquals(client.records.size(), 0);

chaosContainer.stop();
} finally {
client.delete(index);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
public class ElasticSearchSinkRawDataTests {

public static final String ELASTICSEARCH_IMAGE = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE"))
.orElse("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64");
.orElse("docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2-amd64");

private static ElasticsearchContainer container;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
public class ElasticSearchSinkTests {

public static final String ELASTICSEARCH_IMAGE = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE"))
.orElse("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64");
.orElse("docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2-amd64");

private static ElasticsearchContainer container;

Expand Down
Loading

0 comments on commit f5c3c2e

Please sign in to comment.