-
Notifications
You must be signed in to change notification settings - Fork 438
Added Alias support and external resource feature changes #865
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 15.0.x
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
import java.io.File; | ||
import java.net.URI; | ||
import java.net.URISyntaxException; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
@@ -400,12 +401,58 @@ | |
private static final String DATA_STREAM_TIMESTAMP_DISPLAY = "Data Stream Timestamp Field"; | ||
private static final String DATA_STREAM_TIMESTAMP_DEFAULT = ""; | ||
|
||
// Resource mapping configs | ||
public static final String EXTERNAL_RESOURCE_USAGE_CONFIG = "external.resource.usage"; | ||
private static final String EXTERNAL_RESOURCE_USAGE_DOC = String.format( | ||
"The type of resource to write to. Valid options are %s, %s, %s, %s, and %s. " | ||
+ "This determines whether the connector will write to regular indices, data streams, " | ||
+ "index aliases, or data stream aliases. When set to %s, the connector will " | ||
+ "auto-create indices or data streams based on the topic name and datastream configurations", | ||
ExternalResourceUsage.INDEX, | ||
ExternalResourceUsage.DATASTREAM, | ||
ExternalResourceUsage.ALIAS_INDEX, | ||
ExternalResourceUsage.ALIAS_DATASTREAM, | ||
ExternalResourceUsage.DISABLED, | ||
ExternalResourceUsage.DISABLED | ||
); | ||
private static final String EXTERNAL_RESOURCE_USAGE_DISPLAY = "External Resource Usage"; | ||
private static final String EXTERNAL_RESOURCE_USAGE_DEFAULT = | ||
ExternalResourceUsage.DISABLED.name(); | ||
|
||
public static final String TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_CONFIG = | ||
"topic.to.external.resource.mapping"; | ||
private static final String TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_DOC = String.format( | ||
"A list of topic-to-resource mappings in the format 'topic:resource'. " | ||
+ "If specified, the connector will use the provided resource name " | ||
+ "(index, data stream, or alias) instead of the topic name for writing " | ||
+ "to Elasticsearch. The resource must exist in Elasticsearch before " | ||
+ "configuring the connector. The type of resource (index, data stream, " | ||
+ "or alias) is determined by the '%s' configuration.", | ||
EXTERNAL_RESOURCE_USAGE_CONFIG | ||
); | ||
private static final String TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_DISPLAY = | ||
"Topic to External Resource Mapping"; | ||
private static final String TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_DEFAULT = ""; | ||
|
||
// Error message constants for topic-to-resource mapping validation | ||
public static final String INVALID_MAPPING_FORMAT_ERROR = | ||
"Invalid topic-to-resource mapping format. Expected format: topic:resource"; | ||
|
||
public static final String DUPLICATE_TOPIC_MAPPING_ERROR_FORMAT = | ||
"Topic '%s' is mapped to multiple resources. " | ||
+ "Each topic must be mapped to exactly one resource."; | ||
|
||
public static final String DUPLICATE_RESOURCE_MAPPING_ERROR_FORMAT = | ||
"Resource '%s' is mapped from multiple topics. " | ||
+ "Each resource must be mapped to exactly one topic."; | ||
|
||
private final String[] kafkaTopics; | ||
|
||
private static final String CONNECTOR_GROUP = "Connector"; | ||
private static final String DATA_CONVERSION_GROUP = "Data Conversion"; | ||
private static final String PROXY_GROUP = "Proxy"; | ||
private static final String SSL_GROUP = "Security"; | ||
private static final String KERBEROS_GROUP = "Kerberos"; | ||
private static final String DATA_STREAM_GROUP = "Data Stream"; | ||
|
||
public enum BehaviorOnMalformedDoc { | ||
IGNORE, | ||
|
@@ -425,6 +472,14 @@ | |
NONE | ||
} | ||
|
||
/**
 * Kind of Elasticsearch resource the connector writes to, as selected by the
 * {@code external.resource.usage} setting.
 *
 * <p>Constant order is kept stable; do not reorder.
 */
public enum ExternalResourceUsage {
  /** Write to a regular index. */
  INDEX,

  /** Write to a data stream. */
  DATASTREAM,

  /** Write through an alias that points at an index. */
  ALIAS_INDEX,

  /** Write through an alias that points at a data stream. */
  ALIAS_DATASTREAM,

