Skip to content

Commit d690d35

Browse files
authored
Merge pull request #414 from javabrett/fix-header-npe
Fix potential NullPointerException for Kafka headers with null value
2 parents 4a2e368 + 4528b17 commit d690d35

File tree

2 files changed

+34
-11
lines changed

2 files changed

+34
-11
lines changed

src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -222,31 +222,31 @@ public String headerId(SinkRecord sinkRecord) {
222222

223223
StringBuilder headerString = new StringBuilder();
224224

225-
if(indexHeader != null) {
225+
if(indexHeader != null && indexHeader.value() != null) {
226226
headerString.append(indexHeader.value().toString());
227227
} else if (metas != null) {
228228
headerString.append(metas.get("index"));
229229
}
230230

231231
headerString.append(insertHeaderToken());
232232

233-
if(hostHeader != null) {
233+
if(hostHeader != null && hostHeader.value() != null) {
234234
headerString.append(hostHeader.value().toString());
235235
} else if (metas != null) {
236236
headerString.append("default-host");
237237
}
238238

239239
headerString.append(insertHeaderToken());
240240

241-
if(sourceHeader != null) {
241+
if(sourceHeader != null && sourceHeader.value() != null) {
242242
headerString.append(sourceHeader.value().toString());
243243
} else if (metas != null) {
244244
headerString.append(metas.get("source"));
245245
}
246246

247247
headerString.append(insertHeaderToken());
248248

249-
if(sourcetypeHeader != null) {
249+
if(sourcetypeHeader != null && sourcetypeHeader.value() != null) {
250250
headerString.append(sourcetypeHeader.value().toString());
251251
} else if (metas != null) {
252252
headerString.append(metas.get("sourcetype"));
@@ -442,16 +442,16 @@ private Event addHeaders(Event event, SinkRecord record) {
442442
Header headerSource = headers.lastWithName(connectorConfig.headerSource);
443443
Header headerSourcetype = headers.lastWithName(connectorConfig.headerSourcetype);
444444

445-
if (headerIndex != null) {
445+
if (headerIndex != null && headerIndex.value() != null) {
446446
event.setIndex(headerIndex.value().toString());
447447
}
448-
if (headerHost != null) {
448+
if (headerHost != null && headerHost.value() != null) {
449449
event.setHost(headerHost.value().toString());
450450
}
451-
if (headerSource != null) {
451+
if (headerSource != null && headerSource.value() != null) {
452452
event.setSource(headerSource.value().toString());
453453
}
454-
if (headerSourcetype != null) {
454+
if (headerSourcetype != null && headerSourcetype.value() != null) {
455455
event.setSourcetype(headerSourcetype.value().toString());
456456
}
457457

src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,17 @@
1717

1818
import com.splunk.hecclient.Event;
1919
import com.splunk.hecclient.EventBatch;
20-
import com.splunk.hecclient.JsonEvent;
2120
import com.splunk.hecclient.RawEventBatch;
22-
import org.apache.commons.logging.Log;
2321
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2422
import org.apache.kafka.common.TopicPartition;
2523
import org.apache.kafka.common.config.ConfigException;
24+
import org.apache.kafka.common.header.Header;
2625
import org.apache.kafka.common.record.TimestampType;
2726
import org.apache.kafka.connect.errors.RetriableException;
2827
import org.apache.kafka.connect.sink.SinkRecord;
2928
import org.junit.Assert;
3029
import org.junit.Test;
3130

32-
import java.text.ParseException;
3331
import java.util.*;
3432

3533
public class SplunkSinkTaskTest {
@@ -242,6 +240,25 @@ public void putWithNullEvent() {
242240
task.stop();
243241
}
244242

243+
@Test
244+
public void putWithNullIndexHeaderValue() {
245+
UnitUtil uu = new UnitUtil(0);
246+
Map<String, String> config = uu.createTaskConfig();
247+
config.put(SplunkSinkConnectorConfig.RAW_CONF, String.valueOf(true));
248+
config.put(SplunkSinkConnectorConfig.ACK_CONF, String.valueOf(true));
249+
config.put(SplunkSinkConnectorConfig.MAX_BATCH_SIZE_CONF, String.valueOf(1));
250+
config.put(SplunkSinkConnectorConfig.HEADER_SUPPORT_CONF, String.valueOf("true"));
251+
config.put(SplunkSinkConnectorConfig.HEADER_INDEX_CONF, "index");
252+
SplunkSinkTask task = new SplunkSinkTask();
253+
HecMock hec = new HecMock(task);
254+
hec.setSendReturnResult(HecMock.success);
255+
task.setHec(hec);
256+
task.start(config);
257+
task.put(createSinkRecordWithNullIndexHeaderValue());
258+
Assert.assertEquals(1, hec.getBatches().size());
259+
task.stop();
260+
}
261+
245262
@Test
246263
public void putWithRawAndAck() {
247264
putWithSuccess(true, true);
@@ -517,6 +534,12 @@ private Collection<SinkRecord> createNullSinkRecord() {
517534
return records;
518535
}
519536

537+
private Collection<SinkRecord> createSinkRecordWithNullIndexHeaderValue() {
538+
List<SinkRecord> records = new ArrayList<>(createSinkRecords(1));
539+
records.get(0).headers().add("index", null, null);
540+
return records;
541+
}
542+
520543
private List<TopicPartition> createTopicPartitionList() {
521544
ArrayList<TopicPartition> tps = new ArrayList<>();
522545
tps.add(new TopicPartition("mytopic", 1));

0 commit comments

Comments
 (0)