Skip to content

Commit 9c75901

Browse files
authored
Merge pull request #415 from ludovic-boutros/feature/add-auto-extract-timestamp-parameter
Add auto extract timestamp parameter support
2 parents d690d35 + a46a791 commit 9c75901

File tree

7 files changed

+131
-10
lines changed

7 files changed

+131
-10
lines changed

README.md

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ Use the below schema to configure Splunk Connect for Kafka
107107
"splunk.hec.raw": "<true|false>",
108108
"splunk.hec.raw.line.breaker": "<line breaker separator>",
109109
"splunk.hec.json.event.enrichment": "<key value pairs separated by comma, only applicable to /event HEC>",
110+
"splunk.hec.auto.extract.timestamp": "<true|false>",
110111
"value.converter": "<converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka>",
111112
"value.converter.schema.registry.url": "<Schema-Registry-URL>",
112113
"value.converter.schemas.enable": "<true|false>",
@@ -196,10 +197,11 @@ Use the below schema to configure Splunk Connect for Kafka
196197
|-------- |----------------------------|-----------------------|
197198
| `splunk.hec.raw.line.breaker` | Only applicable to /raw HEC endpoint. The setting is used to specify a custom line breaker to help Splunk separate the events correctly.<br/> **NOTE:** <br/> For example, you can specify `"#####"` as a special line breaker. Internally, the Splunk Kafka Connector will append this line breaker to every Kafka record to form a clear event boundary. The connector performs data injection in batch mode. On the Splunk platform side, you can configure **`props.conf`** to set up line breaker for the sourcetypes. Then the Splunk software will correctly break events for data flowing through /raw HEC endpoint. For questions on how and when to specify line breaker, go to the FAQ section.|`""`|
198199
##### /event endpoint only
199-
| Name | Description | Default Value |
200-
|-------- |----------------------------|-----------------------|
201-
| `splunk.hec.json.event.enrichment` | Only applicable to /event HEC endpoint. This setting is used to enrich raw data with extra metadata fields. It contains a list of key value pairs separated by ",". The configured enrichment metadata will be indexed along with raw event data by Splunk software. </br> **NOTE:** <br/> Data enrichment for /event HEC endpoint is only available in Splunk Enterprise 6.5 and above. By default, this setting is empty. See ([Documentation](http://dev.splunk.com/view/event-collector/SP-CAAAE8Y#indexedfield)) for more information. <br/>**Example:** `org=fin,bu=south-east-us`||
202-
| `splunk.hec.track.data` | When set to `true`, data loss and data injection latency metadata will be indexed along with raw data. This setting only works in conjunction with /event HEC endpoint (`"splunk.hec.raw" : "false"`). Valid settings are `true` or `false`. |`false`|
200+
| Name | Description | Default Value |
201+
|-------- |------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|
202+
| `splunk.hec.json.event.enrichment` | Only applicable to /event HEC endpoint. This setting is used to enrich raw data with extra metadata fields. It contains a list of key value pairs separated by ",". The configured enrichment metadata will be indexed along with raw event data by Splunk software. <br/> **NOTE:** <br/> Data enrichment for /event HEC endpoint is only available in Splunk Enterprise 6.5 and above. By default, this setting is empty. See ([Documentation](http://dev.splunk.com/view/event-collector/SP-CAAAE8Y#indexedfield)) for more information. <br/>**Example:** `org=fin,bu=south-east-us` | |
203+
| `splunk.hec.track.data` | When set to `true`, data loss and data injection latency metadata will be indexed along with raw data. This setting only works in conjunction with /event HEC endpoint (`"splunk.hec.raw" : "false"`). Valid settings are `true` or `false`. | `false` |
204+
| `splunk.hec.auto.extract.timestamp` | When set to `true`, it forces Splunk HEC to extract the timestamp from the event envelope/event data. See [/services/collector/event](https://docs.splunk.com/Documentation/Splunk/9.1.1/RESTREF/RESTinput#services.2Fcollector.2Fevent) for more details. | `unset` |
203205

204206
### Headers Parameters
205207
#### Use Headers

src/main/java/com/splunk/hecclient/HecConfig.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public final class HecConfig {
4040
private String kerberosPrincipal;
4141
private String kerberosKeytabPath;
4242
private int concurrentHecQueueCapacity = 100;
43+
private Boolean autoExtractTimestamp;
4344

4445
public HecConfig(List<String> uris, String token) {
4546
this.uris = uris;
@@ -114,6 +115,8 @@ public int getConcurrentHecQueueCapacity() {
114115

115116
public String getTrustStorePassword() { return trustStorePassword; }
116117

118+
public Boolean getAutoExtractTimestamp() { return autoExtractTimestamp; }
119+
117120
public HecConfig setDisableSSLCertVerification(boolean disableVerfication) {
118121
disableSSLCertVerification = disableVerfication;
119122
return this;
@@ -217,6 +220,11 @@ public HecConfig setConcurrentHecQueueCapacity(int concurrentHecQueueCapacity) {
217220
return this;
218221
}
219222

223+
public HecConfig setAutoExtractTimestamp(Boolean autoExtractTimestamp) {
224+
this.autoExtractTimestamp = autoExtractTimestamp;
225+
return this;
226+
}
227+
220228
public boolean kerberosAuthEnabled() {
221229
return !kerberosPrincipal().isEmpty();
222230
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
package com.splunk.hecclient;

import org.apache.http.client.utils.URIBuilder;

import java.net.URI;
import java.net.URISyntaxException;

/**
 * Builds HEC endpoint URIs from a base URL plus query parameters derived from
 * the {@link HecConfig}. Currently the only config-driven parameter is
 * {@code auto_extract_timestamp}, which is appended only when explicitly set
 * (tri-state: {@code null} means "omit and let Splunk HEC use its default").
 */
public class HecURIBuilder {
    /** Query-parameter name understood by the /services/collector/event endpoint. */
    public static final String AUTO_EXTRACT_TIMESTAMP_PARAMETER = "auto_extract_timestamp";

    private final String baseUrl;
    private final HecConfig hecConfig;

    /**
     * @param baseUrl   scheme://host:port of the HEC instance, e.g. "https://localhost:8088"
     * @param hecConfig connector configuration supplying optional query parameters
     */
    public HecURIBuilder(String baseUrl, HecConfig hecConfig) {
        this.baseUrl = baseUrl;
        this.hecConfig = hecConfig;
    }

    /**
     * Builds the full URI for the given HEC endpoint path.
     *
     * @param endpoint endpoint path, e.g. "/services/collector/event"
     * @return the assembled URI, including auto_extract_timestamp when configured
     * @throws IllegalArgumentException if the base URL and endpoint cannot form a valid URI
     */
    public URI getURI(String endpoint) {
        try {
            URIBuilder uriBuilder = new URIBuilder(baseUrl)
                    .setPath(endpoint);

            // Boolean is used as a tri-state: null means "not configured", so the
            // parameter is omitted entirely and HEC applies its server-side default.
            if (hecConfig.getAutoExtractTimestamp() != null) {
                uriBuilder.addParameter(AUTO_EXTRACT_TIMESTAMP_PARAMETER,
                        hecConfig.getAutoExtractTimestamp().toString());
            }
            return uriBuilder.build();
        } catch (URISyntaxException e) {
            // IllegalArgumentException is a RuntimeException subclass, so existing callers
            // are unaffected, but the failure now carries context instead of a bare
            // message-less RuntimeException.
            throw new IllegalArgumentException(
                    "Invalid HEC URI: baseUrl=" + baseUrl + ", endpoint=" + endpoint, e);
        }
    }
}

src/main/java/com/splunk/hecclient/Indexer.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import com.fasterxml.jackson.databind.node.ObjectNode;
2121
import com.splunk.kafka.connect.VersionUtils;
2222
import com.sun.security.auth.module.Krb5LoginModule;
23+
24+
import java.net.URI;
25+
import java.net.URISyntaxException;
2326
import java.security.Principal;
2427
import java.security.PrivilegedAction;
2528
import java.util.HashMap;
@@ -37,6 +40,7 @@
3740
import org.apache.http.client.methods.HttpPost;
3841
import org.apache.http.client.methods.HttpUriRequest;
3942
import org.apache.http.client.protocol.HttpClientContext;
43+
import org.apache.http.client.utils.URIBuilder;
4044
import org.apache.http.impl.client.CloseableHttpClient;
4145
import org.apache.http.message.BasicHeader;
4246
import org.apache.http.protocol.HttpContext;
@@ -51,6 +55,8 @@ final class Indexer implements IndexerInf {
5155
private static final ObjectMapper jsonMapper = new ObjectMapper();
5256

5357
private HecConfig hecConfig;
58+
59+
private HecURIBuilder hecURIBuilder;
5460
private Configuration config;
5561
private CloseableHttpClient httpClient;
5662
private HttpContext context;
@@ -72,6 +78,7 @@ public Indexer(String baseUrl,CloseableHttpClient client,Poller poller,HecConfig
7278
this.hecToken = config.getToken();
7379
this.poller = poller;
7480
this.context = HttpClientContext.create();
81+
this.hecURIBuilder = new HecURIBuilder(baseUrl, hecConfig);
7582
backPressure = 0;
7683

7784
channel = new HecChannel(this);
@@ -132,8 +139,8 @@ public HecChannel getChannel() {
132139
@Override
133140
public boolean send(final EventBatch batch) {
134141
String endpoint = batch.getRestEndpoint();
135-
String url = baseUrl + endpoint;
136-
final HttpPost httpPost = new HttpPost(url);
142+
URI uri = hecURIBuilder.getURI(endpoint);
143+
final HttpPost httpPost = new HttpPost(uri);
137144
httpPost.setHeaders(headers);
138145
if (batch.isEnableCompression()) {
139146
httpPost.setHeader("Content-Encoding", "gzip");

src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
8080
static final String SSL_TRUSTSTORE_PATH_CONF = "splunk.hec.ssl.trust.store.path";
8181
static final String SSL_TRUSTSTORE_TYPE_CONF = "splunk.hec.ssl.trust.store.type";
8282
static final String SSL_TRUSTSTORE_PASSWORD_CONF = "splunk.hec.ssl.trust.store.password";
83+
static final String AUTO_EXTRACT_TIMESTAMP_CONF = "splunk.hec.auto.extract.timestamp";
84+
8385
//Headers
8486
static final String HEADER_SUPPORT_CONF = "splunk.header.support";
8587
static final String HEADER_CUSTOM_CONF = "splunk.header.custom";
@@ -187,6 +189,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
187189
static final String SSL_TRUSTSTORE_PATH_DOC = "Path on the local disk to the certificate trust store.";
188190
static final String SSL_TRUSTSTORE_TYPE_DOC = "Type of the trust store (JKS, PKCS12, ...).";
189191
static final String SSL_TRUSTSTORE_PASSWORD_DOC = "Password for the trust store.";
192+
static final String AUTO_EXTRACT_TIMESTAMP_DOC = "When set to true, it forces Splunk HEC to extract the timestamp from event envelope/event data.";
190193

191194
static final String HEADER_SUPPORT_DOC = "Setting will enable Kafka Record headers to be used for meta data override";
192195
static final String HEADER_CUSTOM_DOC = "Setting will enable look for Record headers with these values and add them"
@@ -264,8 +267,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
264267
final String regex;
265268
final String timestampFormat;
266269
final int queueCapacity;
267-
268270
final String timeZone;
271+
final Boolean autoExtractTimestamp;
269272

270273
SplunkSinkConnectorConfig(Map<String, String> taskConfig) {
271274
super(conf(), taskConfig);
@@ -324,6 +327,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
324327
validateRegexForTimestamp(regex);
325328
queueCapacity = getInt(QUEUE_CAPACITY_CONF);
326329
validateQueueCapacity(queueCapacity);
330+
autoExtractTimestamp = getBoolean(AUTO_EXTRACT_TIMESTAMP_CONF);
327331
}
328332

329333

@@ -341,6 +345,7 @@ public static ConfigDef conf() {
341345
.define(SSL_TRUSTSTORE_PATH_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PATH_DOC)
342346
.define(SSL_TRUSTSTORE_TYPE_CONF, ConfigDef.Type.STRING, "JKS", ConfigDef.Importance.LOW, SSL_TRUSTSTORE_TYPE_DOC)
343347
.define(SSL_TRUSTSTORE_PASSWORD_CONF, ConfigDef.Type.PASSWORD, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PASSWORD_DOC)
348+
.define(AUTO_EXTRACT_TIMESTAMP_CONF, ConfigDef.Type.BOOLEAN, null, ConfigDef.Importance.LOW, AUTO_EXTRACT_TIMESTAMP_DOC)
344349
.define(EVENT_TIMEOUT_CONF, ConfigDef.Type.INT, 300, ConfigDef.Importance.MEDIUM, EVENT_TIMEOUT_DOC)
345350
.define(ACK_POLL_INTERVAL_CONF, ConfigDef.Type.INT, 10, ConfigDef.Importance.MEDIUM, ACK_POLL_INTERVAL_DOC)
346351
.define(ACK_POLL_THREADS_CONF, ConfigDef.Type.INT, 2, ConfigDef.Importance.MEDIUM, ACK_POLL_THREADS_DOC)
@@ -398,7 +403,8 @@ public HecConfig getHecConfig() {
398403
.setHasCustomTrustStore(hasTrustStorePath)
399404
.setKerberosPrincipal(kerberosUserPrincipal)
400405
.setKerberosKeytabPath(kerberosKeytabPath)
401-
.setConcurrentHecQueueCapacity(queueCapacity);
406+
.setConcurrentHecQueueCapacity(queueCapacity)
407+
.setAutoExtractTimestamp(autoExtractTimestamp);
402408
return config;
403409
}
404410

package com.splunk.hecclient;

import org.junit.Assert;
import org.junit.Test;

import java.net.URI;
import java.util.Collections;

import static com.splunk.hecclient.JsonEventBatch.ENDPOINT;

/** Unit tests for {@link HecURIBuilder}. */
public class HecURIBuilderTest {
    private static final String BASE_URL = "https://localhost:8088";
    private static final String TOKEN = "mytoken";

    @Test
    public void testDefaultValues() {
        // auto_extract_timestamp left unset -> no query string is appended at all.
        URI uri = buildUri(new HecConfig(Collections.emptyList(), TOKEN));

        Assert.assertEquals("https://localhost:8088/services/collector/event", uri.toString());
    }

    @Test
    public void testAutoExtractTimestamp() {
        verifyAutoExtractTimestamp(true);
        verifyAutoExtractTimestamp(false);
    }

    // Builds the /event endpoint URI for the supplied config.
    private static URI buildUri(HecConfig config) {
        return new HecURIBuilder(BASE_URL, config).getURI(ENDPOINT);
    }

    // Asserts that the auto_extract_timestamp query parameter mirrors the configured value.
    private static void verifyAutoExtractTimestamp(boolean value) {
        HecConfig config = new HecConfig(Collections.emptyList(), TOKEN)
                .setAutoExtractTimestamp(value);

        URI uri = buildUri(config);

        Assert.assertEquals("https://localhost:8088/services/collector/event?"
                        + HecURIBuilder.AUTO_EXTRACT_TIMESTAMP_PARAMETER + "=" + value,
                uri.toString());
    }
}

src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@
2828
import org.junit.Assert;
2929
import org.junit.Test;
3030

31+
import java.text.ParseException;
32+
import java.text.SimpleDateFormat;
33+
import java.time.Instant;
3134
import java.util.*;
3235

3336
public class SplunkSinkTaskTest {
@@ -266,14 +269,27 @@ public void putWithRawAndAck() {
266269

267270
@Test
268271
public void checkExtractedTimestamp() {
272+
273+
269274
SplunkSinkTask task = new SplunkSinkTask();
270-
Collection<SinkRecord> record = createSinkRecords(1,"{\"id\": \"19\",\"host\":\"host-01\",\"source\":\"bu\",\"fields\":{\"hn\":\"hostname1\",\"CLASS\":\"class1\",\"cust_id\":\"000013934\",\"time\": \"Jun 13 2010 23:11:52.454 UTC\",\"category\":\"IFdata\",\"ifname\":\"LoopBack7\",\"IFdata.Bits received\":\"0\",\"IFdata.Bits sent\":\"0\"}");
271275
UnitUtil uu = new UnitUtil(0);
272276
Map<String, String> config = uu.createTaskConfig();
273277
config.put(SplunkSinkConnectorConfig.RAW_CONF, String.valueOf(false));
274278
config.put(SplunkSinkConnectorConfig.ENABLE_TIMESTAMP_EXTRACTION_CONF, String.valueOf(true));
275279
config.put(SplunkSinkConnectorConfig.REGEX_CONF, "\\\"time\\\":\\s*\\\"(?<time>.*?)\"");
276280
config.put(SplunkSinkConnectorConfig.TIMESTAMP_FORMAT_CONF, "MMM dd yyyy HH:mm:ss.SSS zzz");
281+
config.put(SplunkSinkConnectorConfig.TIMESTAMP_TIMEZONE_CONF, "UTC");
282+
283+
SimpleDateFormat df = new SimpleDateFormat(config.get(SplunkSinkConnectorConfig.TIMESTAMP_FORMAT_CONF));
284+
df.setTimeZone(TimeZone.getTimeZone("UTC"));
285+
286+
double instantDouble = 1.276470712454E12;
287+
String formattedDate = df.format(Date.from(Instant.ofEpochMilli(Double.valueOf(instantDouble).longValue())));
288+
289+
Collection<SinkRecord> record = createSinkRecords(1,"{\"id\": \"19\",\"host\":\"host-01\",\"source\":\"bu\",\"fields\":{\"hn\":\"hostname1\",\"CLASS\":\"class1\",\"cust_id\":\"000013934\",\"time\": \"" +
290+
formattedDate +
291+
"\",\"category\":\"IFdata\",\"ifname\":\"LoopBack7\",\"IFdata.Bits received\":\"0\",\"IFdata.Bits sent\":\"0\"}");
292+
277293
HecMock hec = new HecMock(task);
278294
hec.setSendReturnResult(HecMock.success);
279295
task.setHec(hec);
@@ -286,7 +302,7 @@ public void checkExtractedTimestamp() {
286302
List<Event> event_list = batch.getEvents();
287303
Iterator<Event> iterator = event_list.listIterator() ;
288304
Event event = iterator.next();
289-
Assert.assertEquals(1.276470712454E9, event.getTime(), 0);
305+
Assert.assertEquals(instantDouble / 1000, event.getTime(), 0);
290306
break;
291307
}
292308
task.stop();

0 commit comments

Comments
 (0)