From e97fe5b8ac376ecead0097b184f0f1436fce88e4 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 28 Jun 2023 21:10:05 +0300 Subject: [PATCH 1/2] [improve][test] Disable disk usage threshold & geoip download and enable logging for Elastic Testcontainers (#20671) --- .../elasticsearch/ElasticSearchTestBase.java | 32 ++++++++++----- .../io/sinks/ElasticSearch7SinkTester.java | 4 +- .../io/sinks/ElasticSearch8SinkTester.java | 5 +-- .../io/sinks/ElasticSearchSinkTester.java | 40 ++++++++++++++----- .../io/sinks/OpenSearchSinkTester.java | 13 +++--- .../integration/topologies/PulsarCluster.java | 2 + 6 files changed, 66 insertions(+), 30 deletions(-) diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java index 4c6fd020fa338..0f5a42051c7d1 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java @@ -18,10 +18,6 @@ */ package org.apache.pulsar.io.elasticsearch; -import java.io.IOException; -import java.util.Map; -import java.util.Optional; - import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.elasticsearch.security.CreateApiKeyRequest; import co.elastic.clients.elasticsearch.security.CreateApiKeyResponse; @@ -29,6 +25,10 @@ import co.elastic.clients.elasticsearch.security.GetTokenResponse; import co.elastic.clients.elasticsearch.security.get_token.AccessTokenGrantType; import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.util.Map; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.io.elasticsearch.client.elastic.ElasticSearchJavaRestClient; import org.apache.pulsar.io.elasticsearch.client.opensearch.OpenSearchHighLevelRestClient; import org.opensearch.client.Request; @@ -36,10 +36,11 @@ import org.testcontainers.elasticsearch.ElasticsearchContainer; import org.testcontainers.utility.DockerImageName; +@Slf4j public abstract class ElasticSearchTestBase { public static final String ELASTICSEARCH_8 = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V8")) - .orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.1"); + .orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.3"); public static final String ELASTICSEARCH_7 = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V7")) .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.17.7"); @@ -54,17 +55,28 @@ public ElasticSearchTestBase(String elasticImageName) { } protected ElasticsearchContainer createElasticsearchContainer() { + ElasticsearchContainer elasticsearchContainer; if (elasticImageName.equals(OPENSEARCH)) { DockerImageName dockerImageName = DockerImageName.parse(OPENSEARCH).asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch"); - return new ElasticsearchContainer(dockerImageName) + elasticsearchContainer = new ElasticsearchContainer(dockerImageName) .withEnv("OPENSEARCH_JAVA_OPTS", "-Xms128m -Xmx256m") .withEnv("bootstrap.memory_lock", "true") .withEnv("plugins.security.disabled", "true"); + } else { + elasticsearchContainer = new ElasticsearchContainer(elasticImageName) + .withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m") + .withEnv("xpack.security.enabled", "false") + .withEnv("xpack.security.http.ssl.enabled", "false"); + } + configureElasticContainer(elasticsearchContainer); + return elasticsearchContainer; + } + + protected void configureElasticContainer(ElasticsearchContainer elasticContainer) { + if (getCompatibilityMode() != ElasticSearchConfig.CompatibilityMode.OPENSEARCH) { + elasticContainer.withEnv("ingest.geoip.downloader.enabled", "false"); } - return new ElasticsearchContainer(elasticImageName) - .withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m") - .withEnv("xpack.security.enabled", "false") - .withEnv("xpack.security.http.ssl.enabled", "false"); + elasticContainer.withLogConsumer(o -> log.info("elastic> {}", o.getUtf8String())); } protected ElasticSearchConfig.CompatibilityMode getCompatibilityMode() { diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java index 65b38c677bfc5..d99fcad252706 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java @@ -19,7 +19,6 @@ package org.apache.pulsar.tests.integration.io.sinks; import java.util.Optional; -import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.testcontainers.elasticsearch.ElasticsearchContainer; public class ElasticSearch7SinkTester extends ElasticSearchSinkTester { @@ -32,8 +31,9 @@ public ElasticSearch7SinkTester(boolean schemaEnable) { super(schemaEnable); } + @Override - protected ElasticsearchContainer createSinkService(PulsarCluster cluster) { + protected ElasticsearchContainer createElasticContainer() { return new ElasticsearchContainer(ELASTICSEARCH_7) .withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m"); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java index bb52c4ff03fea..8e7617a82a5b9 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java @@ -19,13 +19,12 @@ package org.apache.pulsar.tests.integration.io.sinks; import java.util.Optional; -import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.testcontainers.elasticsearch.ElasticsearchContainer; public class ElasticSearch8SinkTester extends ElasticSearchSinkTester { public static final String ELASTICSEARCH_8 = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V8")) - .orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.1"); + .orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.3"); public ElasticSearch8SinkTester(boolean schemaEnable) { @@ -33,7 +32,7 @@ public ElasticSearch8SinkTester(boolean schemaEnable) { } @Override - protected ElasticsearchContainer createSinkService(PulsarCluster cluster) { + protected ElasticsearchContainer createElasticContainer() { return new ElasticsearchContainer(ELASTICSEARCH_8) .withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m") .withEnv("xpack.security.enabled", "false") diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java index 546dd1b9113ab..0784055d29003 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java @@ -19,15 +19,6 @@ package org.apache.pulsar.tests.integration.io.sinks; import static org.testng.Assert.assertTrue; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.elasticsearch.core.SearchRequest; import co.elastic.clients.elasticsearch.core.SearchResponse; @@ -35,6 +26,13 @@ import co.elastic.clients.transport.ElasticsearchTransport; import co.elastic.clients.transport.rest_client.RestClientTransport; import com.google.common.collect.ImmutableMap; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; import lombok.AllArgsConstructor; import lombok.Cleanup; import lombok.Data; @@ -46,6 +44,7 @@ import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.awaitility.Awaitility; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; @@ -100,6 +99,29 @@ public ElasticSearchSinkTester(boolean schemaEnable) { } } + @Override + protected final ElasticsearchContainer createSinkService(PulsarCluster cluster) { + ElasticsearchContainer elasticContainer = createElasticContainer(); + configureElasticContainer(elasticContainer); + return elasticContainer; + } + + protected void configureElasticContainer(ElasticsearchContainer elasticContainer) { + if (!isOpenSearch()) { + elasticContainer.withEnv("ingest.geoip.downloader.enabled", "false"); + } + + // allow disk to fill up beyond default 90% threshold + elasticContainer.withEnv("cluster.routing.allocation.disk.threshold_enabled", "false"); + + elasticContainer.withLogConsumer(o -> log.info("elastic> {}", o.getUtf8String())); + } + + protected boolean isOpenSearch() { + return false; + } + + protected abstract ElasticsearchContainer createElasticContainer(); @Override public void prepareSink() throws Exception { diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java index 1e10cc4189c1a..75f0fdac6f90c 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java @@ -18,9 +18,10 @@ */ package org.apache.pulsar.tests.integration.io.sinks; +import static org.testng.Assert.assertTrue; +import java.util.Map; import java.util.Optional; import org.apache.http.HttpHost; -import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.awaitility.Awaitility; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; @@ -31,10 +32,6 @@ import org.testcontainers.elasticsearch.ElasticsearchContainer; import org.testcontainers.utility.DockerImageName; -import java.util.Map; - -import static org.testng.Assert.assertTrue; - public class OpenSearchSinkTester extends ElasticSearchSinkTester { public static final String OPENSEARCH = Optional.ofNullable(System.getenv("OPENSEARCH_IMAGE")) @@ -48,7 +45,7 @@ public OpenSearchSinkTester(boolean schemaEnable) { } @Override - protected ElasticsearchContainer createSinkService(PulsarCluster cluster) { + protected ElasticsearchContainer createElasticContainer() { DockerImageName dockerImageName = DockerImageName.parse(OPENSEARCH) .asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch"); return new ElasticsearchContainer(dockerImageName) @@ -57,6 +54,10 @@ protected ElasticsearchContainer createSinkService(PulsarCluster cluster) { .withEnv("plugins.security.disabled", "true"); } + protected boolean isOpenSearch() { + return true; + } + @Override public void prepareSink() throws Exception { RestClientBuilder builder = RestClient.builder( diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java index c4c7697e30fdd..9b4823f46d4cc 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java @@ -167,7 +167,9 @@ private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean s .withEnv("journalSyncData", "false") .withEnv("journalMaxGroupWaitMSec", "0") .withEnv("clusterName", clusterName) + .withEnv("PULSAR_PREFIX_diskUsageWarnThreshold", "0.95") .withEnv("diskUsageThreshold", "0.99") + .withEnv("PULSAR_PREFIX_diskUsageLwmThreshold", "0.97") .withEnv("nettyMaxFrameSizeBytes", String.valueOf(spec.maxMessageSize)); if (spec.bookkeeperEnvs != null) { bookieContainer.withEnv(spec.bookkeeperEnvs); From 8469f582411cd47787b1429bf7b148ab2135193c Mon Sep 17 00:00:00 2001 From: Cong Zhao Date: Thu, 29 Jun 2023 06:45:40 +0800 Subject: [PATCH 2/2] [improve][ci] Skip test for pip directory (#20672) --- .github/changes-filter.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/changes-filter.yaml b/.github/changes-filter.yaml index be6faa957887d..250ebf692f6e6 100644 --- a/.github/changes-filter.yaml +++ b/.github/changes-filter.yaml @@ -11,6 +11,7 @@ docs: - '.idea/**' - 'deployment/**' - 'wiki/**' + - 'pip/**' tests: - added|modified: '**/src/test/java/**/*.java' need_owasp: