Skip to content

Commit 74eec81

Browse files
authored
Merge pull request #57 from confluentinc/CC-26777
CC-26777: Cherry pick fix for potential NullPointerException for Kafka headers with null value.
2 parents 9984b81 + b55b125 commit 74eec81

File tree

2 files changed

+33
-8
lines changed

2 files changed

+33
-8
lines changed

src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ public String headerId(SinkRecord sinkRecord) {
212212

213213
StringBuilder headerString = new StringBuilder();
214214

215-
if(indexHeader != null) {
215+
if(indexHeader != null && indexHeader.value() != null) {
216216
headerString.append(indexHeader.value().toString());
217217
} else {
218218
if(metas != null) {
@@ -222,7 +222,7 @@ public String headerId(SinkRecord sinkRecord) {
222222

223223
headerString.append(insertHeaderToken());
224224

225-
if(hostHeader != null) {
225+
if(hostHeader != null && hostHeader.value() != null) {
226226
headerString.append(hostHeader.value().toString());
227227
} else {
228228
if(metas != null) {
@@ -232,7 +232,7 @@ public String headerId(SinkRecord sinkRecord) {
232232

233233
headerString.append(insertHeaderToken());
234234

235-
if(sourceHeader != null) {
235+
if(sourceHeader != null && sourceHeader.value() != null) {
236236
headerString.append(sourceHeader.value().toString());
237237
} else {
238238
if(metas != null) {
@@ -242,7 +242,7 @@ public String headerId(SinkRecord sinkRecord) {
242242

243243
headerString.append(insertHeaderToken());
244244

245-
if(sourcetypeHeader != null) {
245+
if(sourcetypeHeader != null && sourcetypeHeader.value() != null) {
246246
headerString.append(sourcetypeHeader.value().toString());
247247
} else {
248248
if(metas != null) {
@@ -438,16 +438,16 @@ private Event addHeaders(Event event, SinkRecord record) {
438438
Header headerSource = headers.lastWithName(connectorConfig.headerSource);
439439
Header headerSourcetype = headers.lastWithName(connectorConfig.headerSourcetype);
440440

441-
if (headerIndex != null) {
441+
if (headerIndex != null && headerIndex.value() != null) {
442442
event.setIndex(headerIndex.value().toString());
443443
}
444-
if (headerHost != null) {
444+
if (headerHost != null && headerHost.value() != null) {
445445
event.setHost(headerHost.value().toString());
446446
}
447-
if (headerSource != null) {
447+
if (headerSource != null && headerSource.value() != null) {
448448
event.setSource(headerSource.value().toString());
449449
}
450-
if (headerSourcetype != null) {
450+
if (headerSourcetype != null && headerSourcetype.value() != null) {
451451
event.setSourcetype(headerSourcetype.value().toString());
452452
}
453453

src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,25 @@ public void putWithNullEvent() {
241241
task.stop();
242242
}
243243

244+
@Test
245+
public void putWithNullIndexHeaderValue() {
246+
UnitUtil uu = new UnitUtil(0);
247+
Map<String, String> config = uu.createTaskConfig();
248+
config.put(SplunkSinkConnectorConfig.RAW_CONF, String.valueOf(true));
249+
config.put(SplunkSinkConnectorConfig.ACK_CONF, String.valueOf(true));
250+
config.put(SplunkSinkConnectorConfig.MAX_BATCH_SIZE_CONF, String.valueOf(1));
251+
config.put(SplunkSinkConnectorConfig.HEADER_SUPPORT_CONF, String.valueOf("true"));
252+
config.put(SplunkSinkConnectorConfig.HEADER_INDEX_CONF, "index");
253+
SplunkSinkTask task = new SplunkSinkTask();
254+
HecMock hec = new HecMock(task);
255+
hec.setSendReturnResult(HecMock.success);
256+
task.setHec(hec);
257+
task.start(config);
258+
task.put(createSinkRecordWithNullIndexHeaderValue());
259+
Assert.assertEquals(1, hec.getBatches().size());
260+
task.stop();
261+
}
262+
244263
@Test
245264
public void putWithRawAndAck() {
246265
putWithSuccess(true, true);
@@ -341,6 +360,12 @@ private Collection<SinkRecord> createNullSinkRecord() {
341360
return records;
342361
}
343362

363+
private Collection<SinkRecord> createSinkRecordWithNullIndexHeaderValue() {
364+
List<SinkRecord> records = new ArrayList<>(createSinkRecords(1));
365+
records.get(0).headers().add("index", null, null);
366+
return records;
367+
}
368+
344369
private List<TopicPartition> createTopicPartitionList() {
345370
ArrayList<TopicPartition> tps = new ArrayList<>();
346371
tps.add(new TopicPartition("mytopic", 1));

0 commit comments

Comments
 (0)