@@ -34,7 +34,7 @@
 import org.testcontainers.utility.DockerImageName;
 
 /**
- * Base test class for Kafka ingestion tests
+ * Base test class for Kafka ingestion tests.
  */
 @ThreadLeakFilters(filters = TestContainerThreadLeakFilter.class)
 public class KafkaIngestionBaseIT extends OpenSearchIntegTestCase {

@@ -135,6 +135,9 @@ protected void createIndexWithDefaultSettings(int numShards, int numReplicas) {
             .put("ingestion_source.param.topic", topicName)
             .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
             .put("index.replication.type", "SEGMENT")
+            // set custom Kafka consumer properties
+            .put("ingestion_source.param.fetch.min.bytes", 30000)
+            .put("ingestion_source.param.enable.auto.commit", false)
             .build(),
             "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
         );
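
With this change, any `ingestion_source.param.*` setting other than `topic` and `bootstrap_servers` is forwarded verbatim to the Kafka consumer. A minimal sketch of building such settings, mirroring the test above (`IngestionSettingsSketch` is a hypothetical illustration and `max.poll.records` is a hypothetical extra property, not part of this PR):

```java
import org.opensearch.common.settings.Settings;

public class IngestionSettingsSketch {
    // A minimal sketch, assuming the same Settings keys used in the test above.
    static Settings ingestionSourceSettings(String topic, String bootstrapServers) {
        return Settings.builder()
            .put("ingestion_source.param.topic", topic)
            .put("ingestion_source.param.bootstrap_servers", bootstrapServers)
            // every other ingestion_source.param.* key is forwarded verbatim
            // to the Kafka consumer:
            .put("ingestion_source.param.max.poll.records", 250) // hypothetical extra property
            .build();
    }
}
```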

@@ -48,7 +48,6 @@ public void testSegmentReplicationWithRemoteStore() throws Exception {
         internalCluster().startClusterManagerOnlyNode();
         final String nodeA = internalCluster().startDataOnlyNode();
         createIndexWithDefaultSettings(1, 1);
-
         ensureYellowAndNoInitializingShards(indexName);
         final String nodeB = internalCluster().startDataOnlyNode();
         ensureGreen(indexName);

@@ -9,7 +9,6 @@
 package org.opensearch.plugin.kafka;
 
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;

@@ -99,9 +98,10 @@ protected static Consumer<byte[], byte[]> createConsumer(String clientId, KafkaS
         Properties consumerProp = new Properties();
         consumerProp.put("bootstrap.servers", config.getBootstrapServers());
         consumerProp.put("client.id", clientId);
-        if (config.getAutoOffsetResetConfig() != null && !config.getAutoOffsetResetConfig().isEmpty()) {
-            consumerProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, config.getAutoOffsetResetConfig());
-        }
+
+        logger.info("Kafka consumer properties for topic {}: {}", config.getTopic(), config.getConsumerConfigurations());
+        consumerProp.putAll(config.getConsumerConfigurations());
 
         // TODO: why Class org.apache.kafka.common.serialization.StringDeserializer could not be found if set the deserializer as prop?
         // consumerProp.put("key.deserializer",
         // "org.apache.kafka.common.serialization.StringDeserializer");
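
For context, a self-contained sketch of the consumer construction that results from this hunk. `buildConsumer` is a hypothetical helper, not the plugin's actual method, and the constructor-supplied `ByteArrayDeserializer` arguments are an assumption prompted by the TODO above:

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ConsumerSketch {
    // A minimal sketch, assuming byte[] keys/values as in createConsumer's signature.
    static Consumer<byte[], byte[]> buildConsumer(String clientId, String bootstrapServers, Map<String, Object> consumerConfigs) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("client.id", clientId);
        // pass-through properties, e.g. fetch.min.bytes, enable.auto.commit, auto.offset.reset
        props.putAll(consumerConfigs);
        // Supplying deserializers via the constructor may sidestep the plugin
        // classloading issue the TODO above speculates about.
        return new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
    }
}
```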

@@ -10,6 +10,7 @@
 
 import org.opensearch.core.util.ConfigurationUtils;
 
+import java.util.HashMap;
 import java.util.Map;
 
 /**

@@ -18,21 +19,27 @@
 public class KafkaSourceConfig {
     private final String PROP_TOPIC = "topic";
     private final String PROP_BOOTSTRAP_SERVERS = "bootstrap_servers";
-    // TODO: support pass any generic kafka configs
     private final String PROP_AUTO_OFFSET_RESET = "auto.offset.reset";
 
     private final String topic;
     private final String bootstrapServers;
     private final String autoOffsetResetConfig;
 
+    private final Map<String, Object> consumerConfigsMap;
+
     /**
-     * Constructor
+     * Extracts the required and optional Kafka consumer configurations.
      * @param params the configuration parameters
      */
     public KafkaSourceConfig(Map<String, Object> params) {
         this.topic = ConfigurationUtils.readStringProperty(params, PROP_TOPIC);
         this.bootstrapServers = ConfigurationUtils.readStringProperty(params, PROP_BOOTSTRAP_SERVERS);
         this.autoOffsetResetConfig = ConfigurationUtils.readOptionalStringProperty(params, PROP_AUTO_OFFSET_RESET);
+        this.consumerConfigsMap = new HashMap<>(params);
+
+        // remove the configurations handled above
+        consumerConfigsMap.remove(PROP_TOPIC);
+        consumerConfigsMap.remove(PROP_BOOTSTRAP_SERVERS);
     }
 
     /**

@@ -60,4 +67,8 @@ public String getBootstrapServers() {
     public String getAutoOffsetResetConfig() {
         return autoOffsetResetConfig;
     }
+
+    public Map<String, Object> getConsumerConfigurations() {
+        return consumerConfigsMap;
+    }
 }
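
A short usage sketch of the new extraction behavior. The class and getters are as in the diff above; the assertion outcomes follow directly from the explicit `remove` calls in the constructor:

```java
import java.util.HashMap;
import java.util.Map;

public class KafkaSourceConfigUsageSketch {
    public static void main(String[] args) {
        // Required keys are read out and removed; everything left over is a
        // pass-through Kafka consumer property.
        Map<String, Object> params = new HashMap<>();
        params.put("topic", "orders");
        params.put("bootstrap_servers", "localhost:9092");
        params.put("fetch.min.bytes", 30000);

        KafkaSourceConfig config = new KafkaSourceConfig(params);
        assert "orders".equals(config.getTopic());
        assert config.getConsumerConfigurations().containsKey("fetch.min.bytes");
        assert !config.getConsumerConfigurations().containsKey("topic");
    }
}
```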

@@ -16,10 +16,12 @@
 
 public class KafkaSourceConfigTests extends OpenSearchTestCase {
 
-    public void testConstructorAndGetters() {
+    public void testKafkaSourceConfig() {
         Map<String, Object> params = new HashMap<>();
         params.put("topic", "topic");
         params.put("bootstrap_servers", "bootstrap");
+        params.put("fetch.min.bytes", 30000);
+        params.put("enable.auto.commit", false);
 
         KafkaSourceConfig config = new KafkaSourceConfig(params);
 

@@ -29,5 +31,7 @@ public void testConstructorAndGetters() {
             "bootstrap",
             config.getBootstrapServers()
         );
+        Assert.assertEquals("Incorrect fetch.min.bytes", 30000, config.getConsumerConfigurations().get("fetch.min.bytes"));
+        Assert.assertEquals("Incorrect enable.auto.commit", false, config.getConsumerConfigurations().get("enable.auto.commit"));
     }
 }