Skip to content

Add a new config parameter for disabling splunk validations #377

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ Use the below schema to configure Splunk Connect for Kafka
| `splunk.sources` | Splunk event source metadata for Kafka topic data. The same configuration rules as indexes can be applied. If left unconfigured, the default source binds to the HEC token. | `""` |
| `splunk.sourcetypes` | Splunk event sourcetype metadata for Kafka topic data. The same configuration rules as indexes can be applied here. If left unconfigured, the default sourcetype binds to the HEC token. | `""` |
| `splunk.flush.window` | The interval in seconds at which the events from kafka connect will be flushed to Splunk. | `30` |
| `splunk.validation.disable` | Disable validating splunk configurations before creating task. | `false` |
| `splunk.hec.ssl.validate.certs` | Valid settings are `true` or `false`. Enables or disables HTTPS certification validation. |`true`|
| `splunk.hec.http.keepalive` | Valid settings are `true` or `false`. Enables or disables HTTP connection keep-alive. |`true`|
| `splunk.hec.max.http.connection.per.channel` | Controls how many HTTP connections will be created and cached in the HTTP pool for one HEC channel. |`2`|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ private static String[] split(String data, String sep) {

private void validateSplunkConfigurations(final Map<String, String> configs) throws ConfigException {
SplunkSinkConnectorConfig connectorConfig = new SplunkSinkConnectorConfig(configs);
if (connectorConfig.disableValidation) {
return;
}
String[] indexes = split(connectorConfig.indexes, ",");
if(indexes == null || indexes.length == 0) {
preparePayloadAndExecuteRequest(connectorConfig, "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
static final String SOURCE_CONF = "splunk.sources";
static final String SOURCETYPE_CONF = "splunk.sourcetypes";
static final String FLUSH_WINDOW_CONF = "splunk.flush.window";
static final String DISABLE_VALIDATION = "splunk.validation.disable";
static final String TOTAL_HEC_CHANNEL_CONF = "splunk.hec.total.channels";
static final String MAX_HTTP_CONNECTION_PER_CHANNEL_CONF = "splunk.hec.max.http.connection.per.channel";
static final String MAX_BATCH_SIZE_CONF = "splunk.hec.max.batch.size"; // record count
Expand Down Expand Up @@ -128,6 +129,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
static final String SSL_VALIDATE_CERTIFICATES_DOC = "Valid settings are true or false. Enables or disables HTTPS "
+ "certification validation. By default, this is set to true.";
static final String ENABLE_COMPRESSSION_DOC = "Valid settings are true or false. Used for enable or disable gzip-compression. By default, this is set to false.";
static final String DISABLE_VALIDATION_DOC = "Disable validating splunk configurations before creating task.";
// Acknowledgement Parameters
// Use Ack
static final String ACK_DOC = "Valid settings are true or false. When set to true Splunk Connect for Kafka will "
Expand Down Expand Up @@ -218,6 +220,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
final int socketTimeout;
final boolean validateCertificates;
final boolean enableCompression;
final boolean disableValidation;
final int lbPollInterval;

final boolean ack;
Expand Down Expand Up @@ -304,6 +307,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
kerberosUserPrincipal = getString(KERBEROS_USER_PRINCIPAL_CONF);
kerberosKeytabPath = getString(KERBEROS_KEYTAB_PATH_CONF);
enableCompression = getBoolean(ENABLE_COMPRESSSION_CONF);
disableValidation = getBoolean(DISABLE_VALIDATION);
enableTimestampExtraction = getBoolean(ENABLE_TIMESTAMP_EXTRACTION_CONF);
regex = getString(REGEX_CONF);
timestampFormat = getString(TIMESTAMP_FORMAT_CONF).trim();
Expand Down Expand Up @@ -351,6 +355,7 @@ public static ConfigDef conf() {
.define(HEADER_HOST_CONF, ConfigDef.Type.STRING, "splunk.header.host", ConfigDef.Importance.MEDIUM, HEADER_HOST_DOC)
.define(LB_POLL_INTERVAL_CONF, ConfigDef.Type.INT, 120, ConfigDef.Importance.LOW, LB_POLL_INTERVAL_DOC)
.define(ENABLE_COMPRESSSION_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM, ENABLE_COMPRESSSION_DOC)
.define(DISABLE_VALIDATION, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM, DISABLE_VALIDATION_DOC)
.define(KERBEROS_USER_PRINCIPAL_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, KERBEROS_USER_PRINCIPAL_DOC)
.define(KERBEROS_KEYTAB_PATH_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, KERBEROS_KEYTAB_LOCATION_DOC)
.define(ENABLE_TIMESTAMP_EXTRACTION_CONF, ConfigDef.Type.BOOLEAN, false , ConfigDef.Importance.MEDIUM, ENABLE_TIMESTAMP_EXTRACTION_DOC)
Expand Down Expand Up @@ -425,6 +430,7 @@ public String toString() {
+ "headerSourcetype:" + headerSourcetype + ", "
+ "headerHost:" + headerHost + ", "
+ "enableCompression:" + enableCompression + ", "
+ "disableValidation:" + disableValidation + ", "
+ "lbPollInterval:" + lbPollInterval;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,32 @@ public void testValidSplunkConfigurations() {
Assertions.assertDoesNotThrow(()->connector.validate(configs));
}

@Test
public void testInvalidSplunkConfigurationsWithValidationDisabled() {
final Map<String, String> configs = new HashMap<>();
addNecessaryConfigs(configs);
SplunkSinkConnector connector = new SplunkSinkConnector();
configs.put("splunk.validation.disable", "true");
configs.put("topics", "b");
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
clientInstance.client.setResponse(CloseableHttpClientMock.exception);
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
Assertions.assertDoesNotThrow(()->connector.validate(configs));
}

@Test
public void testInvalidSplunkConfigurationsWithValidationEnabled() {
final Map<String, String> configs = new HashMap<>();
addNecessaryConfigs(configs);
SplunkSinkConnector connector = new SplunkSinkConnector();
configs.put("splunk.validation.disable", "false");
configs.put("topics", "b");
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
clientInstance.client.setResponse(CloseableHttpClientMock.exception);
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs));
}

private void addNecessaryConfigs(Map<String, String> configs) {
configs.put(URI_CONF, TEST_URI);
configs.put(TOKEN_CONF, "blah");
Expand Down