Skip to content

Commit

Permalink
[fix][io] ElasticSearch sink: align null fields behaviour (apache#18577)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi authored and lifepuzzlefun committed Dec 9, 2022
1 parent bceecb3 commit 0112452
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 16 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ flexible messaging model and an intuitive client API.</description>
<hdfs-offload-version3>3.3.3</hdfs-offload-version3>
<json-smart.version>2.4.7</json-smart.version>
<opensearch.version>1.2.4</opensearch.version>
<elasticsearch-java.version>8.1.0</elasticsearch-java.version>
<elasticsearch-java.version>8.5.2</elasticsearch-java.version>
<trino.version>363</trino.version>
<scala.binary.version>2.13</scala.binary.version>
<debezium.version>1.9.7.Final</debezium.version>
Expand Down Expand Up @@ -236,7 +236,7 @@ flexible messaging model and an intuitive client API.</description>
<netty-reactive-streams.version>2.0.6</netty-reactive-streams.version>

<!-- test dependencies -->
<testcontainers.version>1.17.2</testcontainers.version>
<testcontainers.version>1.17.6</testcontainers.version>
<hamcrest.version>2.2</hamcrest.version>

<!-- Set docker-java.version to the version of docker-java used in Testcontainers -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Map;
Expand All @@ -53,8 +55,9 @@
public class ElasticSearchJavaRestClient extends RestClient {

private final ElasticsearchClient client;
private final ObjectMapper objectMapper = new ObjectMapper();

private final ObjectMapper objectMapper = new ObjectMapper()
.configure(SerializationFeature.INDENT_OUTPUT, false)
.setSerializationInclusion(JsonInclude.Include.ALWAYS);
private BulkProcessor bulkProcessor;
private ElasticsearchTransport transport;

Expand Down Expand Up @@ -87,8 +90,7 @@ public void onFailure(Node node) {
log.warn("Node host={} failed", node.getHost());
}
});
transport = new RestClientTransport(builder.build(),
new JacksonJsonpMapper());
transport = new RestClientTransport(builder.build(), new JacksonJsonpMapper(objectMapper));
client = new ElasticsearchClient(transport);
if (elasticSearchConfig.isBulkEnabled()) {
bulkProcessor = new ElasticBulkProcessor(elasticSearchConfig, client, bulkProcessorListener);
Expand Down Expand Up @@ -117,7 +119,7 @@ public boolean createIndex(String index) throws IOException {
.build();
try {
final CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest);
if ((createIndexResponse.acknowledged() != null && createIndexResponse.acknowledged())
if ((createIndexResponse.acknowledged())
&& createIndexResponse.shardsAcknowledged()) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.testng.Assert.assertTrue;
import eu.rekawek.toxiproxy.model.ToxicDirection;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.LongAdder;
Expand Down Expand Up @@ -193,7 +194,7 @@ public void testIndexExists() throws IOException {
@Test
public void testTopicToIndexName() throws IOException {
try (ElasticSearchClient client = new ElasticSearchClient(new ElasticSearchConfig()
.setElasticSearchUrl("http://" + container.getHttpHostAddress())); ) {
.setElasticSearchUrl("http://" + container.getHttpHostAddress()));) {
assertEquals(client.topicToIndexName("data-ks1.table1"), "data-ks1.table1");
assertEquals(client.topicToIndexName("persistent://public/default/testesjson"), "testesjson");
assertEquals(client.topicToIndexName("default/testesjson"), "testesjson");
Expand All @@ -211,7 +212,7 @@ public void testTopicToIndexName() throws IOException {
public void testMalformedDocFails() throws Exception {
String index = "indexmalformed-" + UUID.randomUUID();
ElasticSearchConfig config = new ElasticSearchConfig()
.setElasticSearchUrl("http://"+container.getHttpHostAddress())
.setElasticSearchUrl("http://" + container.getHttpHostAddress())
.setIndexName(index)
.setBulkEnabled(true)
.setBulkFlushIntervalInMs(-1L)
Expand All @@ -235,7 +236,7 @@ public void testMalformedDocFails() throws Exception {
public void testMalformedDocIgnore() throws Exception {
String index = "indexmalformed2-" + UUID.randomUUID();
ElasticSearchConfig config = new ElasticSearchConfig()
.setElasticSearchUrl("http://"+container.getHttpHostAddress())
.setElasticSearchUrl("http://" + container.getHttpHostAddress())
.setIndexName(index)
.setBulkEnabled(true)
.setBulkFlushIntervalInMs(-1)
Expand Down Expand Up @@ -366,7 +367,7 @@ public void testBulkBlocking() throws Exception {
public void testBulkIndexAndDelete() throws Exception {
final String index = "indexbulktest-" + UUID.randomUUID();
ElasticSearchConfig config = new ElasticSearchConfig()
.setElasticSearchUrl("http://"+container.getHttpHostAddress())
.setElasticSearchUrl("http://" + container.getHttpHostAddress())
.setIndexName(index)
.setBulkEnabled(true)
.setBulkActions(10)
Expand All @@ -389,4 +390,30 @@ public void testBulkIndexAndDelete() throws Exception {
}
}

@Test
public void testIndexKeepNulls() throws Exception {
final String index = "indexnulls";
ElasticSearchConfig config = new ElasticSearchConfig()
.setElasticSearchUrl("http://" + container.getHttpHostAddress())
.setIndexName(index);

try (ElasticSearchClient client = new ElasticSearchClient(config)) {
MockRecord<GenericObject> mockRecord = new MockRecord<>();
client.indexDocument(mockRecord, Pair.of("key0", "{\"a\":1,\"b\":null}"));
final Map<String, Object> sourceAsMap;
if (elasticImageName.equals(ELASTICSEARCH_8)) {
final ElasticSearchJavaRestClient restClient = (ElasticSearchJavaRestClient) client.getRestClient();
sourceAsMap =
restClient.search(index, "*:*").hits().hits().get(0).source();
} else {
final OpenSearchHighLevelRestClient restClient = (OpenSearchHighLevelRestClient) client.getRestClient();
sourceAsMap =
restClient.search(index, "*:*").getHits().getHits()[0].getSourceAsMap();
}
assertEquals(sourceAsMap.get("a"), 1);
assertTrue(sourceAsMap.containsKey("b"));
assertNull(sourceAsMap.get("b"));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@
public abstract class ElasticSearchTestBase {

public static final String ELASTICSEARCH_8 = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V8"))
.orElse("docker.elastic.co/elasticsearch/elasticsearch:8.1.0");
.orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.1");

public static final String ELASTICSEARCH_7 = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V7"))
.orElse("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64");
.orElse("docker.elastic.co/elasticsearch/elasticsearch:7.17.7");

public static final String OPENSEARCH = Optional.ofNullable(System.getenv("OPENSEARCH_IMAGE"))
.orElse("opensearchproject/opensearch:1.2.4");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,23 @@
*/
package org.apache.pulsar.tests.integration.io.sinks;

import java.util.Optional;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testcontainers.elasticsearch.ElasticsearchContainer;

public class ElasticSearch7SinkTester extends ElasticSearchSinkTester {

public static final String ELASTICSEARCH_7 = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V7"))
.orElse("docker.elastic.co/elasticsearch/elasticsearch:7.17.7");


public ElasticSearch7SinkTester(boolean schemaEnable) {
super(schemaEnable);
}

@Override
protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
return new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64")
return new ElasticsearchContainer(ELASTICSEARCH_7)
.withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,23 @@
*/
package org.apache.pulsar.tests.integration.io.sinks;

import java.util.Optional;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testcontainers.elasticsearch.ElasticsearchContainer;

public class ElasticSearch8SinkTester extends ElasticSearchSinkTester {

public static final String ELASTICSEARCH_8 = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V8"))
.orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.1");


public ElasticSearch8SinkTester(boolean schemaEnable) {
super(schemaEnable);
}

@Override
protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
return new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:8.1.0")
return new ElasticsearchContainer(ELASTICSEARCH_8)
.withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m")
.withEnv("xpack.security.enabled", "false")
.withEnv("xpack.security.http.ssl.enabled", "false");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.tests.integration.io.sinks;

import java.util.Optional;
import org.apache.http.HttpHost;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.awaitility.Awaitility;
Expand All @@ -36,6 +37,9 @@

public class OpenSearchSinkTester extends ElasticSearchSinkTester {

public static final String OPENSEARCH = Optional.ofNullable(System.getenv("OPENSEARCH_IMAGE"))
.orElse("opensearchproject/opensearch:1.2.4");

private RestHighLevelClient elasticClient;


Expand All @@ -45,7 +49,7 @@ public OpenSearchSinkTester(boolean schemaEnable) {

@Override
protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
DockerImageName dockerImageName = DockerImageName.parse("opensearchproject/opensearch:1.2.4")
DockerImageName dockerImageName = DockerImageName.parse(OPENSEARCH)
.asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
return new ElasticsearchContainer(dockerImageName)
.withEnv("OPENSEARCH_JAVA_OPTS", "-Xms128m -Xmx256m")
Expand Down

0 comments on commit 0112452

Please sign in to comment.