Add auto extract timestamp parameter support #415


Merged
10 changes: 6 additions & 4 deletions README.md
@@ -107,6 +107,7 @@ Use the below schema to configure Splunk Connect for Kafka
"splunk.hec.raw": "<true|false>",
"splunk.hec.raw.line.breaker": "<line breaker separator>",
"splunk.hec.json.event.enrichment": "<key value pairs separated by comma, only applicable to /event HEC>",
"splunk.hec.auto.extract.timestamp": "<true|false>",
"value.converter": "<converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka>",
"value.converter.schema.registry.url": "<Schema-Registry-URL>",
"value.converter.schemas.enable": "<true|false>",
@@ -196,10 +197,11 @@ Use the below schema to configure Splunk Connect for Kafka
|-------- |----------------------------|-----------------------|
| `splunk.hec.raw.line.breaker` | Only applicable to the /raw HEC endpoint. This setting specifies a custom line breaker to help Splunk separate events correctly.<br/> **NOTE:** <br/> For example, you can specify `"#####"` as a special line breaker. Internally, the Splunk Kafka Connector appends this line breaker to every Kafka record to form a clear event boundary. The connector performs data injection in batch mode. On the Splunk platform side, you can configure **`props.conf`** to set up the line breaker for the sourcetypes. The Splunk software will then correctly break events for data flowing through the /raw HEC endpoint. For questions on how and when to specify a line breaker, go to the FAQ section. A `props.conf` sketch follows this table.|`""`|
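For illustration only, here is a minimal `props.conf` stanza matching the `"#####"` example above. The sourcetype name is hypothetical; in Splunk's `LINE_BREAKER`, the text matched by the first capture group is consumed as the event boundary.

```
[kafka:example]
# The text matched by the first capture group is discarded as the event boundary.
LINE_BREAKER = (#####)
# Each broken chunk is a complete event; skip multi-line merging.
SHOULD_LINEMERGE = false
```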
##### /event endpoint only
| Name | Description | Default Value |
|-------- |----------------------------|-----------------------|
| `splunk.hec.json.event.enrichment` | Only applicable to /event HEC endpoint. This setting is used to enrich raw data with extra metadata fields. It contains a list of key value pairs separated by ",". The configured enrichment metadata will be indexed along with raw event data by Splunk software. </br> **NOTE:** <br/> Data enrichment for /event HEC endpoint is only available in Splunk Enterprise 6.5 and above. By default, this setting is empty. See ([Documentation](http://dev.splunk.com/view/event-collector/SP-CAAAE8Y#indexedfield)) for more information. <br/>**Example:** `org=fin,bu=south-east-us`||
| `splunk.hec.track.data` | When set to `true`, data loss and data injection latency metadata will be indexed along with raw data. This setting only works in conjunction with /event HEC endpoint (`"splunk.hec.raw" : "false"`). Valid settings are `true` or `false`. |`false`|
| Name | Description | Default Value |
|-------- |------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|
| `splunk.hec.json.event.enrichment` | Only applicable to the /event HEC endpoint. This setting is used to enrich raw data with extra metadata fields. It contains a list of key-value pairs separated by ",". The configured enrichment metadata will be indexed along with raw event data by Splunk software. <br/> **NOTE:** <br/> Data enrichment for the /event HEC endpoint is only available in Splunk Enterprise 6.5 and above. By default, this setting is empty. See the [documentation](http://dev.splunk.com/view/event-collector/SP-CAAAE8Y#indexedfield) for more information. <br/>**Example:** `org=fin,bu=south-east-us` | |
| `splunk.hec.track.data` | When set to `true`, data loss and data injection latency metadata will be indexed along with raw data. This setting only works in conjunction with /event HEC endpoint (`"splunk.hec.raw" : "false"`). Valid settings are `true` or `false`. | `false` |
| `splunk.hec.auto.extract.timestamp` | When set to `true`, forces Splunk HEC to extract the timestamp from the event envelope/event data. When left unset, no query parameter is sent and the Splunk server-side default applies. See [/services/collector/event](https://docs.splunk.com/Documentation/Splunk/9.1.1/RESTREF/RESTinput#services.2Fcollector.2Fevent) for more details. An example configuration follows this table. | `unset` |
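As an illustration, a minimal connector configuration enabling the new flag might look as follows. The connector name, topic, HEC address, and token are placeholder values, not part of this PR:

```json
{
  "name": "splunk-sink-example",
  "config": {
    "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
    "topics": "example-topic",
    "splunk.hec.uri": "https://localhost:8088",
    "splunk.hec.token": "00000000-0000-0000-0000-000000000000",
    "splunk.hec.raw": "false",
    "splunk.hec.auto.extract.timestamp": "true"
  }
}
```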

### Headers Parameters
#### Use Headers
8 changes: 8 additions & 0 deletions src/main/java/com/splunk/hecclient/HecConfig.java
@@ -40,6 +40,7 @@ public final class HecConfig {
private String kerberosPrincipal;
private String kerberosKeytabPath;
private int concurrentHecQueueCapacity = 100;
private Boolean autoExtractTimestamp;

public HecConfig(List<String> uris, String token) {
this.uris = uris;
@@ -114,6 +115,8 @@ public int getConcurrentHecQueueCapacity() {

public String getTrustStorePassword() { return trustStorePassword; }

public Boolean getAutoExtractTimestamp() { return autoExtractTimestamp; }

public HecConfig setDisableSSLCertVerification(boolean disableVerification) {
disableSSLCertVerification = disableVerification;
return this;
@@ -217,6 +220,11 @@ public HecConfig setConcurrentHecQueueCapacity(int concurrentHecQueueCapacity) {
return this;
}

public HecConfig setAutoExtractTimestamp(Boolean autoExtractTimestamp) {
this.autoExtractTimestamp = autoExtractTimestamp;
return this;
}

public boolean kerberosAuthEnabled() {
return !kerberosPrincipal().isEmpty();
}
32 changes: 32 additions & 0 deletions src/main/java/com/splunk/hecclient/HecURIBuilder.java
@@ -0,0 +1,32 @@
package com.splunk.hecclient;

import org.apache.http.client.utils.URIBuilder;

import java.net.URI;
import java.net.URISyntaxException;

/**
 * Builds HEC endpoint URIs from a base URL, appending optional query
 * parameters (currently only {@code auto_extract_timestamp}) taken from HecConfig.
 */
public class HecURIBuilder {
public static final String AUTO_EXTRACT_TIMESTAMP_PARAMETER = "auto_extract_timestamp";

private final String baseUrl;
private final HecConfig hecConfig;

public HecURIBuilder(String baseUrl, HecConfig hecConfig) {
this.baseUrl = baseUrl;
this.hecConfig = hecConfig;
}

public URI getURI(String endpoint) {
try {
URIBuilder uriBuilder = new URIBuilder(baseUrl)
.setPath(endpoint);

            // A null value means "unset": skip the parameter entirely so the
            // Splunk server-side default behavior is preserved.
            if (hecConfig.getAutoExtractTimestamp() != null) {
uriBuilder.addParameter(AUTO_EXTRACT_TIMESTAMP_PARAMETER, hecConfig.getAutoExtractTimestamp().toString());
}
return uriBuilder.build();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
}
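A short usage sketch of the new class, mirroring the unit tests added later in this PR: when the flag is left `null` (the reason the config field is a boxed `Boolean` rather than a primitive), no query parameter is appended, so existing URLs are unchanged:

```java
package com.splunk.hecclient;

import java.net.URI;
import java.util.Collections;

class HecURIBuilderSketch {
    public static void main(String[] args) {
        HecConfig config = new HecConfig(Collections.emptyList(), "token");
        HecURIBuilder builder = new HecURIBuilder("https://localhost:8088", config);

        // Unset (null): the plain endpoint URL, identical to the old behavior.
        URI plain = builder.getURI("/services/collector/event");
        System.out.println(plain);    // https://localhost:8088/services/collector/event

        // Explicitly set: the parameter is serialized onto the URL.
        config.setAutoExtractTimestamp(true);
        URI withFlag = builder.getURI("/services/collector/event");
        System.out.println(withFlag); // https://localhost:8088/services/collector/event?auto_extract_timestamp=true
    }
}
```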
11 changes: 9 additions & 2 deletions src/main/java/com/splunk/hecclient/Indexer.java
@@ -20,6 +20,9 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.splunk.kafka.connect.VersionUtils;
import com.sun.security.auth.module.Krb5LoginModule;

import java.net.URI;
import java.net.URISyntaxException;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.HashMap;
@@ -37,6 +40,7 @@
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.message.BasicHeader;
import org.apache.http.protocol.HttpContext;
@@ -51,6 +55,8 @@ final class Indexer implements IndexerInf {
private static final ObjectMapper jsonMapper = new ObjectMapper();

private HecConfig hecConfig;

private HecURIBuilder hecURIBuilder;
private Configuration config;
private CloseableHttpClient httpClient;
private HttpContext context;
@@ -72,6 +78,7 @@ public Indexer(String baseUrl,CloseableHttpClient client,Poller poller,HecConfig
this.hecToken = config.getToken();
this.poller = poller;
this.context = HttpClientContext.create();
this.hecURIBuilder = new HecURIBuilder(baseUrl, hecConfig);
backPressure = 0;

channel = new HecChannel(this);
@@ -132,8 +139,8 @@ public HecChannel getChannel() {
@Override
public boolean send(final EventBatch batch) {
String endpoint = batch.getRestEndpoint();
String url = baseUrl + endpoint;
final HttpPost httpPost = new HttpPost(url);
URI uri = hecURIBuilder.getURI(endpoint);
final HttpPost httpPost = new HttpPost(uri);
httpPost.setHeaders(headers);
if (batch.isEnableCompression()) {
httpPost.setHeader("Content-Encoding", "gzip");
src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
@@ -80,6 +80,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
static final String SSL_TRUSTSTORE_PATH_CONF = "splunk.hec.ssl.trust.store.path";
static final String SSL_TRUSTSTORE_TYPE_CONF = "splunk.hec.ssl.trust.store.type";
static final String SSL_TRUSTSTORE_PASSWORD_CONF = "splunk.hec.ssl.trust.store.password";
static final String AUTO_EXTRACT_TIMESTAMP_CONF = "splunk.hec.auto.extract.timestamp";

//Headers
static final String HEADER_SUPPORT_CONF = "splunk.header.support";
static final String HEADER_CUSTOM_CONF = "splunk.header.custom";
@@ -187,6 +189,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
static final String SSL_TRUSTSTORE_PATH_DOC = "Path on the local disk to the certificate trust store.";
static final String SSL_TRUSTSTORE_TYPE_DOC = "Type of the trust store (JKS, PKCS12, ...).";
static final String SSL_TRUSTSTORE_PASSWORD_DOC = "Password for the trust store.";
static final String AUTO_EXTRACT_TIMESTAMP_DOC = "When set to true, forces Splunk HEC to extract the timestamp from the event envelope/event data.";

static final String HEADER_SUPPORT_DOC = "Setting will enable Kafka Record headers to be used for meta data override";
static final String HEADER_CUSTOM_DOC = "Setting will enable look for Record headers with these values and add them"
@@ -264,8 +267,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
final String regex;
final String timestampFormat;
final int queueCapacity;

final String timeZone;
final Boolean autoExtractTimestamp;

SplunkSinkConnectorConfig(Map<String, String> taskConfig) {
super(conf(), taskConfig);
@@ -324,6 +327,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
validateRegexForTimestamp(regex);
queueCapacity = getInt(QUEUE_CAPACITY_CONF);
validateQueueCapacity(queueCapacity);
autoExtractTimestamp = getBoolean(AUTO_EXTRACT_TIMESTAMP_CONF);
}


@@ -341,6 +345,7 @@ public static ConfigDef conf() {
.define(SSL_TRUSTSTORE_PATH_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PATH_DOC)
.define(SSL_TRUSTSTORE_TYPE_CONF, ConfigDef.Type.STRING, "JKS", ConfigDef.Importance.LOW, SSL_TRUSTSTORE_TYPE_DOC)
.define(SSL_TRUSTSTORE_PASSWORD_CONF, ConfigDef.Type.PASSWORD, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PASSWORD_DOC)
.define(AUTO_EXTRACT_TIMESTAMP_CONF, ConfigDef.Type.BOOLEAN, null, ConfigDef.Importance.LOW, AUTO_EXTRACT_TIMESTAMP_DOC)
.define(EVENT_TIMEOUT_CONF, ConfigDef.Type.INT, 300, ConfigDef.Importance.MEDIUM, EVENT_TIMEOUT_DOC)
.define(ACK_POLL_INTERVAL_CONF, ConfigDef.Type.INT, 10, ConfigDef.Importance.MEDIUM, ACK_POLL_INTERVAL_DOC)
.define(ACK_POLL_THREADS_CONF, ConfigDef.Type.INT, 2, ConfigDef.Importance.MEDIUM, ACK_POLL_THREADS_DOC)
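Note the `null` default on the new `define(...)` above, in contrast to the literal defaults around it. In our reading of Kafka's `ConfigDef` (an assumption, not something stated in the PR), a `BOOLEAN` setting with a `null` default parses to `null` when the property is omitted, which is what lets the connector distinguish an explicit `false` from "unset". A self-contained sketch:

```java
import java.util.Collections;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

class NullDefaultSketch {
    public static void main(String[] args) {
        ConfigDef def = new ConfigDef()
                .define("splunk.hec.auto.extract.timestamp", ConfigDef.Type.BOOLEAN,
                        null, ConfigDef.Importance.LOW, "doc");

        // Property omitted: the null default survives parsing, meaning "unset".
        AbstractConfig absent = new AbstractConfig(def, Collections.emptyMap());
        System.out.println(absent.getBoolean("splunk.hec.auto.extract.timestamp")); // null

        // Property supplied: the string is parsed into a Boolean.
        AbstractConfig set = new AbstractConfig(def,
                Collections.singletonMap("splunk.hec.auto.extract.timestamp", "true"));
        System.out.println(set.getBoolean("splunk.hec.auto.extract.timestamp"));    // true
    }
}
```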
@@ -398,7 +403,8 @@ public HecConfig getHecConfig() {
.setHasCustomTrustStore(hasTrustStorePath)
.setKerberosPrincipal(kerberosUserPrincipal)
.setKerberosKeytabPath(kerberosKeytabPath)
.setConcurrentHecQueueCapacity(queueCapacity);
.setConcurrentHecQueueCapacity(queueCapacity)
.setAutoExtractTimestamp(autoExtractTimestamp);
return config;
}

50 changes: 50 additions & 0 deletions src/test/java/com/splunk/hecclient/HecURIBuilderTest.java
@@ -0,0 +1,50 @@
package com.splunk.hecclient;

import org.junit.Assert;
import org.junit.Test;

import java.net.URI;
import java.util.Collections;

import static com.splunk.hecclient.JsonEventBatch.ENDPOINT;

public class HecURIBuilderTest {
private static final String BASE_URL = "https://localhost:8088";
private static final String TOKEN = "mytoken";

@Test
public void testDefaultValues() {
HecConfig hecConfig = new HecConfig(Collections.emptyList(), TOKEN);
HecURIBuilder builder = new HecURIBuilder(BASE_URL, hecConfig);

URI uri = builder.getURI(ENDPOINT);

Assert.assertEquals("https://localhost:8088/services/collector/event", uri.toString());
}

@Test
public void testAutoExtractTimestamp() {
{
HecConfig hecConfig = new HecConfig(Collections.emptyList(), TOKEN)
.setAutoExtractTimestamp(true);
HecURIBuilder builder = new HecURIBuilder(BASE_URL, hecConfig);

URI uri = builder.getURI(ENDPOINT);

Assert.assertEquals("https://localhost:8088/services/collector/event?" +
HecURIBuilder.AUTO_EXTRACT_TIMESTAMP_PARAMETER + "=true",
uri.toString());
}
{
HecConfig hecConfig = new HecConfig(Collections.emptyList(), TOKEN)
.setAutoExtractTimestamp(false);
HecURIBuilder builder = new HecURIBuilder(BASE_URL, hecConfig);

URI uri = builder.getURI(ENDPOINT);

Assert.assertEquals("https://localhost:8088/services/collector/event?" +
HecURIBuilder.AUTO_EXTRACT_TIMESTAMP_PARAMETER + "=false",
uri.toString());
}
}
}
20 changes: 18 additions & 2 deletions src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java
@@ -28,6 +28,9 @@
import org.junit.Assert;
import org.junit.Test;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.*;

public class SplunkSinkTaskTest {
@@ -266,14 +269,27 @@ public void putWithRawAndAck() {

@Test
public void checkExtractedTimestamp() {


SplunkSinkTask task = new SplunkSinkTask();
Collection<SinkRecord> record = createSinkRecords(1,"{\"id\": \"19\",\"host\":\"host-01\",\"source\":\"bu\",\"fields\":{\"hn\":\"hostname1\",\"CLASS\":\"class1\",\"cust_id\":\"000013934\",\"time\": \"Jun 13 2010 23:11:52.454 UTC\",\"category\":\"IFdata\",\"ifname\":\"LoopBack7\",\"IFdata.Bits received\":\"0\",\"IFdata.Bits sent\":\"0\"}");
UnitUtil uu = new UnitUtil(0);
Map<String, String> config = uu.createTaskConfig();
config.put(SplunkSinkConnectorConfig.RAW_CONF, String.valueOf(false));
config.put(SplunkSinkConnectorConfig.ENABLE_TIMESTAMP_EXTRACTION_CONF, String.valueOf(true));
config.put(SplunkSinkConnectorConfig.REGEX_CONF, "\\\"time\\\":\\s*\\\"(?<time>.*?)\"");
config.put(SplunkSinkConnectorConfig.TIMESTAMP_FORMAT_CONF, "MMM dd yyyy HH:mm:ss.SSS zzz");
config.put(SplunkSinkConnectorConfig.TIMESTAMP_TIMEZONE_CONF, "UTC");

SimpleDateFormat df = new SimpleDateFormat(config.get(SplunkSinkConnectorConfig.TIMESTAMP_FORMAT_CONF));
df.setTimeZone(TimeZone.getTimeZone("UTC"));

double instantDouble = 1.276470712454E12;
String formattedDate = df.format(Date.from(Instant.ofEpochMilli(Double.valueOf(instantDouble).longValue())));

Collection<SinkRecord> record = createSinkRecords(1,"{\"id\": \"19\",\"host\":\"host-01\",\"source\":\"bu\",\"fields\":{\"hn\":\"hostname1\",\"CLASS\":\"class1\",\"cust_id\":\"000013934\",\"time\": \"" +
formattedDate +
"\",\"category\":\"IFdata\",\"ifname\":\"LoopBack7\",\"IFdata.Bits received\":\"0\",\"IFdata.Bits sent\":\"0\"}");

HecMock hec = new HecMock(task);
hec.setSendReturnResult(HecMock.success);
task.setHec(hec);
@@ -286,7 +302,7 @@ public void checkExtractedTimestamp() {
List<Event> event_list = batch.getEvents();
Iterator<Event> iterator = event_list.listIterator() ;
Event event = iterator.next();
Assert.assertEquals(1.276470712454E9, event.getTime(), 0);
Assert.assertEquals(instantDouble / 1000, event.getTime(), 0);
break;
}
task.stop();
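The updated assertion replaces the old hard-coded `1.276470712454E9` with `instantDouble / 1000`, making the unit conversion explicit: the test's constant is epoch milliseconds, while `Event.getTime()` reports epoch seconds. A quick check of the arithmetic (ours, not part of the PR):

```java
class TimestampConversionCheck {
    public static void main(String[] args) {
        double epochMillis = 1.276470712454E12;    // formats to "Jun 13 2010 23:11:52.454 UTC"
        double epochSeconds = epochMillis / 1000;  // what the extracted event time should be
        System.out.println(epochSeconds);          // 1.276470712454E9, the old expected value
    }
}
```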