Skip to content

Commit

Permalink
Merge branch 'master' into dev-qunzhong-offload-policy-update
Browse files Browse the repository at this point in the history
  • Loading branch information
tisonkun committed Jun 28, 2023
2 parents ceadea7 + 8469f58 commit 2e637ea
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 30 deletions.
1 change: 1 addition & 0 deletions .github/changes-filter.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ docs:
- '.idea/**'
- 'deployment/**'
- 'wiki/**'
- 'pip/**'
tests:
- added|modified: '**/src/test/java/**/*.java'
need_owasp:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,29 @@
*/
package org.apache.pulsar.io.elasticsearch;

import java.io.IOException;
import java.util.Map;
import java.util.Optional;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.security.CreateApiKeyRequest;
import co.elastic.clients.elasticsearch.security.CreateApiKeyResponse;
import co.elastic.clients.elasticsearch.security.GetTokenRequest;
import co.elastic.clients.elasticsearch.security.GetTokenResponse;
import co.elastic.clients.elasticsearch.security.get_token.AccessTokenGrantType;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.io.elasticsearch.client.elastic.ElasticSearchJavaRestClient;
import org.apache.pulsar.io.elasticsearch.client.opensearch.OpenSearchHighLevelRestClient;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.utility.DockerImageName;

@Slf4j
public abstract class ElasticSearchTestBase {

public static final String ELASTICSEARCH_8 = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V8"))
.orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.1");
.orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.3");

public static final String ELASTICSEARCH_7 = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V7"))
.orElse("docker.elastic.co/elasticsearch/elasticsearch:7.17.7");
Expand All @@ -54,17 +55,28 @@ public ElasticSearchTestBase(String elasticImageName) {
}

protected ElasticsearchContainer createElasticsearchContainer() {
ElasticsearchContainer elasticsearchContainer;
if (elasticImageName.equals(OPENSEARCH)) {
DockerImageName dockerImageName = DockerImageName.parse(OPENSEARCH).asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
return new ElasticsearchContainer(dockerImageName)
elasticsearchContainer = new ElasticsearchContainer(dockerImageName)
.withEnv("OPENSEARCH_JAVA_OPTS", "-Xms128m -Xmx256m")
.withEnv("bootstrap.memory_lock", "true")
.withEnv("plugins.security.disabled", "true");
} else {
elasticsearchContainer = new ElasticsearchContainer(elasticImageName)
.withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m")
.withEnv("xpack.security.enabled", "false")
.withEnv("xpack.security.http.ssl.enabled", "false");
}
configureElasticContainer(elasticsearchContainer);
return elasticsearchContainer;
}

protected void configureElasticContainer(ElasticsearchContainer elasticContainer) {
if (getCompatibilityMode() != ElasticSearchConfig.CompatibilityMode.OPENSEARCH) {
elasticContainer.withEnv("ingest.geoip.downloader.enabled", "false");
}
return new ElasticsearchContainer(elasticImageName)
.withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m")
.withEnv("xpack.security.enabled", "false")
.withEnv("xpack.security.http.ssl.enabled", "false");
elasticContainer.withLogConsumer(o -> log.info("elastic> {}", o.getUtf8String()));
}

protected ElasticSearchConfig.CompatibilityMode getCompatibilityMode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.tests.integration.io.sinks;

import java.util.Optional;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testcontainers.elasticsearch.ElasticsearchContainer;

public class ElasticSearch7SinkTester extends ElasticSearchSinkTester {
Expand All @@ -32,8 +31,9 @@ public ElasticSearch7SinkTester(boolean schemaEnable) {
super(schemaEnable);
}


@Override
protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
protected ElasticsearchContainer createElasticContainer() {
return new ElasticsearchContainer(ELASTICSEARCH_7)
.withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,20 @@
package org.apache.pulsar.tests.integration.io.sinks;

import java.util.Optional;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testcontainers.elasticsearch.ElasticsearchContainer;

public class ElasticSearch8SinkTester extends ElasticSearchSinkTester {

public static final String ELASTICSEARCH_8 = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V8"))
.orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.1");
.orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.3");


public ElasticSearch8SinkTester(boolean schemaEnable) {
super(schemaEnable);
}

@Override
protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
protected ElasticsearchContainer createElasticContainer() {
return new ElasticsearchContainer(ELASTICSEARCH_8)
.withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m")
.withEnv("xpack.security.enabled", "false")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,20 @@
package org.apache.pulsar.tests.integration.io.sinks;

import static org.testng.Assert.assertTrue;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.AllArgsConstructor;
import lombok.Cleanup;
import lombok.Data;
Expand All @@ -46,6 +44,7 @@
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.awaitility.Awaitility;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
Expand Down Expand Up @@ -100,6 +99,29 @@ public ElasticSearchSinkTester(boolean schemaEnable) {
}
}

@Override
protected final ElasticsearchContainer createSinkService(PulsarCluster cluster) {
ElasticsearchContainer elasticContainer = createElasticContainer();
configureElasticContainer(elasticContainer);
return elasticContainer;
}

protected void configureElasticContainer(ElasticsearchContainer elasticContainer) {
if (!isOpenSearch()) {
elasticContainer.withEnv("ingest.geoip.downloader.enabled", "false");
}

// allow disk to fill up beyond default 90% threshold
elasticContainer.withEnv("cluster.routing.allocation.disk.threshold_enabled", "false");

elasticContainer.withLogConsumer(o -> log.info("elastic> {}", o.getUtf8String()));
}

protected boolean isOpenSearch() {
return false;
}

protected abstract ElasticsearchContainer createElasticContainer();

@Override
public void prepareSink() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
*/
package org.apache.pulsar.tests.integration.io.sinks;

import static org.testng.Assert.assertTrue;
import java.util.Map;
import java.util.Optional;
import org.apache.http.HttpHost;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.awaitility.Awaitility;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
Expand All @@ -31,10 +32,6 @@
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.utility.DockerImageName;

import java.util.Map;

import static org.testng.Assert.assertTrue;

public class OpenSearchSinkTester extends ElasticSearchSinkTester {

public static final String OPENSEARCH = Optional.ofNullable(System.getenv("OPENSEARCH_IMAGE"))
Expand All @@ -48,7 +45,7 @@ public OpenSearchSinkTester(boolean schemaEnable) {
}

@Override
protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
protected ElasticsearchContainer createElasticContainer() {
DockerImageName dockerImageName = DockerImageName.parse(OPENSEARCH)
.asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
return new ElasticsearchContainer(dockerImageName)
Expand All @@ -57,6 +54,10 @@ protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
.withEnv("plugins.security.disabled", "true");
}

protected boolean isOpenSearch() {
return true;
}

@Override
public void prepareSink() throws Exception {
RestClientBuilder builder = RestClient.builder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean s
.withEnv("journalSyncData", "false")
.withEnv("journalMaxGroupWaitMSec", "0")
.withEnv("clusterName", clusterName)
.withEnv("PULSAR_PREFIX_diskUsageWarnThreshold", "0.95")
.withEnv("diskUsageThreshold", "0.99")
.withEnv("PULSAR_PREFIX_diskUsageLwmThreshold", "0.97")
.withEnv("nettyMaxFrameSizeBytes", String.valueOf(spec.maxMessageSize));
if (spec.bookkeeperEnvs != null) {
bookieContainer.withEnv(spec.bookkeeperEnvs);
Expand Down

0 comments on commit 2e637ea

Please sign in to comment.