Skip to content

Commit 57467a5

Browse files
authored
Validate Splunk configuration before creating task (#365)
1 parent a2be62b commit 57467a5

File tree

8 files changed

+216
-5
lines changed

8 files changed

+216
-5
lines changed

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
</properties>
2222

2323
<dependencies>
24+
2425
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
2526
<dependency>
2627
<groupId>com.fasterxml.jackson.core</groupId>
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.splunk.kafka.connect;
2+
3+
import org.apache.http.impl.client.CloseableHttpClient;
4+
5+
import com.splunk.hecclient.HecConfig;
6+
7+
public abstract class AbstractClientWrapper {
8+
abstract CloseableHttpClient getClient(HecConfig config);
9+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package com.splunk.kafka.connect;
2+
3+
import org.apache.http.impl.client.CloseableHttpClient;
4+
5+
import com.splunk.hecclient.Hec;
6+
import com.splunk.hecclient.HecConfig;
7+
8+
public class HecClientWrapper extends AbstractClientWrapper {
9+
10+
@Override
11+
CloseableHttpClient getClient(HecConfig config) {
12+
return Hec.createHttpClient(config);
13+
14+
}
15+
16+
17+
}

src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java

Lines changed: 102 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,25 +20,49 @@
2020

2121
import java.util.function.Function;
2222
import java.util.stream.Collectors;
23+
2324
import org.apache.commons.lang3.StringUtils;
25+
import org.apache.http.Header;
26+
import org.apache.http.client.ClientProtocolException;
27+
import org.apache.http.client.methods.CloseableHttpResponse;
28+
import org.apache.http.client.methods.HttpPost;
29+
import org.apache.http.client.methods.HttpUriRequest;
30+
import org.apache.http.client.protocol.HttpClientContext;
31+
import org.apache.http.impl.client.CloseableHttpClient;
32+
import org.apache.http.message.BasicHeader;
33+
import org.apache.http.protocol.HttpContext;
34+
import org.apache.http.util.EntityUtils;
2435
import org.apache.kafka.common.config.Config;
2536
import org.apache.kafka.common.config.ConfigDef;
37+
import org.apache.kafka.common.config.ConfigException;
2638
import org.apache.kafka.common.config.ConfigValue;
2739
import org.apache.kafka.connect.connector.Task;
2840
import org.apache.kafka.connect.sink.SinkConnector;
2941

42+
import java.io.IOException;
3043
import java.util.ArrayList;
3144
import java.util.List;
3245
import java.util.Map;
3346

3447
import org.slf4j.Logger;
3548
import org.slf4j.LoggerFactory;
3649

50+
import com.splunk.hecclient.Event;
51+
import com.splunk.hecclient.EventBatch;
52+
import com.splunk.hecclient.JsonEvent;
53+
import com.splunk.hecclient.JsonEventBatch;
54+
3755
public final class SplunkSinkConnector extends SinkConnector {
3856
private static final Logger log = LoggerFactory.getLogger(SplunkSinkConnector.class);
3957
private Map<String, String> taskConfig;
4058
private Map<String, ConfigValue> values;
4159
private List<ConfigValue> validations;
60+
private AbstractClientWrapper abstractClientWrapper = new HecClientWrapper();
61+
62+
63+
public void setHecInstance(AbstractClientWrapper abstractClientWrapper) {
64+
this.abstractClientWrapper = abstractClientWrapper;
65+
}
4266

4367
@Override
4468
public void start(Map<String, String> taskConfig) {
@@ -76,14 +100,15 @@ public ConfigDef config() {
76100
return SplunkSinkConnectorConfig.conf();
77101
}
78102

79-
103+
80104
@Override
81105
public Config validate(final Map<String, String> connectorConfigs) {
82106
Config config = super.validate(connectorConfigs);
83107
validations = config.configValues();
84108
values = validations.stream().collect(Collectors.toMap(ConfigValue::name, Function.identity()));
85109

86110
validateKerberosConfigs(connectorConfigs);
111+
validateSplunkConfigurations(connectorConfigs);
87112
return new Config(validations);
88113
}
89114

@@ -100,9 +125,9 @@ void validateKerberosConfigs(final Map<String, String> configs) {
100125
}
101126

102127
String errorMessage = String.format(
103-
"Either both or neither '%s' and '%s' must be set for Kerberos authentication. ",
104-
KERBEROS_KEYTAB_PATH_CONF,
105-
KERBEROS_USER_PRINCIPAL_CONF
128+
"Either both or neither '%s' and '%s' must be set for Kerberos authentication. ",
129+
KERBEROS_KEYTAB_PATH_CONF,
130+
KERBEROS_USER_PRINCIPAL_CONF
106131
);
107132
addErrorMessage(KERBEROS_KEYTAB_PATH_CONF, errorMessage);
108133
addErrorMessage(KERBEROS_USER_PRINCIPAL_CONF, errorMessage);
@@ -111,4 +136,76 @@ void validateKerberosConfigs(final Map<String, String> configs) {
111136
private void addErrorMessage(String property, String error) {
112137
values.get(property).addErrorMessage(error);
113138
}
114-
}
139+
140+
private static String[] split(String data, String sep) {
141+
if (data != null && !data.trim().isEmpty()) {
142+
return data.trim().split(sep);
143+
}
144+
return null;
145+
}
146+
147+
148+
private void validateSplunkConfigurations(final Map<String, String> configs) throws ConfigException {
149+
SplunkSinkConnectorConfig connectorConfig = new SplunkSinkConnectorConfig(configs);
150+
String[] indexes = split(connectorConfig.indexes, ",");
151+
if(indexes == null || indexes.length == 0) {
152+
preparePayloadAndExecuteRequest(connectorConfig, "");
153+
} else {
154+
for (String index : indexes) {
155+
preparePayloadAndExecuteRequest(connectorConfig, index);
156+
}
157+
}
158+
}
159+
160+
private void preparePayloadAndExecuteRequest(SplunkSinkConnectorConfig connectorConfig, String index) throws ConfigException {
161+
Header[] headers = new Header[]{new BasicHeader("Authorization", String.format("Splunk %s", connectorConfig.splunkToken))};
162+
String endpoint = "/services/collector";
163+
String url = connectorConfig.splunkURI + endpoint;
164+
final HttpPost httpPost = new HttpPost(url);
165+
httpPost.setHeaders(headers);
166+
EventBatch batch = new JsonEventBatch();
167+
Event event = new JsonEvent("Splunk HEC Configuration Check", null);
168+
event.setIndex(index);
169+
event.setSource("kafka-connect");
170+
event.setSourcetype("kafka-connect");
171+
batch.add(event);
172+
httpPost.setEntity(batch.getHttpEntity());
173+
CloseableHttpClient httpClient = abstractClientWrapper.getClient(connectorConfig.getHecConfig());
174+
executeHttpRequest(httpPost, httpClient);
175+
}
176+
177+
178+
179+
private void executeHttpRequest(final HttpUriRequest req, CloseableHttpClient httpClient) throws ConfigException {
180+
CloseableHttpResponse resp = null;
181+
HttpContext context;
182+
context = HttpClientContext.create();
183+
try {
184+
resp = httpClient.execute(req, context);
185+
int status = resp.getStatusLine().getStatusCode();
186+
187+
String respPayload = EntityUtils.toString(resp.getEntity(), "utf-8");
188+
if (status > 299){
189+
throw new ConfigException(String.format("Bad splunk configurations with status code:%s response:%s",status,respPayload));
190+
}
191+
} catch (ClientProtocolException ex) {
192+
throw new ConfigException("Invalid splunk SSL configuration detected while validating configuration",ex);
193+
} catch (IOException ex) {
194+
throw new ConfigException("Invalid Splunk Configurations",ex);
195+
} catch (ConfigException ex) {
196+
throw ex;
197+
} catch (Exception ex) {
198+
throw new ConfigException("failed to process http payload",ex);
199+
} finally {
200+
try {
201+
if (resp!= null) {
202+
resp.close();
203+
}
204+
} catch (Exception ex) {
205+
throw new ConfigException("failed to close http response",ex);
206+
}
207+
}
208+
}
209+
210+
211+
}

src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,8 +308,10 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
308308
regex = getString(REGEX_CONF);
309309
timestampFormat = getString(TIMESTAMP_FORMAT_CONF).trim();
310310
validateRegexForTimestamp(regex);
311+
311312
}
312313

314+
313315
public static ConfigDef conf() {
314316
return new ConfigDef()
315317
.define(TOKEN_CONF, ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, TOKEN_DOC)

src/test/java/com/splunk/hecclient/CloseableHttpClientMock.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ public class CloseableHttpClientMock extends CloseableHttpClient {
3131
public static final String serverBusy = "{\"text\":\"Server busy\",\"code\":1}";
3232
public static final String noDataError = "{\"text\":\"No data\",\"code\":5}";
3333
public static final String invalidDataFormat = "{\"text\":\"Invalid data format\",\"code\":6}";
34+
public static final String inValidToken = "{\"text\":\"Invalid token\",\"code\":4}";
35+
public static final String inValidIndex = "{\"text\":\"Incorrect index\",\"code\":4,\"invalid-event-number\":1}";
3436
public static final String exception = "excpetion";
3537

3638
private String resp = "";
@@ -49,6 +51,10 @@ protected CloseableHttpResponse doExecute(HttpHost target, HttpRequest request,
4951
return createResponse(resp, 503);
5052
} else if (resp.equals(noDataError)) {
5153
return createResponse(resp, 400);
54+
}else if (resp.equals(inValidToken)) {
55+
return createResponse(resp, 400);
56+
}else if (resp.equals(inValidIndex)) {
57+
return createResponse(resp, 400);
5258
} else {
5359
return createResponse(success, 201);
5460
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.splunk.kafka.connect;
2+
3+
import org.apache.http.impl.client.CloseableHttpClient;
4+
5+
import com.splunk.hecclient.CloseableHttpClientMock;
6+
import com.splunk.hecclient.Hec;
7+
import com.splunk.hecclient.HecConfig;
8+
9+
public class MockHecClientWrapper extends AbstractClientWrapper{
10+
public CloseableHttpClientMock client = new CloseableHttpClientMock();
11+
12+
@Override
13+
CloseableHttpClient getClient(HecConfig config) {
14+
// TODO Auto-generated method stub
15+
if (config==null){}
16+
17+
return client;
18+
}
19+
20+
}

src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,18 @@
2222
import static org.junit.Assert.assertFalse;
2323
import static org.junit.Assert.assertTrue;
2424

25+
import org.apache.http.impl.client.CloseableHttpClient;
2526
import org.apache.kafka.common.config.Config;
2627
import org.apache.kafka.common.config.ConfigDef;
28+
import org.apache.kafka.common.config.ConfigException;
2729
import org.apache.kafka.common.config.ConfigValue;
2830
import org.apache.kafka.connect.connector.Task;
2931
import org.apache.kafka.connect.sink.SinkConnector;
3032
import org.junit.Assert;
33+
import org.junit.jupiter.api.Assertions;
3134
import org.junit.jupiter.api.Test;
3235

36+
import com.splunk.hecclient.CloseableHttpClientMock;
3337

3438
import java.util.*;
3539

@@ -74,6 +78,10 @@ public void testValidKerberosBothEmpty() {
7478
final Map<String, String> configs = new HashMap<>();
7579
addNecessaryConfigs(configs);
7680
SinkConnector connector = new SplunkSinkConnector();
81+
configs.put("topics", "b");
82+
configs.put("splunk.indexes", "b");
83+
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
84+
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
7785
Config result = connector.validate(configs);
7886
assertNoErrors(result);
7987
}
@@ -85,6 +93,10 @@ public void testValidKerberosBothSet() {
8593
configs.put(KERBEROS_USER_PRINCIPAL_CONF, TEST_KERB_PRINCIPAL);
8694
configs.put(KERBEROS_KEYTAB_PATH_CONF, TEST_KERB_KEYTAB_PATH);
8795
SinkConnector connector = new SplunkSinkConnector();
96+
configs.put("topics", "b");
97+
configs.put("splunk.indexes", "b");
98+
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
99+
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
88100
Config result = connector.validate(configs);
89101
assertNoErrors(result);
90102
}
@@ -95,6 +107,10 @@ public void testInvalidKerberosOnlyPrincipalSet() {
95107
addNecessaryConfigs(configs);
96108
configs.put(KERBEROS_USER_PRINCIPAL_CONF, TEST_KERB_PRINCIPAL);
97109
SplunkSinkConnector connector = new SplunkSinkConnector();
110+
configs.put("topics", "b");
111+
configs.put("splunk.indexes", "b");
112+
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
113+
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
98114
Config result = connector.validate(configs);
99115
assertHasErrorMessage(result, KERBEROS_USER_PRINCIPAL_CONF, "must be set");
100116
assertHasErrorMessage(result, KERBEROS_KEYTAB_PATH_CONF, "must be set");
@@ -106,11 +122,54 @@ public void testInvalidKerberosOnlyKeytabSet() {
106122
addNecessaryConfigs(configs);
107123
configs.put(KERBEROS_KEYTAB_PATH_CONF, TEST_KERB_KEYTAB_PATH);
108124
SplunkSinkConnector connector = new SplunkSinkConnector();
125+
configs.put("topics", "b");
126+
configs.put("splunk.indexes", "b");
127+
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
128+
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
109129
Config result = connector.validate(configs);
110130
assertHasErrorMessage(result, KERBEROS_USER_PRINCIPAL_CONF, "must be set");
111131
assertHasErrorMessage(result, KERBEROS_KEYTAB_PATH_CONF, "must be set");
112132
}
113133

134+
@Test
135+
public void testInvalidToken() {
136+
final Map<String, String> configs = new HashMap<>();
137+
addNecessaryConfigs(configs);
138+
SplunkSinkConnector connector = new SplunkSinkConnector();
139+
configs.put("topics", "b");
140+
configs.put("splunk.indexes", "b");
141+
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
142+
clientInstance.client.setResponse(CloseableHttpClientMock.inValidToken);
143+
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
144+
Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs));
145+
}
146+
147+
@Test
148+
public void testInvalidIndex() {
149+
final Map<String, String> configs = new HashMap<>();
150+
addNecessaryConfigs(configs);
151+
SplunkSinkConnector connector = new SplunkSinkConnector();
152+
configs.put("topics", "b");
153+
configs.put("splunk.indexes", "b");
154+
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
155+
clientInstance.client.setResponse(CloseableHttpClientMock.inValidIndex);
156+
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
157+
Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs));
158+
}
159+
160+
@Test
161+
public void testValidSplunkConfigurations() {
162+
final Map<String, String> configs = new HashMap<>();
163+
addNecessaryConfigs(configs);
164+
SplunkSinkConnector connector = new SplunkSinkConnector();
165+
configs.put("topics", "b");
166+
configs.put("splunk.indexes", "b");
167+
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
168+
clientInstance.client.setResponse(CloseableHttpClientMock.success);
169+
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
170+
Assertions.assertDoesNotThrow(()->connector.validate(configs));
171+
}
172+
114173
private void addNecessaryConfigs(Map<String, String> configs) {
115174
configs.put(URI_CONF, TEST_URI);
116175
configs.put(TOKEN_CONF, "blah");

0 commit comments

Comments
 (0)