  /**
   * External resource usage is off; per the setting's documentation the connector then
   * auto-creates indices or data streams from the topic name and data stream configs.
   */
  DISABLED
}
|
||
public enum SecurityProtocol { | ||
PLAINTEXT, | ||
SSL | ||
|
@@ -442,7 +497,6 @@ | |
addProxyConfigs(configDef); | ||
addSslConfigs(configDef); | ||
addKerberosConfigs(configDef); | ||
addDataStreamConfigs(configDef); | ||
return configDef; | ||
} | ||
|
||
|
@@ -480,6 +534,71 @@ | |
++order, | ||
Width.SHORT, | ||
CONNECTION_PASSWORD_DISPLAY | ||
).define( | ||
EXTERNAL_RESOURCE_USAGE_CONFIG, | ||
Type.STRING, | ||
EXTERNAL_RESOURCE_USAGE_DEFAULT, | ||
new EnumRecommender<>(ExternalResourceUsage.class), | ||
Importance.HIGH, | ||
EXTERNAL_RESOURCE_USAGE_DOC, | ||
CONNECTOR_GROUP, | ||
++order, | ||
Width.SHORT, | ||
EXTERNAL_RESOURCE_USAGE_DISPLAY, | ||
new EnumRecommender<>(ExternalResourceUsage.class) | ||
).define( | ||
TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_CONFIG, | ||
Type.LIST, | ||
TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_DEFAULT, | ||
Importance.HIGH, | ||
TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_DOC, | ||
CONNECTOR_GROUP, | ||
++order, | ||
Width.LONG, | ||
TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_DISPLAY | ||
).define( | ||
DATA_STREAM_TYPE_CONFIG, | ||
Type.STRING, | ||
DATA_STREAM_TYPE_DEFAULT.name(), | ||
Importance.LOW, | ||
DATA_STREAM_TYPE_DOC, | ||
CONNECTOR_GROUP, | ||
++order, | ||
Width.SHORT, | ||
DATA_STREAM_TYPE_DISPLAY, | ||
new EnumRecommender<>(DataStreamType.class) | ||
).define( | ||
DATA_STREAM_DATASET_CONFIG, | ||
Type.STRING, | ||
DATA_STREAM_DATASET_DEFAULT, | ||
new DataStreamDatasetValidator(), | ||
Importance.LOW, | ||
DATA_STREAM_DATASET_DOC, | ||
CONNECTOR_GROUP, | ||
++order, | ||
Width.MEDIUM, | ||
DATA_STREAM_DATASET_DISPLAY | ||
).define( | ||
DATA_STREAM_NAMESPACE_CONFIG, | ||
Type.STRING, | ||
DATA_STREAM_NAMESPACE_DEFAULT, | ||
new DataStreamNamespaceValidator(), | ||
Importance.LOW, | ||
DATA_STREAM_NAMESPACE_DOC, | ||
CONNECTOR_GROUP, | ||
++order, | ||
Width.MEDIUM, | ||
DATA_STREAM_NAMESPACE_DISPLAY | ||
).define( | ||
DATA_STREAM_TIMESTAMP_CONFIG, | ||
Type.LIST, | ||
DATA_STREAM_TIMESTAMP_DEFAULT, | ||
Importance.LOW, | ||
DATA_STREAM_TIMESTAMP_DOC, | ||
CONNECTOR_GROUP, | ||
++order, | ||
Width.LONG, | ||
DATA_STREAM_TIMESTAMP_DISPLAY | ||
).define( | ||
BATCH_SIZE_CONFIG, | ||
Type.INT, | ||
|
@@ -840,59 +959,64 @@ | |
); | ||
} | ||
|
||
private static void addDataStreamConfigs(ConfigDef configDef) { | ||
int order = 0; | ||
configDef | ||
.define( | ||
DATA_STREAM_NAMESPACE_CONFIG, | ||
Type.STRING, | ||
DATA_STREAM_NAMESPACE_DEFAULT, | ||
new DataStreamNamespaceValidator(), | ||
Importance.LOW, | ||
DATA_STREAM_NAMESPACE_DOC, | ||
DATA_STREAM_GROUP, | ||
++order, | ||
Width.MEDIUM, | ||
DATA_STREAM_NAMESPACE_DISPLAY | ||
).define( | ||
DATA_STREAM_DATASET_CONFIG, | ||
Type.STRING, | ||
DATA_STREAM_DATASET_DEFAULT, | ||
new DataStreamDatasetValidator(), | ||
Importance.LOW, | ||
DATA_STREAM_DATASET_DOC, | ||
DATA_STREAM_GROUP, | ||
++order, | ||
Width.MEDIUM, | ||
DATA_STREAM_DATASET_DISPLAY | ||
).define( | ||
DATA_STREAM_TYPE_CONFIG, | ||
Type.STRING, | ||
DATA_STREAM_TYPE_DEFAULT.name(), | ||
Importance.LOW, | ||
DATA_STREAM_TYPE_DOC, | ||
DATA_STREAM_GROUP, | ||
++order, | ||
Width.SHORT, | ||
DATA_STREAM_TYPE_DISPLAY, | ||
new EnumRecommender<>(DataStreamType.class) | ||
).define( | ||
DATA_STREAM_TIMESTAMP_CONFIG, | ||
Type.LIST, | ||
DATA_STREAM_TIMESTAMP_DEFAULT, | ||
Importance.LOW, | ||
DATA_STREAM_TIMESTAMP_DOC, | ||
DATA_STREAM_GROUP, | ||
++order, | ||
Width.LONG, | ||
DATA_STREAM_TIMESTAMP_DISPLAY | ||
); | ||
} | ||
|
||
public static final ConfigDef CONFIG = baseConfigDef(); | ||
|
||
/**
 * Builds the connector configuration from raw properties.
 *
 * @param props connector properties, parsed against {@code CONFIG}; the {@code topics}
 *              entry is also split into the Kafka topic array
 */
