Skip to content

Commit

Permalink
Fix the issue that NPE is thrown when user sends null value. (linkedi…
Browse files Browse the repository at this point in the history
  • Loading branch information
becketqin authored Sep 22, 2017
1 parent 28cdb85 commit 44f19cb
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.linkedin.kafka.clients.utils.LiKafkaClientsUtils;
import com.linkedin.kafka.clients.producer.UUIDFactory;
import java.util.Collections;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;

Expand Down Expand Up @@ -65,10 +66,12 @@ public List<ProducerRecord<byte[], byte[]>> split(String topic,
byte[] key,
byte[] serializedRecord,
int maxSegmentSize) {

if (topic == null) {
throw new IllegalArgumentException("Topic cannot be empty for LiKafkaGenericMessageSplitter.");
}
if (serializedRecord == null) {
return Collections.singletonList(new ProducerRecord<>(topic, partition, timestamp, key, null));
}
// We allow message id to be null, but it is strongly recommended to pass in a message id.
UUID segmentMessageId = messageId == null ? _uuidFactory.createUuid() : messageId;
List<ProducerRecord<byte[], byte[]>> segments = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,39 +52,57 @@ public void tearDown() {
*/
@Test
public void testSend() throws IOException, InterruptedException {

Properties props = new Properties();
props.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
LiKafkaProducer<String, String> producer = createProducer(props);
final String tempTopic = "testTopic" + new Random().nextInt(1000000);

for (int i = 0; i < RECORD_COUNT; ++i) {
String value = Integer.toString(i);
producer.send(new ProducerRecord<>(tempTopic, value));
try (LiKafkaProducer<String, String> producer = createProducer(props)) {
for (int i = 0; i < RECORD_COUNT; ++i) {
String value = Integer.toString(i);
producer.send(new ProducerRecord<>(tempTopic, value));
}
}

// Drain the send queues
producer.close();

Properties consumerProps = new Properties();
consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
LiKafkaConsumer<String, String> consumer = createConsumer(consumerProps);
consumer.subscribe(Collections.singleton(tempTopic));
int messageCount = 0;
BitSet counts = new BitSet(RECORD_COUNT);
long startMs = System.currentTimeMillis();
while (messageCount < RECORD_COUNT && System.currentTimeMillis() < startMs + 30000) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
int index = Integer.parseInt(record.value());
counts.set(index);
messageCount++;
try (LiKafkaConsumer<String, String> consumer = createConsumer(consumerProps)) {
consumer.subscribe(Collections.singleton(tempTopic));
BitSet counts = new BitSet(RECORD_COUNT);
long startMs = System.currentTimeMillis();
while (messageCount < RECORD_COUNT && System.currentTimeMillis() < startMs + 30000) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
int index = Integer.parseInt(record.value());
counts.set(index);
messageCount++;
}
}
}
consumer.close();
assertEquals(RECORD_COUNT, messageCount);
}

@Test
public void testNullValue() {
try (LiKafkaProducer<String, String> producer = createProducer(null)) {
producer.send(new ProducerRecord<>("testNullValue", "key", null));
}
Properties consumerProps = new Properties();
consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

try (LiKafkaConsumer<String, String> consumer = createConsumer(consumerProps)) {
consumer.subscribe(Collections.singleton("testNullValue"));
long startMs = System.currentTimeMillis();
ConsumerRecords<String, String> records = ConsumerRecords.empty();
while (records.isEmpty() && System.currentTimeMillis() < startMs + 30000) {
records = consumer.poll(100);
}
assertEquals(1, records.count());
ConsumerRecord<String, String> record = records.iterator().next();
assertEquals("key", record.key());
assertNull(record.value());
}
}

/**
* This test produces test data into a temporary topic to a particular broker with a non-deseriable value
* verifies producer.send() will throw exception if and only if SKIP_RECORD_ON_SKIPPABLE_EXCEPTION_CONFIG is false
Expand Down Expand Up @@ -115,27 +133,28 @@ public void close() {

Properties props = getProducerProperties(null);
props.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
LiKafkaProducer<String, String> producer = new LiKafkaProducerImpl<>(props, stringSerializer, errorThrowingSerializer, null, null);
final String tempTopic = "testTopic" + new Random().nextInt(1000000);

producer.send(new ProducerRecord<>(tempTopic, "ErrorBytes"));
producer.send(new ProducerRecord<>(tempTopic, "value"));
producer.close();
try (LiKafkaProducer<String, String> producer =
new LiKafkaProducerImpl<>(props, stringSerializer, errorThrowingSerializer, null, null)) {
producer.send(new ProducerRecord<>(tempTopic, "ErrorBytes"));
producer.send(new ProducerRecord<>(tempTopic, "value"));
producer.close();
}

Properties consumerProps = new Properties();
consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
LiKafkaConsumer<String, String> consumer = createConsumer(consumerProps);
consumer.subscribe(Collections.singleton(tempTopic));
int messageCount = 0;
long startMs = System.currentTimeMillis();
while (messageCount < 1 && System.currentTimeMillis() < startMs + 30000) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
assertEquals("value", record.value());
messageCount++;
try (LiKafkaConsumer<String, String> consumer = createConsumer(consumerProps)) {
consumer.subscribe(Collections.singleton(tempTopic));
long startMs = System.currentTimeMillis();
while (messageCount < 1 && System.currentTimeMillis() < startMs + 30000) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
assertEquals("value", record.value());
messageCount++;
}
}
}
consumer.close();
assertEquals(1, messageCount);
}

Expand Down

0 comments on commit 44f19cb

Please sign in to comment.