diff --git a/pom.xml b/pom.xml index 8d7049a27bc89..ebd24ec876402 100644 --- a/pom.xml +++ b/pom.xml @@ -153,7 +153,7 @@ flexible messaging model and an intuitive client API. 2.6.0 3.3.1 2.4.7 - 7.16.3 + 7.9.1 334 2.13 2.13.6 @@ -1311,7 +1311,6 @@ flexible messaging model and an intuitive client API. - org.apache.maven.plugins maven-compiler-plugin UTF-8 @@ -1819,7 +1818,6 @@ flexible messaging model and an intuitive client API. - org.apache.maven.plugins maven-compiler-plugin diff --git a/pulsar-io/elastic-search/pom.xml b/pulsar-io/elastic-search/pom.xml index 312be742c81ad..56cbd624223b4 100644 --- a/pulsar-io/elastic-search/pom.xml +++ b/pulsar-io/elastic-search/pom.xml @@ -28,14 +28,6 @@ pulsar-io-elastic-search Pulsar IO :: ElasticSearch - - - false - 1 - - @@ -76,12 +68,6 @@ ${project.version} - - org.awaitility - awaitility - test - - org.elasticsearch.client elasticsearch-rest-high-level-client @@ -90,11 +76,7 @@ org.testcontainers elasticsearch - test - - - org.testcontainers - toxiproxy + 1.15.3 test diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java index d31425a9241f1..0cdbb2894acf2 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java @@ -48,7 +48,6 @@ import org.apache.pulsar.functions.api.Record; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkProcessor; @@ -67,11 +66,11 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.core.TimeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.xcontent.XContentType; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLContext; @@ -90,7 +89,7 @@ import java.util.concurrent.atomic.AtomicReference; @Slf4j -public class ElasticSearchClient implements AutoCloseable { +public class ElasticSearchClient { static final String[] malformedErrors = { "mapper_parsing_exception", @@ -345,7 +344,6 @@ public void flush() { bulkProcessor.flush(); } - @Override public void close() { try { if (bulkProcessor != null) { @@ -469,11 +467,6 @@ protected org.elasticsearch.action.search.SearchResponse search(String indexName RequestOptions.DEFAULT); } - @VisibleForTesting - protected org.elasticsearch.action.support.master.AcknowledgedResponse delete(String indexName) throws IOException { - return client.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT); - } - private T retry(Callable callable, String source) { try { return backoffRetry.retry(callable, config.getMaxRetries(), config.getRetryBackoffInMs(), source); diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/RandomExponentialBackoffPolicy.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/RandomExponentialBackoffPolicy.java index 6e973a4d3c4be..ef5e7e03937cb 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/RandomExponentialBackoffPolicy.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/RandomExponentialBackoffPolicy.java @@ -19,7 +19,7 @@ package org.apache.pulsar.io.elasticsearch; import org.elasticsearch.action.bulk.BackoffPolicy; -import org.elasticsearch.core.TimeValue; +import org.elasticsearch.common.unit.TimeValue; import java.util.Iterator; import java.util.NoSuchElementException; diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientSslTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientSslTests.java index 102fa47e95e90..9f878aad1ad17 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientSslTests.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientSslTests.java @@ -35,7 +35,7 @@ public class ElasticSearchClientSslTests { public static final String ELASTICSEARCH_IMAGE = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE")) - .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64"); + .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.10.2-amd64"); final static String INDEX = "myindex"; @@ -45,8 +45,9 @@ public class ElasticSearchClientSslTests { @Test public void testSslBasic() throws IOException { try(ElasticsearchContainer container = new ElasticsearchContainer(ELASTICSEARCH_IMAGE) + .withCreateContainerCmdModifier(c -> c.withName("elasticsearch")) .withFileSystemBind(sslResourceDir, configDir + "/ssl") - .withPassword("elastic") + .withEnv("ELASTIC_PASSWORD","elastic") // boostrap password .withEnv("xpack.license.self_generated.type", "trial") .withEnv("xpack.security.enabled", "true") .withEnv("xpack.security.http.ssl.enabled", "true") @@ -80,8 +81,9 @@ public void testSslBasic() throws IOException { @Test public void testSslWithHostnameVerification() throws IOException { try(ElasticsearchContainer container = new ElasticsearchContainer(ELASTICSEARCH_IMAGE) + .withCreateContainerCmdModifier(c -> c.withName("elasticsearch")) .withFileSystemBind(sslResourceDir, configDir + "/ssl") - .withPassword("elastic") + .withEnv("ELASTIC_PASSWORD","elastic") // boostrap password .withEnv("xpack.license.self_generated.type", "trial") .withEnv("xpack.security.enabled", "true") .withEnv("xpack.security.http.ssl.enabled", "true") @@ -118,8 +120,9 @@ public void testSslWithHostnameVerification() throws IOException { @Test public void testSslWithClientAuth() throws IOException { try(ElasticsearchContainer container = new ElasticsearchContainer(ELASTICSEARCH_IMAGE) + .withCreateContainerCmdModifier(c -> c.withName("elasticsearch")) .withFileSystemBind(sslResourceDir, configDir + "/ssl") - .withPassword("elastic") + .withEnv("ELASTIC_PASSWORD","elastic") // boostrap password .withEnv("xpack.license.self_generated.type", "trial") .withEnv("xpack.security.enabled", "true") .withEnv("xpack.security.http.ssl.enabled", "true") diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java index 619e2513d7174..bd93b1c3ba130 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java @@ -18,45 +18,35 @@ */ package org.apache.pulsar.io.elasticsearch; -import eu.rekawek.toxiproxy.model.ToxicDirection; -import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.api.schema.GenericObject; import org.apache.pulsar.functions.api.Record; -import org.apache.pulsar.io.elasticsearch.testcontainers.ElasticToxiproxiContainer; -import org.awaitility.Awaitility; +import org.apache.pulsar.io.elasticsearch.testcontainers.ChaosContainer; import org.junit.AfterClass; -import org.testcontainers.containers.Network; import org.testcontainers.elasticsearch.ElasticsearchContainer; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import java.io.IOException; import java.util.Optional; -import java.util.UUID; import static org.junit.Assert.*; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.*; - -@Slf4j public class ElasticSearchClientTests { - public static final String ELASTICSEARCH_IMAGE = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE")) - .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64"); + public static final String ELASTICSEARCH_IMAGE = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE")) + .orElse("docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2-amd64"); public final static String INDEX = "myindex"; static ElasticsearchContainer container; static ElasticSearchConfig config; static ElasticSearchClient client; - static Network network = Network.newNetwork(); @BeforeClass public static final void initBeforeClass() throws IOException { - container = new ElasticsearchContainer(ELASTICSEARCH_IMAGE).withNetwork(network); + container = new ElasticsearchContainer(ELASTICSEARCH_IMAGE); container.start(); config = new ElasticSearchConfig(); @@ -69,7 +59,6 @@ public static final void initBeforeClass() throws IOException { @AfterClass public static void closeAfterClass() { container.close(); - network.close(); } static class MockRecord implements Record { @@ -171,112 +160,91 @@ public void testMalformedDocIgnore() throws Exception { @Test public void testBulkRetry() throws Exception { - try (ElasticToxiproxiContainer toxiproxy = new ElasticToxiproxiContainer(container, network)) { - toxiproxy.start(); - - final String index = "indexbulktest-" + UUID.randomUUID(); - ElasticSearchConfig config = new ElasticSearchConfig() - .setElasticSearchUrl("http://" + toxiproxy.getHttpHostAddress()) - .setIndexName(index) - .setBulkEnabled(true) - .setMaxRetries(1000) - .setBulkActions(2) - .setRetryBackoffInMs(100) - // disabled, we want to have full control over flush() method - .setBulkFlushIntervalInMs(-1); - - try (ElasticSearchClient client = new ElasticSearchClient(config);) { - try { - assertTrue(client.createIndexIfNeeded(index)); - MockRecord mockRecord = new MockRecord<>(); - client.bulkIndex(mockRecord, Pair.of("1", "{\"a\":1}")); - client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":2}")); - assertEquals(mockRecord.acked, 2); - assertEquals(mockRecord.failed, 0); - assertEquals(client.totalHits(index), 2); - - log.info("starting the toxic"); - toxiproxy.getProxy().setConnectionCut(false); - toxiproxy.getProxy().toxics().latency("elasticpause", ToxicDirection.DOWNSTREAM, 15000); - toxiproxy.removeToxicAfterDelay("elasticpause", 15000); - - client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}")); - assertEquals(mockRecord.acked, 2); - assertEquals(mockRecord.failed, 0); - assertEquals(client.totalHits(index), 2); - - client.flush(); - assertEquals(mockRecord.acked, 3); - assertEquals(mockRecord.failed, 0); - assertEquals(client.totalHits(index), 3); - } finally { - client.delete(index); - } - } - } + final String index = "indexbulktest"; + ElasticSearchConfig config = new ElasticSearchConfig() + .setElasticSearchUrl("http://"+container.getHttpHostAddress()) + .setIndexName(index) + .setBulkEnabled(true) + .setMaxRetries(1000) + .setBulkActions(2) + .setRetryBackoffInMs(100) + .setBulkFlushIntervalInMs(10000); + ElasticSearchClient client = new ElasticSearchClient(config); + client.createIndexIfNeeded(index); + + MockRecord mockRecord = new MockRecord<>(); + client.bulkIndex(mockRecord, Pair.of("1","{\"a\":1}")); + client.bulkIndex(mockRecord, Pair.of("2","{\"a\":2}")); + Thread.sleep(1000L); + assertEquals(mockRecord.acked, 2); + assertEquals(mockRecord.failed, 0); + assertEquals(client.totalHits(index), 2); + + ChaosContainer chaosContainer = new ChaosContainer<>(container.getContainerName(), "15s"); + chaosContainer.start(); + + client.bulkIndex(mockRecord, Pair.of("3","{\"a\":3}")); + Thread.sleep(5000L); + assertEquals(mockRecord.acked, 2); + assertEquals(mockRecord.failed, 0); + assertEquals(client.totalHits(index), 2); + + chaosContainer.stop(); + client.flush(); + assertEquals(mockRecord.acked, 3); + assertEquals(mockRecord.failed, 0); + assertEquals(client.totalHits(index), 3); } @Test - public void testBulkBlocking() throws Exception { - try (ElasticToxiproxiContainer toxiproxy = new ElasticToxiproxiContainer(container, network)) { - toxiproxy.start(); - - final String index = "indexblocking-" + UUID.randomUUID(); - ElasticSearchConfig config = new ElasticSearchConfig() - .setElasticSearchUrl("http://" + toxiproxy.getHttpHostAddress()) - .setIndexName(index) - .setBulkEnabled(true) - .setMaxRetries(1000) - .setBulkActions(2) - .setBulkConcurrentRequests(2) - .setRetryBackoffInMs(100) - .setBulkFlushIntervalInMs(10000); - try (ElasticSearchClient client = new ElasticSearchClient(config);) { - assertTrue(client.createIndexIfNeeded(index)); - - try { - MockRecord mockRecord = new MockRecord<>(); - for (int i = 1; i <= 5; i++) { - client.bulkIndex(mockRecord, Pair.of(Integer.toString(i), "{\"a\":" + i + "}")); - } - - Awaitility.await().untilAsserted(() -> { - assertThat("acked record", mockRecord.acked, greaterThanOrEqualTo(4)); - assertEquals(mockRecord.failed, 0); - assertThat("totalHits", client.totalHits(index), greaterThanOrEqualTo(4L)); - }); - client.flush(); - Awaitility.await().untilAsserted(() -> { - assertEquals(mockRecord.failed, 0); - assertEquals(mockRecord.acked, 5); - assertEquals(client.totalHits(index), 5); - }); - - log.info("starting the toxic"); - toxiproxy.getProxy().setConnectionCut(false); - toxiproxy.getProxy().toxics().latency("elasticpause", ToxicDirection.DOWNSTREAM, 35000); - toxiproxy.removeToxicAfterDelay("elasticpause", 30000); - - long start = System.currentTimeMillis(); - - // 11th bulkIndex is blocking because we have 2 pending requests, and the 3rd request is blocked. - for (int i = 6; i <= 15; i++) { - client.bulkIndex(mockRecord, Pair.of(Integer.toString(i), "{\"a\":" + i + "}")); - log.info("{} index {}", System.currentTimeMillis(), i); - } - long elapsed = System.currentTimeMillis() - start; - log.info("elapsed = {}", elapsed); - assertTrue(elapsed > 29000); // bulkIndex was blocking while elasticsearch was down or busy - - Thread.sleep(1000L); - assertEquals(mockRecord.acked, 15); - assertEquals(mockRecord.failed, 0); - assertEquals(client.records.size(), 0); - - } finally { - client.delete(index); - } - } + public void testBlukBlocking() throws Exception { + final String index = "indexblocking"; + ElasticSearchConfig config = new ElasticSearchConfig() + .setElasticSearchUrl("http://"+container.getHttpHostAddress()) + .setIndexName(index) + .setBulkEnabled(true) + .setMaxRetries(1000) + .setBulkActions(2) + .setBulkConcurrentRequests(2) + .setRetryBackoffInMs(100) + .setBulkFlushIntervalInMs(10000); + ElasticSearchClient client = new ElasticSearchClient(config); + client.createIndexIfNeeded(index); + + MockRecord mockRecord = new MockRecord<>(); + for(int i = 1; i <= 5; i++) { + client.bulkIndex(mockRecord, Pair.of(Integer.toString(i), "{\"a\":"+i+"}")); } + Thread.sleep(1000L); + assertEquals(mockRecord.acked, 4); + assertEquals(mockRecord.failed, 0); + assertEquals(client.totalHits(index), 4); + Thread.sleep(10000L); // wait bulk flush interval + + assertEquals(mockRecord.acked, 5); + assertEquals(mockRecord.failed, 0); + assertEquals(client.totalHits(index), 5); + + ChaosContainer chaosContainer = new ChaosContainer<>(container.getContainerName(), "30s"); + chaosContainer.start(); + Thread.sleep(1000L); + + // 11th bulkIndex is blocking because we have 2 pending requests, and the 3rd request is blocked. + long start = System.currentTimeMillis(); + for(int i = 6; i <= 15; i++) { + client.bulkIndex(mockRecord, Pair.of(Integer.toString(i), "{\"a\":"+i+"}")); + System.out.println(String.format("%d index %d", System.currentTimeMillis(), i)); + } + long elaspe = System.currentTimeMillis() - start; + System.out.println("elapse=" + elaspe); + assertTrue(elaspe > 29000); // bulkIndex was blocking while elasticsearch was down or busy + + Thread.sleep(1000L); + assertEquals(mockRecord.acked, 15); + assertEquals(mockRecord.failed, 0); + assertEquals(client.records.size(), 0); + + chaosContainer.stop(); } + } diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkRawDataTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkRawDataTests.java index 60fadffb0a8ac..f195a383d4073 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkRawDataTests.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkRawDataTests.java @@ -48,7 +48,7 @@ public class ElasticSearchSinkRawDataTests { public static final String ELASTICSEARCH_IMAGE = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE")) - .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64"); + .orElse("docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2-amd64"); private static ElasticsearchContainer container; diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java index 625f812b76cf3..fbba6f22d6d27 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java @@ -52,7 +52,7 @@ public class ElasticSearchSinkTests { public static final String ELASTICSEARCH_IMAGE = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE")) - .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64"); + .orElse("docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2-amd64"); private static ElasticsearchContainer container; diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/testcontainers/ElasticToxiproxiContainer.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/testcontainers/ElasticToxiproxiContainer.java deleted file mode 100644 index 348c350743a9c..0000000000000 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/testcontainers/ElasticToxiproxiContainer.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.pulsar.io.elasticsearch.testcontainers; - -import lombok.extern.slf4j.Slf4j; -import org.testcontainers.containers.Network; -import org.testcontainers.containers.ToxiproxyContainer; -import org.testcontainers.elasticsearch.ElasticsearchContainer; -import org.testcontainers.utility.DockerImageName; - -import java.io.IOException; -import java.util.Objects; -import java.util.Timer; -import java.util.TimerTask; - -@Slf4j -// Toxiproxy container, which will be used as a TCP proxy -public class ElasticToxiproxiContainer extends ToxiproxyContainer { - - public static final DockerImageName TOXIPROXY_IMAGE = DockerImageName.parse("shopify/toxiproxy:2.1.4"); - - final ElasticsearchContainer container; - ToxiproxyContainer.ContainerProxy proxy; - - public ElasticToxiproxiContainer(ElasticsearchContainer container, Network network) { - super(TOXIPROXY_IMAGE); - this.withNetwork(network); - this.container = container; - } - - @Override - public void start() { - log.info("Starting toxiproxy container"); - super.start(); - proxy = this.getProxy(container, 9200); - } - - public String getHttpHostAddress() { - Objects.nonNull(proxy); - return proxy.getContainerIpAddress() + ":" + proxy.getProxyPort(); - } - - public ToxiproxyContainer.ContainerProxy getProxy() { - Objects.nonNull(proxy); - return proxy; - } - - public void removeToxicAfterDelay(String toxicName, long delayMs) { - Objects.nonNull(proxy); - Timer timer = new Timer(); - timer.schedule(new TimerTask() { - @Override - public void run() { - try { - log.info("removing the toxic {}", toxicName); - proxy.toxics().get(toxicName).remove(); - } catch (IOException e) { - log.error("failed to remove toxic " + toxicName, e); - } - } - }, delayMs); - } - -}