KAFKA-3163: Add time based index to Kafka. #1215
@@ -0,0 +1,287 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.log

import java.io.{File, RandomAccessFile}
import java.nio.{ByteBuffer, MappedByteBuffer}
import java.nio.channels.FileChannel
import java.util.concurrent.locks.{Lock, ReentrantLock}

import kafka.log.IndexSearchType.IndexSearchEntity
import kafka.utils.CoreUtils.inLock
import kafka.utils.{CoreUtils, Logging, Os}
import org.apache.kafka.common.utils.Utils
import sun.nio.ch.DirectBuffer

import scala.math.ceil

/**
 * The abstract index class which holds entry-format-agnostic methods.
 *
 * @param _file The index file
 * @param baseOffset The base offset of the segment that this index corresponds to.
 * @param maxIndexSize The maximum index size in bytes.
 */
abstract class AbstractIndex[K, V](@volatile private[this] var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1)
    extends Logging {

  protected def entrySize: Int

  protected val lock = new ReentrantLock

  @volatile
  protected var mmap: MappedByteBuffer = {
    val newlyCreated = _file.createNewFile()
    val raf = new RandomAccessFile(_file, "rw")
    try {
      /* pre-allocate the file if necessary */
      if (newlyCreated) {
        if (maxIndexSize < entrySize)
          throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
        raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))
      }

      /* memory-map the file */
      val len = raf.length()
      val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)

      /* set the position in the index for the next entry */
      if (newlyCreated)
        idx.position(0)
      else
        // if this is a pre-existing index, assume it is valid and set position to last entry
        idx.position(roundDownToExactMultiple(idx.limit, entrySize))
      idx
    } finally {
      CoreUtils.swallow(raf.close())
    }
  }

  /**
   * The maximum number of entries this index can hold
   */
  @volatile
  private[this] var _maxEntries = mmap.limit / entrySize

  /** The number of entries in this index */
  @volatile
  protected var _entries = mmap.position / entrySize

  /**
   * True iff there are no more slots available in this index
   */
  def isFull: Boolean = _entries >= _maxEntries

  def maxEntries: Int = _maxEntries

  def entries: Int = _entries

  /**
   * The index file
   */
  def file: File = _file

  /**
   * Reset the size of the memory map and the underlying file. This is used in two cases:
   * (1) in trimToValidSize(), which is called when the segment is closed or a new segment is rolled;
   * (2) when loading segments from disk or truncating back to an old segment where a new log
   * segment becomes active; here we want to reset the index size to the maximum index size to
   * avoid rolling a new segment.
   */
  def resize(newSize: Int) {
    inLock(lock) {
      val raf = new RandomAccessFile(_file, "rw")
      val roundedNewSize = roundDownToExactMultiple(newSize, entrySize)
      val position = mmap.position

      /* Windows won't let us modify the file length while the file is mmapped :-( */
      if (Os.isWindows)
        forceUnmap(mmap)
      try {
        raf.setLength(roundedNewSize)
        mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
        _maxEntries = mmap.limit / entrySize
        mmap.position(position)
      } finally {
        CoreUtils.swallow(raf.close())
      }
    }
  }

  /**
   * Rename the file that backs this offset index
   *
   * @throws IOException if rename fails
   */
  def renameTo(f: File) {
    try Utils.atomicMoveWithFallback(_file.toPath, f.toPath)
    finally _file = f
  }

  /**
   * Flush the data in the index to disk
   */
  def flush() {
    inLock(lock) {
      mmap.force()
    }
  }

  /**
   * Delete this index file
   */
  def delete(): Boolean = {
    info(s"Deleting index ${_file.getAbsolutePath}")
    if (Os.isWindows)
      CoreUtils.swallow(forceUnmap(mmap))
    _file.delete()
  }

  /**
   * Trim this segment to fit just the valid entries, deleting all trailing unwritten bytes from
   * the file.
   */
  def trimToValidSize() {
    inLock(lock) {
      resize(entrySize * _entries)
    }
  }

  /**
   * The number of bytes actually used by this index
   */
  def sizeInBytes = entrySize * _entries

  /** Close the index */
  def close() {
    trimToValidSize()
  }

  /**
   * Do a basic sanity check on this index to detect obvious problems
   *
   * @throws IllegalArgumentException if any problems are found
   */
  def sanityCheck(): Unit

  /**
   * Remove all the entries from the index.
   */
  def truncate(): Unit

  /**
   * Remove all entries from the index which have an offset greater than or equal to the given offset.
   * Truncating to an offset larger than the largest in the index has no effect.
   */
  def truncateTo(offset: Long): Unit

  /**
   * Forcefully free the buffer's mmap. We do this only on Windows.
   */

Reviewer: Maybe remove "forcefully"?
Author: This is a pre-existing comment. I am not sure it is really a "forceful" free, so I just left it as is.
  protected def forceUnmap(m: MappedByteBuffer) {
    try {
Reviewer: Again, not a big deal, but we could use pattern matching here to be more Scala-like.
Author: It is true that pattern matching is more Scala-like. The suggested code was missing a case for when the first case doesn't match, though.
      m match {
        case buffer: DirectBuffer =>
          val bufferCleaner = buffer.cleaner()
          /* cleaner can be null if the mapped region has size 0 */
          if (bufferCleaner != null)
            bufferCleaner.clean()
        case _ =>
      }
    } catch {
      case t: Throwable => error("Error when freeing index buffer", t)
    }
  }

  /**
   * Execute the given function in a lock only if we are running on Windows. We do this
   * because Windows won't let us resize a file while it is mmapped. As a result we have to
   * force unmap it, and this requires synchronizing reads.
   */
  protected def maybeLock[T](lock: Lock)(fun: => T): T = {
    if (Os.isWindows)
      lock.lock()
    try {
      fun
    } finally {
      if (Os.isWindows)
        lock.unlock()
    }
  }

  /**
   * Parse an entry in the index.
   *
   * @param buffer the buffer of this memory mapped index.
   * @param n the slot
   * @return the index entry stored in the given slot.
   */
  protected def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry

  /**
   * Find the slot in which the largest entry less than or equal to the given target key or value is stored.
   * The comparison is made using the `IndexEntry.compareTo()` method.
   *
   * @param idx The index buffer
   * @param target The index key to look for
   * @return The slot found, or -1 if the least entry in the index is larger than the target key or the index is empty
   */
  protected def indexSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): Int = {
    // check if the index is empty
    if (_entries == 0)
      return -1

    // check if the target offset is smaller than the least offset
Reviewer: When searching for an offset, the passed-in value is the absolute offset, whereas the offsets stored in the index are relative. We used to translate the absolute offset to the relative one in indexSlotFor(). That logic seems to be lost in the new code.
Author: That logic has been moved into the subclasses' parseEntry() implementations.
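A sketch of what that translation can look like in a concrete subclass (illustrative and simplified; `relativeOffset`, `physical`, and `OffsetPosition` are assumed names, not quotes from this PR): each slot stores a 4-byte offset relative to `baseOffset` plus a 4-byte file position, and parseEntry() widens the relative offset back to an absolute one, so the binary search below can compare directly against the absolute target.

    // Hypothetical OffsetIndex-style decoding (not verbatim PR code): the
    // stored relative offset is rebased onto baseOffset while decoding, so
    // callers of indexSlotFor() can keep passing absolute offsets.
    private def relativeOffset(buffer: ByteBuffer, n: Int): Int =
      buffer.getInt(n * entrySize)

    private def physical(buffer: ByteBuffer, n: Int): Int =
      buffer.getInt(n * entrySize + 4)

    override protected def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry =
      OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n))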
    if (compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
      return -1

    // binary search for the entry
    var lo = 0
    var hi = _entries - 1
    while (lo < hi) {
      val mid = ceil(hi / 2.0 + lo / 2.0).toInt
Reviewer: This is a bit odd; why not the standard midpoint calculation?
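One plausible answer (a reasoning sketch, not taken from the PR discussion): the loop advances with `lo = mid` rather than `lo = mid + 1`, so a floor midpoint could stall, and halving each bound separately also sidesteps `Int` overflow in `lo + hi`.

    // Illustrative only: why a ceiling midpoint pairs with `lo = mid`.
    val lo = 4
    val hi = 5
    // Floor midpoint: (4 + 5) / 2 == 4, so `lo = mid` makes no progress
    // and the loop would never terminate.
    val floorMid = (lo + hi) / 2
    // Ceiling midpoint: ceil(2.5 + 2.0) == 5, so the range always shrinks.
    val ceilMid = math.ceil(hi / 2.0 + lo / 2.0).toInt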
      val found = parseEntry(idx, mid)
      val compareResult = compareIndexEntry(found, target, searchEntity)
      if (compareResult > 0)
        hi = mid - 1
      else if (compareResult < 0)
        lo = mid
      else
        return mid
    }
    lo
  }

  private def compareIndexEntry(indexEntry: IndexEntry, target: Long, searchEntity: IndexSearchEntity): Int = {
    searchEntity match {
      case IndexSearchType.KEY => indexEntry.indexKey.compareTo(target)
      case IndexSearchType.VALUE => indexEntry.indexValue.compareTo(target)
    }
  }

  /**
   * Round a number to the greatest exact multiple of the given factor less than the given number.
   * E.g. roundDownToExactMultiple(67, 8) == 64
   */
  private def roundDownToExactMultiple(number: Int, factor: Int) = factor * (number / factor)

}

object IndexSearchType extends Enumeration {
  type IndexSearchEntity = Value
  val KEY, VALUE = Value
}
@@ -117,7 +117,13 @@ class FileMessageSet private[kafka](@volatile var file: File,
    new FileMessageSet(file,
                       channel,
                       start = this.start + position,
                       end = math.min(this.start + position + size, sizeInBytes()))
                       end = {
                         // Handle the integer overflow
                         if (this.start + position + size < 0)
Reviewer: Hmm, this may not be reliable. Could we just compare this to Int.MaxValue as a Long?
Author: The assumption here is that ...
Reviewer: Right, but could this.start + position overflow to a negative value and then + size bring it back to positive?
Author: Theoretically it is possible. But if it is a log segment it seems not possible because the ...
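A sketch of the reviewer's alternative (the names mirror the fields above, but this is illustrative, not the PR's code): widen to `Long` before adding so the sum cannot wrap at all, then clamp.

    // Do the arithmetic in Long so it cannot overflow, then clamp to the
    // size of the message set. A Long comfortably holds the sum of three
    // non-negative Ints, so no wrap-around case needs special handling.
    val endExclusive: Long = start.toLong + position + size
    val end: Int = math.min(endExclusive, sizeInBytes().toLong).toInt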
                           sizeInBytes()
                         else
                           math.min(this.start + position + size, sizeInBytes())
                       })
  }

  /**
@@ -126,7 +132,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
   * @param targetOffset The offset to search for.
   * @param startingPosition The starting position in the file to begin searching from.
   */
  def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
  def searchForOffset(targetOffset: Long, startingPosition: Int): OffsetPosition = {
    var position = startingPosition
    val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
    val size = sizeInBytes()

@@ -135,7 +141,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
      channel.read(buffer, position)
      if (buffer.hasRemaining)
        throw new IllegalStateException("Failed to read complete buffer for targetOffset %d startPosition %d in %s"
          .format(targetOffset, startingPosition, file.getAbsolutePath))
      buffer.rewind()
      val offset = buffer.getLong()
      if (offset >= targetOffset)

@@ -148,6 +154,72 @@ class FileMessageSet private[kafka](@volatile var file: File,
      null
  }
  /**
   * Search forward for the message whose timestamp is greater than or equal to the target timestamp.
   *
   * The search will stop immediately when it sees a message in a format version before 0.10.0. This is to
   * avoid scanning the entire log when all the messages are still in the old format.
   *
   * @param targetTimestamp The timestamp to search for.
   * @param startingPosition The starting position to search.
   * @return None, if no message exists at or after the starting position.
   *         Some(the_next_offset_to_read) otherwise.
   */
  def searchForTimestamp(targetTimestamp: Long, startingPosition: Int): Option[Long] = {
    var maxTimestampChecked = Message.NoTimestamp
    var lastOffsetChecked = -1L
    val messagesToSearch = read(startingPosition, sizeInBytes)
    for (messageAndOffset <- messagesToSearch) {
      val message = messageAndOffset.message
      lastOffsetChecked = messageAndOffset.offset
      // Stop searching once we see a message in a format version before 0.10.0.
      // This is equivalent to treating messages without a timestamp as having the largest timestamp.
      // We do this to avoid scanning the entire log if no message has a timestamp.
      if (message.magic == Message.MagicValue_V0)
        return Some(messageAndOffset.offset)
      else if (message.timestamp >= targetTimestamp) {
        // We found a message
        message.compressionCodec match {
          case NoCompressionCodec =>
            return Some(messageAndOffset.offset)
          case _ =>
            // Iterate over the inner messages to get the exact offset.
            for (innerMessage <- ByteBufferMessageSet.deepIterator(messageAndOffset)) {
              val timestamp = innerMessage.message.timestamp
              if (timestamp >= targetTimestamp)
                return Some(innerMessage.offset)
            }
            throw new IllegalStateException(s"The message set (max timestamp = ${message.timestamp}, max offset = ${messageAndOffset.offset})" +
              s" should contain target timestamp $targetTimestamp but it does not.")
        }
      } else
        maxTimestampChecked = math.max(maxTimestampChecked, message.timestamp)
    }

    if (lastOffsetChecked >= 0)
      Some(lastOffsetChecked + 1)
    else
      None
  }
|
||
/** | ||
* Return the largest timestamp of the messages after a given position in this file message set. | ||
* @param startingPosition The starting position. | ||
* @return The largest timestamp of the messages after the given position. | ||
*/ | ||
def largestTimestampAfter(startingPosition: Int): TimestampOffset = { | ||
var maxTimestamp = Message.NoTimestamp | ||
var offsetOfMaxTimestamp = -1L | ||
val messagesToSearch = read(startingPosition, Int.MaxValue) | ||
for (messageAndOffset <- messagesToSearch) { | ||
if (messageAndOffset.message.timestamp > maxTimestamp) { | ||
maxTimestamp = messageAndOffset.message.timestamp | ||
offsetOfMaxTimestamp = messageAndOffset.offset | ||
} | ||
} | ||
TimestampOffset(maxTimestamp, offsetOfMaxTimestamp) | ||
} | ||
|
||
/** | ||
* Write some of this set to the given channel. | ||
* @param destChannel The channel to write to. | ||
|
Reviewer (on the "Invalid max index size" check in AbstractIndex): How about using a different error message? e.g. "entry size exceeds max index size".
Author: It seems this message is warning about a wrong maxIndexSize argument, not about there being too many entries, so the error message seems right. But we can probably add the current index size in the log.
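Following that suggestion, a self-describing version of the check might look like this (an illustrative rewrite, not the PR's code):

    // Hypothetical rewording of the guard in AbstractIndex's mmap initializer:
    // include both sizes so the log explains why the argument is invalid.
    if (maxIndexSize < entrySize)
      throw new IllegalArgumentException(
        s"Invalid max index size $maxIndexSize: it must hold at least one entry of $entrySize bytes")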