diff --git a/src/main/java/com/linkedin/kafka/clients/largemessage/LargeMessageBufferPool.java b/src/main/java/com/linkedin/kafka/clients/largemessage/LargeMessageBufferPool.java
index bb1ec53..845f505 100644
--- a/src/main/java/com/linkedin/kafka/clients/largemessage/LargeMessageBufferPool.java
+++ b/src/main/java/com/linkedin/kafka/clients/largemessage/LargeMessageBufferPool.java
@@ -7,6 +7,8 @@
 import com.linkedin.kafka.clients.largemessage.errors.InvalidSegmentException;
 import com.linkedin.kafka.clients.largemessage.errors.LargeMessageDroppedException;
 import com.linkedin.kafka.clients.utils.QueuedMap;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -100,6 +102,7 @@ public synchronized long safeOffset(TopicPartition tp) {
 
   public synchronized void clear() {
     _incompleteMessageMap.clear();
+    _incompleteMessageByPartition.clear();
     _offsetTracker.clear();
     _bufferUsed = 0L;
   }
@@ -128,6 +131,7 @@ private void maybeEvictMessagesForSpace(long freeSpaceNeeded) {
       throw new InvalidSegmentException("Saw single message segment size = " + freeSpaceNeeded + ", which is "
          + "larger than buffer capacity = " + _bufferCapacity);
     }
+    sanityCheck();
     // When the eldest message is the current message, the message will not be completed. This indicates the buffer
     // capacity is too small to hold even one message.
     while (bufferUsed() + freeSpaceNeeded > _bufferCapacity) {
@@ -141,6 +145,9 @@ private void maybeEvictMessagesForSpace(long freeSpaceNeeded) {
        } else {
          LOG.warn("Incomplete message buffer pool is full. Removing the eldest incomplete message." + message);
        }
+      } else {
+        throw new IllegalStateException("The buffer used is " + _bufferUsed + " even if there is no incomplete "
+            + "large message.");
+      }
     }
   }
@@ -197,4 +204,27 @@ private LargeMessage validateSegmentAndGetMessage(TopicPartition tp, LargeMessag
     }
     return message;
   }
+
+
+  // Sanity check that the buffered messages match the reported buffer usage.
+  private void sanityCheck() {
+    long bufferedBytes = 0;
+    for (Set<UUID> uuids : _incompleteMessageByPartition.values()) {
+      for (UUID id : uuids) {
+        bufferedBytes += _incompleteMessageMap.get(id).bufferedSizeInBytes();
+      }
+    }
+    if (bufferedBytes != _bufferUsed) {
+      List<LargeMessage> largeMessages = new ArrayList<>(_incompleteMessageMap.size());
+      for (Set<UUID> uuids : _incompleteMessageByPartition.values()) {
+        for (UUID id : uuids) {
+          largeMessages.add(_incompleteMessageMap.get(id));
+        }
+      }
+      String errorMessage = "Total number of bytes used " + bufferedBytes + ", reported bytes used " + _bufferUsed;
+      LOG.error(errorMessage);
+      LOG.error("All buffered messages {}", largeMessages);
+      throw new IllegalStateException(errorMessage);
+    }
+  }
 }
diff --git a/src/main/java/com/linkedin/kafka/clients/utils/QueuedMap.java b/src/main/java/com/linkedin/kafka/clients/utils/QueuedMap.java
index e50b970..5170a32 100644
--- a/src/main/java/com/linkedin/kafka/clients/utils/QueuedMap.java
+++ b/src/main/java/com/linkedin/kafka/clients/utils/QueuedMap.java
@@ -132,6 +132,7 @@ public synchronized void add(ListNode node) {
       node.next = null;
       _tail = node;
     }
+    _size++;
   }
 
   public synchronized ListNode addKey(K key) {
diff --git a/src/test/java/com/linkedin/kafka/clients/consumer/LiKafkaConsumerIntegrationTest.java b/src/test/java/com/linkedin/kafka/clients/consumer/LiKafkaConsumerIntegrationTest.java
index 3081380..06c0731 100644
--- a/src/test/java/com/linkedin/kafka/clients/consumer/LiKafkaConsumerIntegrationTest.java
+++ b/src/test/java/com/linkedin/kafka/clients/consumer/LiKafkaConsumerIntegrationTest.java
@@ -783,7 +783,6 @@ public void close() {
     ConsumerRecords<String, String> records = ConsumerRecords.empty();
     while (records.isEmpty()) {
       records = consumer.poll(1000);
-      System.out.println(records.count() + ", " + records.isEmpty());
     }
     assertEquals(records.count(), 1, "Only the first message should be returned");
     assertEquals(records.iterator().next().offset(), 2L, "The offset of the first message should be 2.");
@@ -1033,8 +1032,8 @@ private void produceSyntheticMessages(String topic) {
 
     // Add two more segment to partition SYNTHETIC_PARTITION_1 for corner case test.
-    List<ProducerRecord<byte[], byte[]>> m0SegsPartition1 = splitter.split(topic, SYNTHETIC_PARTITION_1, messageId0, message0.getBytes());
-    List<ProducerRecord<byte[], byte[]>> m1SegsPartition1 = splitter.split(topic, SYNTHETIC_PARTITION_1, messageId1, message1.getBytes());
+    List<ProducerRecord<byte[], byte[]>> m0SegsPartition1 = splitter.split(topic, SYNTHETIC_PARTITION_1, LiKafkaClientsUtils.randomUUID(), message0.getBytes());
+    List<ProducerRecord<byte[], byte[]>> m1SegsPartition1 = splitter.split(topic, SYNTHETIC_PARTITION_1, LiKafkaClientsUtils.randomUUID(), message1.getBytes());
 
     try {
       producer.send(m0Segs.get(0)).get();
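
Note for reviewers: below is a minimal, self-contained sketch of the bookkeeping invariant this patch defends (CountedStore is a hypothetical class written for illustration, not code from this repository). A cached counter such as _bufferUsed in LargeMessageBufferPool (or _size in QueuedMap) is only trustworthy if every mutation path updates it; the missed _size++ in QueuedMap.add() was exactly such a path. The sanity check recomputes the value from the ground truth and fails fast when the bookkeeping drifts:

    // Hypothetical illustration only -- not part of the patch above.
    import java.util.HashMap;
    import java.util.Map;

    public class CountedStore {
      private final Map<String, byte[]> _store = new HashMap<>();
      // Cached counter, analogous to _bufferUsed in LargeMessageBufferPool.
      private long _bytesUsed = 0L;

      public void put(String key, byte[] value) {
        byte[] old = _store.put(key, value);
        if (old != null) {
          // Forgetting to adjust the counter on one mutation path is the same
          // class of bug as the missing _size++ fixed in QueuedMap.add().
          _bytesUsed -= old.length;
        }
        _bytesUsed += value.length;
      }

      public void remove(String key) {
        byte[] old = _store.remove(key);
        if (old != null) {
          _bytesUsed -= old.length;
        }
      }

      // Recompute usage from the ground truth and fail fast on drift,
      // mirroring the sanityCheck() added to LargeMessageBufferPool.
      public void sanityCheck() {
        long actual = 0L;
        for (byte[] value : _store.values()) {
          actual += value.length;
        }
        if (actual != _bytesUsed) {
          throw new IllegalStateException(
              "Actual bytes used " + actual + ", cached bytes used " + _bytesUsed);
        }
      }

      public static void main(String[] args) {
        CountedStore store = new CountedStore();
        store.put("a", new byte[10]);
        store.put("b", new byte[20]);
        store.remove("a");
        store.sanityCheck(); // passes: both sides report 20 bytes
      }
    }

Running main() passes the check; deleting the _bytesUsed adjustment in put() makes sanityCheck() throw, which is the failure mode the new LargeMessageBufferPool.sanityCheck() is meant to surface early instead of letting a corrupted counter silently drive eviction decisions.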