public ElasticsearchSinkConnectorConfig(Map<String, String> props) {
  super(CONFIG, props);
  kafkaTopics = getTopicArray(props);
}
|
||
private String[] getTopicArray(Map<?, ?> config) { | ||
Object obj = config.get("topics"); | ||
return obj == null ? new String[0] : ((String) obj).trim().split("\\s*,\\s*"); | ||
Check failure on line 971 in src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java
|
||
} | ||
|
||
/** | ||
* Parses and validates topic-to-resource mappings. | ||
* | ||
* @return Map of topic to resource names | ||
* @throws ConfigException if any mapping is invalid or has duplicates | ||
*/ | ||
public Map<String, String> getTopicToExternalResourceMap() { | ||
List<String> mappings = topicToExternalResourceMapping(); | ||
Map<String, String> topicToExternalResourceMap = new HashMap<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Initialise the map with |
||
Set<String> seenResources = new HashSet<>(); | ||
|
||
for (String mapping : mappings) { | ||
String[] parts = mapping.split(":"); | ||
if (parts.length != 2) { | ||
throw new ConfigException( | ||
TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_CONFIG, | ||
mapping, | ||
INVALID_MAPPING_FORMAT_ERROR | ||
); | ||
} | ||
|
||
String topic = parts[0].trim(); | ||
String resource = parts[1].trim(); | ||
|
||
// Check for duplicate topic mappings | ||
if (topicToExternalResourceMap.containsKey(topic)) { | ||
throw new ConfigException( | ||
TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_CONFIG, | ||
mapping, | ||
String.format(DUPLICATE_TOPIC_MAPPING_ERROR_FORMAT, topic) | ||
); | ||
} | ||
|
||
// Check for duplicate resource mappings (enforce 1:1) | ||
if (seenResources.contains(resource)) { | ||
throw new ConfigException( | ||
TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_CONFIG, | ||
mapping, | ||
String.format(DUPLICATE_RESOURCE_MAPPING_ERROR_FORMAT, resource) | ||
); | ||
} | ||
|
||
topicToExternalResourceMap.put(topic, resource); | ||
seenResources.add(resource); | ||
} | ||
return topicToExternalResourceMap; | ||
} | ||
|
||
public boolean isAuthenticatedConnection() { | ||
|
@@ -903,7 +1027,21 @@ | |
return !getString(PROXY_HOST_CONFIG).isEmpty(); | ||
} | ||
|
||
/** | ||
* Determines if data streams are being used. Checks the external | ||
* resource usage approach first, then falls back to legacy data stream configs. | ||
* | ||
* @return true if data streams are configured, false otherwise | ||
*/ | ||
public boolean isDataStream() { | ||
// Check if using new external resource usage approach | ||
if (isExternalResourceUsageEnabled()) { | ||
ExternalResourceUsage usage = externalResourceUsage(); | ||
return usage == ExternalResourceUsage.DATASTREAM | ||
|| usage == ExternalResourceUsage.ALIAS_DATASTREAM; | ||
} | ||
|
||
// Legacy data stream check | ||
return !dataStreamType().toUpperCase().equals(DataStreamType.NONE.name()) | ||
&& !dataStreamDataset().isEmpty(); | ||
} | ||
|
@@ -1118,6 +1256,27 @@ | |
return WriteMethod.valueOf(getString(WRITE_METHOD_CONFIG).toUpperCase()); | ||
} | ||
|
||
public ExternalResourceUsage externalResourceUsage() { | ||
return ExternalResourceUsage.valueOf(getString(EXTERNAL_RESOURCE_USAGE_CONFIG).toUpperCase()); | ||
} | ||
|
||
/** | ||
* Checks if external resource usage is enabled. | ||
* | ||
* @return true if external resource usage is configured, false if DISABLED | ||
*/ | ||
public boolean isExternalResourceUsageEnabled() { | ||
return externalResourceUsage() != ExternalResourceUsage.DISABLED; | ||
} | ||
|
||
public List<String> topicToExternalResourceMapping() { | ||
return getList(TOPIC_TO_EXTERNAL_RESOURCE_MAPPING_CONFIG); | ||
} | ||
|
||
public String[] getKafkaTopics() { | ||
return this.kafkaTopics; | ||
} | ||
|
||
private static class DataStreamNamespaceValidator implements Validator { | ||
|
||
@Override | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can ask for docs approval for these.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added the docs team for approval. Also added the necessary screenshots to the description.