Skip to content

Commit 4528b17

Browse files
committed
Fixed potential NullPointerException for Kafka headers with a null value.
In Connect, a message header's value can be null even when the Header object itself is present, so additional null-checks on header.value() are required when processing header values (e.g. for the Splunk index) to avoid an NPE. Adds a previously failing, now passing test.
1 parent 9475fad commit 4528b17

File tree

2 files changed

+34
-11
lines changed

2 files changed

+34
-11
lines changed

src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ public String headerId(SinkRecord sinkRecord) {
222222

223223
StringBuilder headerString = new StringBuilder();
224224

225-
if(indexHeader != null) {
225+
if(indexHeader != null && indexHeader.value() != null) {
226226
headerString.append(indexHeader.value().toString());
227227
} else {
228228
if(metas != null) {
@@ -232,7 +232,7 @@ public String headerId(SinkRecord sinkRecord) {
232232

233233
headerString.append(insertHeaderToken());
234234

235-
if(hostHeader != null) {
235+
if(hostHeader != null && hostHeader.value() != null) {
236236
headerString.append(hostHeader.value().toString());
237237
} else {
238238
if(metas != null) {
@@ -242,7 +242,7 @@ public String headerId(SinkRecord sinkRecord) {
242242

243243
headerString.append(insertHeaderToken());
244244

245-
if(sourceHeader != null) {
245+
if(sourceHeader != null && sourceHeader.value() != null) {
246246
headerString.append(sourceHeader.value().toString());
247247
} else {
248248
if(metas != null) {
@@ -252,7 +252,7 @@ public String headerId(SinkRecord sinkRecord) {
252252

253253
headerString.append(insertHeaderToken());
254254

255-
if(sourcetypeHeader != null) {
255+
if(sourcetypeHeader != null && sourcetypeHeader.value() != null) {
256256
headerString.append(sourcetypeHeader.value().toString());
257257
} else {
258258
if(metas != null) {
@@ -450,16 +450,16 @@ private Event addHeaders(Event event, SinkRecord record) {
450450
Header headerSource = headers.lastWithName(connectorConfig.headerSource);
451451
Header headerSourcetype = headers.lastWithName(connectorConfig.headerSourcetype);
452452

453-
if (headerIndex != null) {
453+
if (headerIndex != null && headerIndex.value() != null) {
454454
event.setIndex(headerIndex.value().toString());
455455
}
456-
if (headerHost != null) {
456+
if (headerHost != null && headerHost.value() != null) {
457457
event.setHost(headerHost.value().toString());
458458
}
459-
if (headerSource != null) {
459+
if (headerSource != null && headerSource.value() != null) {
460460
event.setSource(headerSource.value().toString());
461461
}
462-
if (headerSourcetype != null) {
462+
if (headerSourcetype != null && headerSourcetype.value() != null) {
463463
event.setSourcetype(headerSourcetype.value().toString());
464464
}
465465

src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,17 @@
1717

1818
import com.splunk.hecclient.Event;
1919
import com.splunk.hecclient.EventBatch;
20-
import com.splunk.hecclient.JsonEvent;
2120
import com.splunk.hecclient.RawEventBatch;
22-
import org.apache.commons.logging.Log;
2321
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2422
import org.apache.kafka.common.TopicPartition;
2523
import org.apache.kafka.common.config.ConfigException;
24+
import org.apache.kafka.common.header.Header;
2625
import org.apache.kafka.common.record.TimestampType;
2726
import org.apache.kafka.connect.errors.RetriableException;
2827
import org.apache.kafka.connect.sink.SinkRecord;
2928
import org.junit.Assert;
3029
import org.junit.Test;
3130

32-
import java.text.ParseException;
3331
import java.util.*;
3432

3533
public class SplunkSinkTaskTest {
@@ -242,6 +240,25 @@ public void putWithNullEvent() {
242240
task.stop();
243241
}
244242

243+
@Test
244+
public void putWithNullIndexHeaderValue() {
245+
UnitUtil uu = new UnitUtil(0);
246+
Map<String, String> config = uu.createTaskConfig();
247+
config.put(SplunkSinkConnectorConfig.RAW_CONF, String.valueOf(true));
248+
config.put(SplunkSinkConnectorConfig.ACK_CONF, String.valueOf(true));
249+
config.put(SplunkSinkConnectorConfig.MAX_BATCH_SIZE_CONF, String.valueOf(1));
250+
config.put(SplunkSinkConnectorConfig.HEADER_SUPPORT_CONF, String.valueOf("true"));
251+
config.put(SplunkSinkConnectorConfig.HEADER_INDEX_CONF, "index");
252+
SplunkSinkTask task = new SplunkSinkTask();
253+
HecMock hec = new HecMock(task);
254+
hec.setSendReturnResult(HecMock.success);
255+
task.setHec(hec);
256+
task.start(config);
257+
task.put(createSinkRecordWithNullIndexHeaderValue());
258+
Assert.assertEquals(1, hec.getBatches().size());
259+
task.stop();
260+
}
261+
245262
@Test
246263
public void putWithRawAndAck() {
247264
putWithSuccess(true, true);
@@ -455,6 +472,12 @@ private Collection<SinkRecord> createNullSinkRecord() {
455472
return records;
456473
}
457474

475+
private Collection<SinkRecord> createSinkRecordWithNullIndexHeaderValue() {
476+
List<SinkRecord> records = new ArrayList<>(createSinkRecords(1));
477+
records.get(0).headers().add("index", null, null);
478+
return records;
479+
}
480+
458481
private List<TopicPartition> createTopicPartitionList() {
459482
ArrayList<TopicPartition> tps = new ArrayList<>();
460483
tps.add(new TopicPartition("mytopic", 1));

0 commit comments

Comments
 (0)