Issue#43 Handle exception in the message processing (linkedin#44)
If no key is provided for a message, the splitter should not set the key to a UUID.
Handle errors during message processing in the ConsumerRecordsProcessor:
1. Return all the messages before the exception.
2. Allow skipping messages with exceptions.
becketqin authored Jun 15, 2017
1 parent 043f385 commit 388de2f
Showing 9 changed files with 351 additions and 34 deletions.
@@ -7,6 +7,7 @@
import com.linkedin.kafka.clients.annotations.InterfaceOrigin;
import com.linkedin.kafka.clients.largemessage.LargeMessageSegment;
import com.linkedin.kafka.clients.largemessage.errors.OffsetNotTrackedException;
import com.linkedin.kafka.clients.largemessage.errors.RecordProcessingException;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -203,6 +204,11 @@ public interface LiKafkaConsumer<K, V> extends Consumer<K, V> {
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. invalid groupId or
* session timeout, errors deserializing key/value
* pairs, or any new error cases in future versions)
* @throws RecordProcessingException When a RecordProcessingException is thrown, the user can choose to
*                                   catch it and continue to poll if they want to skip the message that
*                                   caused the exception.
*/
@InterfaceOrigin.ApacheKafka
ConsumerRecords<K, V> poll(long timeout);
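A minimal usage sketch of the catch-and-continue pattern described in the javadoc above; it is not part of the commit. The broker address, group id, topic name and deserializers are illustrative placeholders, and the concrete exception surfaced by poll() in this change is ConsumerRecordsProcessingException, which wraps the individual RecordProcessingExceptions.

import com.linkedin.kafka.clients.consumer.LiKafkaConsumer;
import com.linkedin.kafka.clients.consumer.LiKafkaConsumerImpl;
import com.linkedin.kafka.clients.largemessage.errors.ConsumerRecordsProcessingException;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class SkipBadRecordsExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder
    props.setProperty("group.id", "example-group");              // placeholder
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    try (LiKafkaConsumer<String, String> consumer = new LiKafkaConsumerImpl<>(props)) {
      consumer.subscribe(Collections.singleton("example-topic")); // placeholder topic
      while (true) {
        ConsumerRecords<String, String> records;
        try {
          records = consumer.poll(1000L);
        } catch (ConsumerRecordsProcessingException e) {
          // The consumer has already been rewound past the record(s) that failed,
          // so polling again simply resumes after them.
          System.err.println("Skipping record(s) that failed processing: " + e.getMessage());
          continue;
        }
        for (ConsumerRecord<String, String> record : records) {
          System.out.println(record.topic() + "-" + record.partition() + "@" + record.offset());
        }
      }
    }
  }
}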
@@ -35,6 +35,7 @@ public class LiKafkaConsumerConfig extends AbstractConfig {
public static final String ENABLE_AUTO_COMMIT_CONFIG = ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
public static final String AUTO_COMMIT_INTERVAL_MS_CONFIG = ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG;
public static final String AUTO_OFFSET_RESET_CONFIG = ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
public static final String SKIP_RECORD_ON_EXCEPTION_CONFIG = "skip.record.on.exception";

private static final String MESSAGE_ASSEMBLER_BUFFER_CAPACITY_DOC = "The maximum number of bytes the message assembler " +
" uses to buffer the incomplete large message segments. The capacity is shared by messages from all the topics. " +
@@ -79,6 +80,12 @@ public class LiKafkaConsumerConfig extends AbstractConfig {
+ "the offset to the latest offset</li><li>none: throw exception to the consumer if no previous offset is found "
+ "for the consumer's group</li><li>anything else: throw exception to the consumer.</li></ul>";

private static final String SKIP_RECORD_ON_EXCEPTION_DOC = "Whether to skip a record that causes a RecordProcessingException. "
    + "This allows users who are willing to ignore a problematic record to continue consuming without worrying "
    + "about error handling. By default it is set to false and an exception will be thrown if record processing "
    + "encounters an error. If the user ignores the exception and calls poll again, the problematic record will "
    + "be skipped as well.";

static {
CONFIG = new ConfigDef()
.define(MESSAGE_ASSEMBLER_BUFFER_CAPACITY_CONFIG,
@@ -138,7 +145,12 @@ public class LiKafkaConsumerConfig extends AbstractConfig {
Type.STRING,
"none",
Importance.MEDIUM,
AUTO_OFFSET_RESET_DOC);
AUTO_OFFSET_RESET_DOC)
.define(SKIP_RECORD_ON_EXCEPTION_CONFIG,
Type.BOOLEAN,
"false",
Importance.LOW,
SKIP_RECORD_ON_EXCEPTION_DOC);

}

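A short configuration sketch for the new flag, also not part of the commit; the brokers, group id and deserializers are illustrative placeholders, and only skip.record.on.exception comes from this change.

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder
props.setProperty("group.id", "example-group");              // placeholder
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Drop records that throw during processing instead of surfacing the exception from poll().
props.setProperty(LiKafkaConsumerConfig.SKIP_RECORD_ON_EXCEPTION_CONFIG, "true");
LiKafkaConsumer<String, String> consumer = new LiKafkaConsumerImpl<>(props);

With the flag left at its default of false, the first poll() after a failure throws ConsumerRecordsProcessingException, as in the sketch shown earlier.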
@@ -4,16 +4,18 @@

package com.linkedin.kafka.clients.consumer;

import com.linkedin.kafka.clients.largemessage.ConsumerRecordsProcessResult;
import com.linkedin.kafka.clients.largemessage.ConsumerRecordsProcessor;
import com.linkedin.kafka.clients.largemessage.DeliveredMessageOffsetTracker;
import com.linkedin.kafka.clients.largemessage.LargeMessageSegment;
import com.linkedin.kafka.clients.largemessage.MessageAssembler;
import com.linkedin.kafka.clients.largemessage.MessageAssemblerImpl;
import com.linkedin.kafka.clients.auditing.Auditor;
import com.linkedin.kafka.clients.largemessage.errors.ConsumerRecordsProcessingException;
import com.linkedin.kafka.clients.largemessage.errors.RecordProcessingException;
import com.linkedin.kafka.clients.utils.LiKafkaClientsUtils;
import java.util.Collections;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -75,6 +77,7 @@ public class LiKafkaConsumerImpl<K, V> implements LiKafkaConsumer<K, V> {
private final long _autoCommitInterval;
private final OffsetResetStrategy _offsetResetStrategy;
private long _lastAutoCommitMs;
private ConsumerRecordsProcessResult<K, V> _lastProcessedResult;

public LiKafkaConsumerImpl(Properties props) {
this(new LiKafkaConsumerConfig(props), null, null, null, null);
@@ -117,7 +120,7 @@ private LiKafkaConsumerImpl(LiKafkaConsumerConfig configs,
_kafkaConsumer = new KafkaConsumer<>(configs.configForVanillaConsumer(),
byteArrayDeserializer,
byteArrayDeserializer);
try {

// Instantiate segment deserializer if needed.
Deserializer segmentDeserializer = largeMessageSegmentDeserializer != null ? largeMessageSegmentDeserializer :
@@ -148,16 +151,20 @@ private LiKafkaConsumerImpl(LiKafkaConsumerConfig configs,
configs.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
vDeserializer.configure(configs.originals(), false);

// Get the skip record on exception config
boolean skipRecordOnException = configs.getBoolean(LiKafkaConsumerConfig.SKIP_RECORD_ON_EXCEPTION_CONFIG);

// Instantiate consumer record processor
_consumerRecordsProcessor = new ConsumerRecordsProcessor<>(assembler, kDeserializer, vDeserializer,
messageOffsetTracker, auditor);
messageOffsetTracker, auditor, skipRecordOnException);

// Instantiate consumer rebalance listener
_consumerRebalanceListener = new LiKafkaConsumerRebalanceListener<>(_consumerRecordsProcessor,
this, _autoCommitEnabled);

// Instantiate offset commit callback.
_offsetCommitCallback = new LiKafkaOffsetCommitCallback();
_lastProcessedResult = null;
} catch (Exception e) {
_kafkaConsumer.close();
throw e;
@@ -221,6 +228,11 @@ public void unsubscribe() {

@Override
public ConsumerRecords<K, V> poll(long timeout) {
if (_lastProcessedResult != null && _lastProcessedResult.exception() != null) {
ConsumerRecordsProcessingException e = _lastProcessedResult.exception();
_lastProcessedResult = null;
throw e;
}
long startMs = System.currentTimeMillis();
ConsumerRecords<K, V> processedRecords;
// We will keep polling until timeout.
@@ -261,7 +273,16 @@ public ConsumerRecords<K, V> poll(long timeout) {
}
}
}
processedRecords = _consumerRecordsProcessor.process(rawRecords);
_lastProcessedResult = _consumerRecordsProcessor.process(rawRecords);
processedRecords = _lastProcessedResult.consumerRecords();
// Clear the internal reference.
_lastProcessedResult.clearRecords();
// Rewind offset if there are processing exceptions.
if (_lastProcessedResult.exception() != null) {
for (Map.Entry<TopicPartition, Long> entry : _lastProcessedResult.resumeOffsets().entrySet()) {
_kafkaConsumer.seek(entry.getKey(), entry.getValue());
}
}
now = System.currentTimeMillis();
} while (processedRecords.isEmpty() && now < startMs + timeout);
return processedRecords;
@@ -0,0 +1,67 @@
package com.linkedin.kafka.clients.largemessage;

import com.linkedin.kafka.clients.largemessage.errors.ConsumerRecordsProcessingException;
import com.linkedin.kafka.clients.largemessage.errors.RecordProcessingException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;


/**
* The result of processing the ConsumerRecords returned by the open-source KafkaConsumer.
*
* It contains the following information:
* 1. The processed consumer records.
* 2. If there were exceptions during processing, the per-partition offsets at which consumption should resume
*    in order to skip the problematic messages.
* 3. The exceptions thrown for the problematic partitions. (We just need to throw one exception to the user.)
*/
public class ConsumerRecordsProcessResult<K, V> {
private final Map<TopicPartition, Long> _resumeOffsets;
private final List<RecordProcessingException> _exceptions;
private Map<TopicPartition, List<ConsumerRecord<K, V>>> _processedRecords;

ConsumerRecordsProcessResult() {
_processedRecords = new HashMap<>();
_resumeOffsets = new HashMap<>();
_exceptions = new ArrayList<>();
}

void addRecord(TopicPartition tp, ConsumerRecord<K, V> record) {
// Only put record into map if it is not null
if (record != null) {
List<ConsumerRecord<K, V>> list = _processedRecords.computeIfAbsent(tp, k -> new ArrayList<>());
list.add(record);
}
}

void recordException(TopicPartition tp, long offset, RuntimeException e) {
_exceptions.add(new RecordProcessingException(tp, offset, e));
// The resume offset is the error offset + 1, i.e. if the user ignores the exception thrown and polls again,
// consumption resumes from this offset.
_resumeOffsets.putIfAbsent(tp, offset + 1);
}

public void clearRecords() {
_processedRecords = null;
}

boolean hasError(TopicPartition tp) {
return resumeOffsets().containsKey(tp);
}

public ConsumerRecordsProcessingException exception() {
return _exceptions.isEmpty() ? null : new ConsumerRecordsProcessingException(_exceptions);
}

public ConsumerRecords<K, V> consumerRecords() {
return new ConsumerRecords<>(_processedRecords);
}

public Map<TopicPartition, Long> resumeOffsets() {
return _resumeOffsets;
}
}
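A rough sketch of how a caller consumes this result, mirroring what LiKafkaConsumerImpl.poll() does in this commit; processor, rawRecords and kafkaConsumer are assumed to already be in scope and imports are omitted.

ConsumerRecordsProcessResult<String, String> result = processor.process(rawRecords);
ConsumerRecords<String, String> processed = result.consumerRecords();
if (result.exception() != null) {
  // Rewind each affected partition to the offset right after the failed record so that,
  // once the exception has been surfaced to the user, a later poll() resumes past it.
  for (Map.Entry<TopicPartition, Long> entry : result.resumeOffsets().entrySet()) {
    kafkaConsumer.seek(entry.getKey(), entry.getValue());
  }
}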
@@ -6,7 +6,9 @@

import com.linkedin.kafka.clients.auditing.AuditType;
import com.linkedin.kafka.clients.auditing.Auditor;
import com.linkedin.kafka.clients.largemessage.errors.RecordProcessingException;
import com.linkedin.kafka.clients.utils.LiKafkaClientsUtils;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -15,9 +17,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.linkedin.kafka.clients.largemessage.MessageAssembler.AssembleResult.INCOMPLETE_RESULT;
@@ -35,12 +35,14 @@ public class ConsumerRecordsProcessor<K, V> {
private final DeliveredMessageOffsetTracker _deliveredMessageOffsetTracker;
private final Map<TopicPartition, Long> _partitionConsumerHighWatermark;
private final Auditor<K, V> _auditor;
private final boolean _skipRecordOnException;

public ConsumerRecordsProcessor(MessageAssembler messageAssembler,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
DeliveredMessageOffsetTracker deliveredMessageOffsetTracker,
Auditor<K, V> auditor) {
Auditor<K, V> auditor,
boolean skipRecordOnException) {
_messageAssembler = messageAssembler;
_keyDeserializer = keyDeserializer;
_valueDeserializer = valueDeserializer;
@@ -50,6 +52,7 @@ public ConsumerRecordsProcessor(MessageAssembler messageAssembler,
if (_auditor == null) {
LOG.info("Auditing is disabled because no auditor is defined.");
}
_skipRecordOnException = skipRecordOnException;
}

/**
@@ -58,22 +61,25 @@ public ConsumerRecordsProcessor(MessageAssembler messageAssembler,
* @param consumerRecords The consumer records to be filtered.
* @return the result of processing the consumer records, including any per-partition processing errors.
*/
public ConsumerRecords<K, V> process(ConsumerRecords<byte[], byte[]> consumerRecords) {
Map<TopicPartition, List<ConsumerRecord<K, V>>> filteredRecords = new HashMap<>();
public ConsumerRecordsProcessResult<K, V> process(ConsumerRecords<byte[], byte[]> consumerRecords) {
ConsumerRecordsProcessResult<K, V> result = new ConsumerRecordsProcessResult<>();
for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
TopicPartition tp = new TopicPartition(record.topic(), record.partition());
ConsumerRecord<K, V> handledRecord = handleConsumerRecord(record);
// Only put record into map if it is not null
if (handledRecord != null) {
List<ConsumerRecord<K, V>> list = filteredRecords.get(tp);
if (list == null) {
list = new ArrayList<>();
filteredRecords.put(tp, list);
if (result.hasError(tp)) {
continue;
}
long offset = record.offset();
try {
ConsumerRecord<K, V> handledRecord = handleConsumerRecord(record);
result.addRecord(tp, handledRecord);
} catch (RuntimeException e) {
LOG.warn("Exception thrown when processing message with offset {} from partition {}", offset, tp, e);
if (!_skipRecordOnException) {
result.recordException(tp, offset, e);
}
list.add(handledRecord);
}
}
return new ConsumerRecords<>(filteredRecords);
return result;
}

/**
@@ -0,0 +1,25 @@
package com.linkedin.kafka.clients.largemessage.errors;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;


public class ConsumerRecordsProcessingException extends RuntimeException {
private final List<RecordProcessingException> _recordProcessingExceptions;

public ConsumerRecordsProcessingException(List<RecordProcessingException> exceptions) {
super(String.format("Received exception when processing messages for %d partitions.", exceptions.size()), exceptions.get(0));
Iterator<RecordProcessingException> exceptionIterator = exceptions.iterator();
// skip the first exception.
exceptionIterator.next();
while (exceptionIterator.hasNext()) {
addSuppressed(exceptionIterator.next());
}
_recordProcessingExceptions = exceptions;
}

public List<RecordProcessingException> recordProcessingExceptions() {
return Collections.unmodifiableList(_recordProcessingExceptions);
}
}
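A brief sketch, not from the commit, of how the aggregated exception could be inspected by a caller of poll(); the consumer variable is assumed and imports are omitted.

try {
  consumer.poll(1000L);
} catch (ConsumerRecordsProcessingException e) {
  // The first failure is carried as the cause and the rest as suppressed exceptions;
  // all of them are also available here with partition and offset detail.
  for (RecordProcessingException rpe : e.recordProcessingExceptions()) {
    System.err.printf("Processing failed for %s at offset %d: %s%n",
        rpe.topicPartition(), rpe.offset(), rpe.getCause());
  }
}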
@@ -0,0 +1,31 @@
package com.linkedin.kafka.clients.largemessage.errors;

import org.apache.kafka.common.TopicPartition;


/**
* An exception indicating that an error was encountered while processing a consumer record.
*/
public class RecordProcessingException extends RuntimeException {
private final TopicPartition _topicPartition;
private final long _offset;

public RecordProcessingException(TopicPartition tp, long offset, Throwable cause) {
super(cause);
_topicPartition = tp;
_offset = offset;
}

public TopicPartition topicPartition() {
return _topicPartition;
}

public long offset() {
return _offset;
}

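  // Do not fill in the stack trace: the wrapped cause already carries the relevant trace,
  // and skipping it keeps creating these per-record exceptions cheap.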
@Override
public synchronized Throwable fillInStackTrace() {
return this;
}
}