Skip to content

Commit

Permalink
Add sanity check logic to the message eviction. (linkedin#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
becketqin authored Jun 21, 2017
1 parent c0db53e commit 6f61c0c
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.linkedin.kafka.clients.largemessage.errors.InvalidSegmentException;
import com.linkedin.kafka.clients.largemessage.errors.LargeMessageDroppedException;
import com.linkedin.kafka.clients.utils.QueuedMap;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -100,6 +102,7 @@ public synchronized long safeOffset(TopicPartition tp) {

  /**
   * Discards all buffered state: every partially-assembled large message, the
   * per-partition index of incomplete message ids, and all tracked offsets.
   * Resets the reported buffer usage to zero so subsequent bookkeeping starts fresh.
   * Synchronized, consistent with the other mutators on this class.
   */
  public synchronized void clear() {
    _incompleteMessageMap.clear();
    _incompleteMessageByPartition.clear();
    _offsetTracker.clear();
    _bufferUsed = 0L;
  }
Expand Down Expand Up @@ -128,6 +131,7 @@ private void maybeEvictMessagesForSpace(long freeSpaceNeeded) {
throw new InvalidSegmentException("Saw single message segment size = " + freeSpaceNeeded + ", which is "
+ "larger than buffer capacity = " + _bufferCapacity);
}
sanityCheck();
// When the eldest message is the current message, the message will not be completed. This indicates the buffer
// capacity is too small to hold even one message.
while (bufferUsed() + freeSpaceNeeded > _bufferCapacity) {
Expand All @@ -141,6 +145,9 @@ private void maybeEvictMessagesForSpace(long freeSpaceNeeded) {
} else {
LOG.warn("Incomplete message buffer pool is full. Removing the eldest incomplete message." + message);
}
} else {
throw new IllegalStateException("The buffer used is " + _bufferUsed + " even if there is no incomplete "
+ "large message.");
}
}
}
Expand Down Expand Up @@ -197,4 +204,27 @@ private LargeMessage validateSegmentAndGetMessage(TopicPartition tp, LargeMessag
}
return message;
}


// Adding the sanity check to see if the buffered messages match the buffer used.
private void sanityCheck() {
int bufferedBytes = 0;
for (Set<UUID> uuids : _incompleteMessageByPartition.values()) {
for (UUID id : uuids) {
bufferedBytes += _incompleteMessageMap.get(id).bufferedSizeInBytes();
}
}
if (bufferedBytes != _bufferUsed) {
List<LargeMessage> largeMessages = new ArrayList<>(_incompleteMessageMap.size());
for (Set<UUID> uuids : _incompleteMessageByPartition.values()) {
for (UUID id : uuids) {
largeMessages.add(_incompleteMessageMap.get(id));
}
}
String errorMessage = "Total number of bytes used " + bufferedBytes + ", reported bytes used " + _bufferUsed;
LOG.error(errorMessage);
LOG.error("All buffered messages {}", largeMessages);
throw new IllegalStateException("Total number of bytes used " + bufferedBytes + ", reported bytes used " + _bufferUsed);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public synchronized void add(ListNode node) {
node.next = null;
_tail = node;
}
_size++;
}

public synchronized ListNode addKey(K key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,6 @@ public void close() {
ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
while (records.isEmpty()) {
records = consumer.poll(1000);
System.out.println(records.count() + ", " + records.isEmpty());
}
assertEquals(records.count(), 1, "Only the first message should be returned");
assertEquals(records.iterator().next().offset(), 2L, "The offset of the first message should be 2.");
Expand Down Expand Up @@ -1033,8 +1032,8 @@ private void produceSyntheticMessages(String topic) {


// Add two more segments to partition SYNTHETIC_PARTITION_1 for corner case test.
List<ProducerRecord<byte[], byte[]>> m0SegsPartition1 = splitter.split(topic, SYNTHETIC_PARTITION_1, messageId0, message0.getBytes());
List<ProducerRecord<byte[], byte[]>> m1SegsPartition1 = splitter.split(topic, SYNTHETIC_PARTITION_1, messageId1, message1.getBytes());
List<ProducerRecord<byte[], byte[]>> m0SegsPartition1 = splitter.split(topic, SYNTHETIC_PARTITION_1, LiKafkaClientsUtils.randomUUID(), message0.getBytes());
List<ProducerRecord<byte[], byte[]>> m1SegsPartition1 = splitter.split(topic, SYNTHETIC_PARTITION_1, LiKafkaClientsUtils.randomUUID(), message1.getBytes());

try {
producer.send(m0Segs.get(0)).get();
Expand Down

0 comments on commit 6f61c0c

Please sign in to comment.