Skip to content

Added Alias support and external resource feature changes #865

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: 15.0.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@
}
}

public DocWriteRequest<?> convertRecord(SinkRecord record, String index) {
public DocWriteRequest<?> convertRecord(SinkRecord record, String resourceName) {

Check warning on line 124 in src/main/java/io/confluent/connect/elasticsearch/DataConverter.java

View check run for this annotation

SonarQube-Confluent / kafka-connect-elasticsearch Sonarqube Results

src/main/java/io/confluent/connect/elasticsearch/DataConverter.java#L124

Rename this variable to not match a restricted identifier.
if (record.value() == null) {
switch (config.behaviorOnNullValues()) {
case IGNORE:
Expand Down Expand Up @@ -166,7 +166,7 @@

// delete
if (record.value() == null) {
return maybeAddExternalVersioning(new DeleteRequest(index).id(id), record);
return maybeAddExternalVersioning(new DeleteRequest(resourceName).id(id), record);
}

String payload = getPayload(record);
Expand All @@ -175,14 +175,14 @@
// index
switch (config.writeMethod()) {
case UPSERT:
return new UpdateRequest(index, id)
return new UpdateRequest(resourceName, id)
.doc(payload, XContentType.JSON)
.upsert(payload, XContentType.JSON)
.retryOnConflict(Math.min(config.maxInFlightRequests(), 5));
case INSERT:
OpType opType = config.isDataStream() ? OpType.CREATE : OpType.INDEX;
IndexRequest req =
new IndexRequest(index).source(payload, XContentType.JSON).opType(opType);
new IndexRequest(resourceName).source(payload, XContentType.JSON).opType(opType);
if (config.useAutogeneratedIds()) {
return req;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,13 +267,14 @@ public boolean createIndexOrDataStream(String name) {
/**
* Creates a mapping for the given index and schema.
*
* @param index the index to create the mapping for
* @param resourceName the resource to create the mapping for
* @param schema the schema to map
*/
public void createMapping(String index, Schema schema) {
PutMappingRequest request = new PutMappingRequest(index).source(Mapping.buildMapping(schema));
public void createMapping(String resourceName, Schema schema) {
PutMappingRequest request = new PutMappingRequest(resourceName)
.source(Mapping.buildMapping(schema));
callWithRetries(
String.format("create mapping for index %s with schema %s", index, schema),
String.format("create mapping for resource %s with schema %s", resourceName, schema),
() -> client.indices().putMapping(request, RequestOptions.DEFAULT)
);
}
Expand Down Expand Up @@ -305,11 +306,11 @@ public void waitForInFlightRequests() {

/**
* Checks whether the index already has a mapping or not.
* @param index the index to check
* @param resourceName the resource to check
* @return true if a mapping exists, false if it does not
*/
public boolean hasMapping(String index) {
MappingMetadata mapping = mapping(index);
public boolean hasMapping(String resourceName) {
MappingMetadata mapping = mapping(resourceName);
return mapping != null && mapping.sourceAsMap() != null && !mapping.sourceAsMap().isEmpty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -400,12 +401,58 @@
private static final String DATA_STREAM_TIMESTAMP_DISPLAY = "Data Stream Timestamp Field";
private static final String DATA_STREAM_TIMESTAMP_DEFAULT = "";

// Resource mapping configs
public static final String EXTERNAL_RESOURCE_USAGE_CONFIG = "external.resource.usage";
private static final String EXTERNAL_RESOURCE_USAGE_DOC = String.format(
"The type of resource to write to. Valid options are %s, %s, %s, %s, and %s. "
+ "This determines whether the connector will write to regular indices, data streams, "
+ "index aliases, or data stream aliases. When set to %s, the connector will "
+ "auto-create indices or data streams based on the topic name and datastream configurations",
Comment on lines +406 to +410
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can ask for docs approval for these.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added docs team for approval. Also added necessary screenshots in description

ExternalResourceUsage.INDEX,
ExternalResourceUsage.DATASTREAM,
ExternalResourceUsage.ALIAS_INDEX,
ExternalResourceUsage.ALIAS_DATASTREAM,
ExternalResourceUsage.DISABLED,
ExternalResourceUsage.DISABLED
);
private static final String EXTERNAL_RESOURCE_USAGE_DISPLAY = "External Resource Usage";
private static final String EXTERNAL_RESOURCE_USAGE_DEFAULT =
ExternalResourceUsage.DISABLED.name();

public static final String TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_CONFIG =
"topic.to.external.resource.mapping";
private static final String TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_DOC = String.format(
"A list of topic-to-resource mappings in the format 'topic:resource'. "
+ "If specified, the connector will use the provided resource name "
+ "(index, data stream, or alias) instead of the topic name for writing "
+ "to Elasticsearch. The resource must exist in Elasticsearch before "
+ "configuring the connector. The type of resource (index, data stream, "
+ "or alias) is determined by the '%s' configuration.",
EXTERNAL_RESOURCE_USAGE_CONFIG
);
private static final String TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_DISPLAY =
"Topic to External Resource Mapping";
private static final String TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_DEFAULT = "";

// Error message constants for topic-to-resource mapping validation
public static final String INVALID_MAPPING_FORMAT_ERROR =
"Invalid topic-to-resource mapping format. Expected format: topic:resource";

public static final String DUPLICATE_TOPIC_MAPPING_ERROR_FORMAT =
"Topic '%s' is mapped to multiple resources. "
+ "Each topic must be mapped to exactly one resource.";

public static final String DUPLICATE_RESOURCE_MAPPING_ERROR_FORMAT =
"Resource '%s' is mapped from multiple topics. "
+ "Each resource must be mapped to exactly one topic.";

private final String[] kafkaTopics;

private static final String CONNECTOR_GROUP = "Connector";
private static final String DATA_CONVERSION_GROUP = "Data Conversion";
private static final String PROXY_GROUP = "Proxy";
private static final String SSL_GROUP = "Security";
private static final String KERBEROS_GROUP = "Kerberos";
private static final String DATA_STREAM_GROUP = "Data Stream";

public enum BehaviorOnMalformedDoc {
IGNORE,
Expand All @@ -425,6 +472,14 @@
NONE
}

public enum ExternalResourceUsage {
INDEX,
DATASTREAM,
ALIAS_INDEX,
ALIAS_DATASTREAM,
DISABLED
}

public enum SecurityProtocol {
PLAINTEXT,
SSL
Expand All @@ -442,7 +497,6 @@
addProxyConfigs(configDef);
addSslConfigs(configDef);
addKerberosConfigs(configDef);
addDataStreamConfigs(configDef);
return configDef;
}

Expand Down Expand Up @@ -480,6 +534,71 @@
++order,
Width.SHORT,
CONNECTION_PASSWORD_DISPLAY
).define(
EXTERNAL_RESOURCE_USAGE_CONFIG,
Type.STRING,
EXTERNAL_RESOURCE_USAGE_DEFAULT,
new EnumRecommender<>(ExternalResourceUsage.class),
Importance.HIGH,
EXTERNAL_RESOURCE_USAGE_DOC,
CONNECTOR_GROUP,
++order,
Width.SHORT,
EXTERNAL_RESOURCE_USAGE_DISPLAY,
new EnumRecommender<>(ExternalResourceUsage.class)
).define(
TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_CONFIG,
Type.LIST,
TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_DEFAULT,
Importance.HIGH,
TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_DOC,
CONNECTOR_GROUP,
++order,
Width.LONG,
TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_DISPLAY
).define(
DATA_STREAM_TYPE_CONFIG,
Type.STRING,
DATA_STREAM_TYPE_DEFAULT.name(),
Importance.LOW,
DATA_STREAM_TYPE_DOC,
CONNECTOR_GROUP,
++order,
Width.SHORT,
DATA_STREAM_TYPE_DISPLAY,
new EnumRecommender<>(DataStreamType.class)
).define(
DATA_STREAM_DATASET_CONFIG,
Type.STRING,
DATA_STREAM_DATASET_DEFAULT,
new DataStreamDatasetValidator(),
Importance.LOW,
DATA_STREAM_DATASET_DOC,
CONNECTOR_GROUP,
++order,
Width.MEDIUM,
DATA_STREAM_DATASET_DISPLAY
).define(
DATA_STREAM_NAMESPACE_CONFIG,
Type.STRING,
DATA_STREAM_NAMESPACE_DEFAULT,
new DataStreamNamespaceValidator(),
Importance.LOW,
DATA_STREAM_NAMESPACE_DOC,
CONNECTOR_GROUP,
++order,
Width.MEDIUM,
DATA_STREAM_NAMESPACE_DISPLAY
).define(
DATA_STREAM_TIMESTAMP_CONFIG,
Type.LIST,
DATA_STREAM_TIMESTAMP_DEFAULT,
Importance.LOW,
DATA_STREAM_TIMESTAMP_DOC,
CONNECTOR_GROUP,
++order,
Width.LONG,
DATA_STREAM_TIMESTAMP_DISPLAY
).define(
BATCH_SIZE_CONFIG,
Type.INT,
Expand Down Expand Up @@ -840,59 +959,64 @@
);
}

private static void addDataStreamConfigs(ConfigDef configDef) {
int order = 0;
configDef
.define(
DATA_STREAM_NAMESPACE_CONFIG,
Type.STRING,
DATA_STREAM_NAMESPACE_DEFAULT,
new DataStreamNamespaceValidator(),
Importance.LOW,
DATA_STREAM_NAMESPACE_DOC,
DATA_STREAM_GROUP,
++order,
Width.MEDIUM,
DATA_STREAM_NAMESPACE_DISPLAY
).define(
DATA_STREAM_DATASET_CONFIG,
Type.STRING,
DATA_STREAM_DATASET_DEFAULT,
new DataStreamDatasetValidator(),
Importance.LOW,
DATA_STREAM_DATASET_DOC,
DATA_STREAM_GROUP,
++order,
Width.MEDIUM,
DATA_STREAM_DATASET_DISPLAY
).define(
DATA_STREAM_TYPE_CONFIG,
Type.STRING,
DATA_STREAM_TYPE_DEFAULT.name(),
Importance.LOW,
DATA_STREAM_TYPE_DOC,
DATA_STREAM_GROUP,
++order,
Width.SHORT,
DATA_STREAM_TYPE_DISPLAY,
new EnumRecommender<>(DataStreamType.class)
).define(
DATA_STREAM_TIMESTAMP_CONFIG,
Type.LIST,
DATA_STREAM_TIMESTAMP_DEFAULT,
Importance.LOW,
DATA_STREAM_TIMESTAMP_DOC,
DATA_STREAM_GROUP,
++order,
Width.LONG,
DATA_STREAM_TIMESTAMP_DISPLAY
);
}

public static final ConfigDef CONFIG = baseConfigDef();

public ElasticsearchSinkConnectorConfig(Map<String, String> props) {
super(CONFIG, props);
this.kafkaTopics = getTopicArray(props);
}

private String[] getTopicArray(Map<?, ?> config) {
Object obj = config.get("topics");
return obj == null ? new String[0] : ((String) obj).trim().split("\\s*,\\s*");

Check failure on line 971 in src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java

View check run for this annotation

SonarQube-Confluent / kafka-connect-elasticsearch Sonarqube Results

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java#L971

Make sure the regex used here, which is vulnerable to polynomial runtime due to backtracking, cannot lead to denial of service.
}

/**
* Parses and validates topic-to-resource mappings.
*
* @return Map of topic to resource names
* @throws ConfigException if any mapping is invalid or has duplicates
*/
public Map<String, String> getTopicToExternalResourceMap() {
List<String> mappings = topicToExternalResourceMapping();
Map<String, String> topicToExternalResourceMap = new HashMap<>();
Copy link
Member

@sp-gupta sp-gupta Jun 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initialise the map with topicToExternalResourceMapping() and use the map in loop

Set<String> seenResources = new HashSet<>();

for (String mapping : mappings) {
String[] parts = mapping.split(":");
if (parts.length != 2) {
throw new ConfigException(
TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_CONFIG,
mapping,
INVALID_MAPPING_FORMAT_ERROR
);
}

String topic = parts[0].trim();
String resource = parts[1].trim();

// Check for duplicate topic mappings
if (topicToExternalResourceMap.containsKey(topic)) {
throw new ConfigException(
TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_CONFIG,
mapping,
String.format(DUPLICATE_TOPIC_MAPPING_ERROR_FORMAT, topic)
);
}

// Check for duplicate resource mappings (enforce 1:1)
if (seenResources.contains(resource)) {
throw new ConfigException(
TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_CONFIG,
mapping,
String.format(DUPLICATE_RESOURCE_MAPPING_ERROR_FORMAT, resource)
);
}

topicToExternalResourceMap.put(topic, resource);
seenResources.add(resource);
}
return topicToExternalResourceMap;
}

public boolean isAuthenticatedConnection() {
Expand All @@ -903,7 +1027,21 @@
return !getString(PROXY_HOST_CONFIG).isEmpty();
}

/**
* Determines if data streams are being used. Checks the external
* resource usage approach first, then falls back to legacy data stream configs.
*
* @return true if data streams are configured, false otherwise
*/
public boolean isDataStream() {
// Check if using new external resource usage approach
if (isExternalResourceUsageEnabled()) {
ExternalResourceUsage usage = externalResourceUsage();
return usage == ExternalResourceUsage.DATASTREAM
|| usage == ExternalResourceUsage.ALIAS_DATASTREAM;
}

// Legacy data stream check
return !dataStreamType().toUpperCase().equals(DataStreamType.NONE.name())
&& !dataStreamDataset().isEmpty();
}
Expand Down Expand Up @@ -1118,6 +1256,27 @@
return WriteMethod.valueOf(getString(WRITE_METHOD_CONFIG).toUpperCase());
}

public ExternalResourceUsage externalResourceUsage() {
return ExternalResourceUsage.valueOf(getString(EXTERNAL_RESOURCE_USAGE_CONFIG).toUpperCase());
}

/**
* Checks if external resource usage is enabled.
*
* @return true if external resource usage is configured, false if DISABLED
*/
public boolean isExternalResourceUsageEnabled() {
return externalResourceUsage() != ExternalResourceUsage.DISABLED;
}

public List<String> topicToExternalResourceMapping() {
return getList(TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_CONFIG);
}

public String[] getKafkaTopics() {
return this.kafkaTopics;
}

private static class DataStreamNamespaceValidator implements Validator {

@Override
Expand Down
Loading
Loading