Skip to content

Commit a46a791

Browse files
authored
Merge branch 'develop' into feature/add-auto-extract-timestamp-parameter
2 parents 01fbcd3 + d690d35 commit a46a791

File tree

2 files changed

+34
-10
lines changed

2 files changed

+34
-10
lines changed

src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -222,31 +222,31 @@ public String headerId(SinkRecord sinkRecord) {
222222

223223
StringBuilder headerString = new StringBuilder();
224224

225-
if(indexHeader != null) {
225+
if(indexHeader != null && indexHeader.value() != null) {
226226
headerString.append(indexHeader.value().toString());
227227
} else if (metas != null) {
228228
headerString.append(metas.get("index"));
229229
}
230230

231231
headerString.append(insertHeaderToken());
232232

233-
if(hostHeader != null) {
233+
if(hostHeader != null && hostHeader.value() != null) {
234234
headerString.append(hostHeader.value().toString());
235235
} else if (metas != null) {
236236
headerString.append("default-host");
237237
}
238238

239239
headerString.append(insertHeaderToken());
240240

241-
if(sourceHeader != null) {
241+
if(sourceHeader != null && sourceHeader.value() != null) {
242242
headerString.append(sourceHeader.value().toString());
243243
} else if (metas != null) {
244244
headerString.append(metas.get("source"));
245245
}
246246

247247
headerString.append(insertHeaderToken());
248248

249-
if(sourcetypeHeader != null) {
249+
if(sourcetypeHeader != null && sourcetypeHeader.value() != null) {
250250
headerString.append(sourcetypeHeader.value().toString());
251251
} else if (metas != null) {
252252
headerString.append(metas.get("sourcetype"));
@@ -442,16 +442,16 @@ private Event addHeaders(Event event, SinkRecord record) {
442442
Header headerSource = headers.lastWithName(connectorConfig.headerSource);
443443
Header headerSourcetype = headers.lastWithName(connectorConfig.headerSourcetype);
444444

445-
if (headerIndex != null) {
445+
if (headerIndex != null && headerIndex.value() != null) {
446446
event.setIndex(headerIndex.value().toString());
447447
}
448-
if (headerHost != null) {
448+
if (headerHost != null && headerHost.value() != null) {
449449
event.setHost(headerHost.value().toString());
450450
}
451-
if (headerSource != null) {
451+
if (headerSource != null && headerSource.value() != null) {
452452
event.setSource(headerSource.value().toString());
453453
}
454-
if (headerSourcetype != null) {
454+
if (headerSourcetype != null && headerSourcetype.value() != null) {
455455
event.setSourcetype(headerSourcetype.value().toString());
456456
}
457457

src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,11 @@
1717

1818
import com.splunk.hecclient.Event;
1919
import com.splunk.hecclient.EventBatch;
20-
import com.splunk.hecclient.JsonEvent;
2120
import com.splunk.hecclient.RawEventBatch;
22-
import org.apache.commons.logging.Log;
2321
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2422
import org.apache.kafka.common.TopicPartition;
2523
import org.apache.kafka.common.config.ConfigException;
24+
import org.apache.kafka.common.header.Header;
2625
import org.apache.kafka.common.record.TimestampType;
2726
import org.apache.kafka.connect.errors.RetriableException;
2827
import org.apache.kafka.connect.sink.SinkRecord;
@@ -244,6 +243,25 @@ public void putWithNullEvent() {
244243
task.stop();
245244
}
246245

246+
@Test
247+
public void putWithNullIndexHeaderValue() {
248+
UnitUtil uu = new UnitUtil(0);
249+
Map<String, String> config = uu.createTaskConfig();
250+
config.put(SplunkSinkConnectorConfig.RAW_CONF, String.valueOf(true));
251+
config.put(SplunkSinkConnectorConfig.ACK_CONF, String.valueOf(true));
252+
config.put(SplunkSinkConnectorConfig.MAX_BATCH_SIZE_CONF, String.valueOf(1));
253+
config.put(SplunkSinkConnectorConfig.HEADER_SUPPORT_CONF, String.valueOf("true"));
254+
config.put(SplunkSinkConnectorConfig.HEADER_INDEX_CONF, "index");
255+
SplunkSinkTask task = new SplunkSinkTask();
256+
HecMock hec = new HecMock(task);
257+
hec.setSendReturnResult(HecMock.success);
258+
task.setHec(hec);
259+
task.start(config);
260+
task.put(createSinkRecordWithNullIndexHeaderValue());
261+
Assert.assertEquals(1, hec.getBatches().size());
262+
task.stop();
263+
}
264+
247265
@Test
248266
public void putWithRawAndAck() {
249267
putWithSuccess(true, true);
@@ -532,6 +550,12 @@ private Collection<SinkRecord> createNullSinkRecord() {
532550
return records;
533551
}
534552

553+
private Collection<SinkRecord> createSinkRecordWithNullIndexHeaderValue() {
554+
List<SinkRecord> records = new ArrayList<>(createSinkRecords(1));
555+
records.get(0).headers().add("index", null, null);
556+
return records;
557+
}
558+
535559
private List<TopicPartition> createTopicPartitionList() {
536560
ArrayList<TopicPartition> tps = new ArrayList<>();
537561
tps.add(new TopicPartition("mytopic", 1));

0 commit comments

Comments
 (0)