KAFKA-3163: Add time based index to Kafka. #1215

Closed · wants to merge 4 commits
287 changes: 287 additions & 0 deletions core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -0,0 +1,287 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.log

import java.io.{File, RandomAccessFile}
import java.nio.{ByteBuffer, MappedByteBuffer}
import java.nio.channels.FileChannel
import java.util.concurrent.locks.{Lock, ReentrantLock}

import kafka.log.IndexSearchType.IndexSearchEntity
import kafka.utils.CoreUtils.inLock
import kafka.utils.{CoreUtils, Logging, Os}
import org.apache.kafka.common.utils.Utils
import sun.nio.ch.DirectBuffer

import scala.math.ceil

/**
 * The abstract index class which holds entry format agnostic methods.
 *
 * @param _file The index file
 * @param baseOffset The base offset of the segment that this index corresponds to.
 * @param maxIndexSize The maximum index size in bytes.
 */
abstract class AbstractIndex[K, V](@volatile private[this] var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1)
    extends Logging {

  protected def entrySize: Int

  protected val lock = new ReentrantLock

  @volatile
  protected var mmap: MappedByteBuffer = {
    val newlyCreated = _file.createNewFile()
    val raf = new RandomAccessFile(_file, "rw")
    try {
      /* pre-allocate the file if necessary */
      if(newlyCreated) {
        if(maxIndexSize < entrySize)
          throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
[Review comment (Contributor)]
How about using a different error message? e.g. "entry size exceeds max index size".

[Reply (Author)]
It seems this message is warning about a wrong maxIndexSize argument, not about there being too many entries, so the error message seems right. But we can probably add the current index size to the log.

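For illustration, one way the message could carry both values, per the reply above (a hypothetical variant, not what this patch does):

      // Hypothetical variant: report both the rejected maxIndexSize and the
      // per-entry size, so it is clear why the argument is invalid.
      if(maxIndexSize < entrySize)
        throw new IllegalArgumentException(
          s"Invalid max index size $maxIndexSize: the index must hold at least one entry of $entrySize bytes")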
        raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))
      }

      /* memory-map the file */
      val len = raf.length()
      val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)

      /* set the position in the index for the next entry */
      if(newlyCreated)
        idx.position(0)
      else
        // if this is a pre-existing index, assume it is valid and set position to last entry
        idx.position(roundDownToExactMultiple(idx.limit, entrySize))
      idx
    } finally {
      CoreUtils.swallow(raf.close())
    }
  }

  /**
   * The maximum number of entries this index can hold
   */
  @volatile
  private[this] var _maxEntries = mmap.limit / entrySize

  /** The number of entries in this index */
  @volatile
  protected var _entries = mmap.position / entrySize

  /**
   * True iff there are no more slots available in this index
   */
  def isFull: Boolean = _entries >= _maxEntries

  def maxEntries: Int = _maxEntries

  def entries: Int = _entries

  /**
   * The index file
   */
  def file: File = _file

  /**
   * Reset the size of the memory map and the underlying file. This is used in two cases: (1) in
   * trimToValidSize(), which is called when the segment is closed or a new segment is rolled; (2) when
   * loading segments from disk or truncating back to an old segment where a new log segment becomes active;
   * here we want to reset the index size to the maximum index size to avoid rolling a new segment.
   */
  def resize(newSize: Int) {
    inLock(lock) {
      val raf = new RandomAccessFile(_file, "rw")
      val roundedNewSize = roundDownToExactMultiple(newSize, entrySize)
      val position = mmap.position

      /* Windows won't let us modify the file length while the file is mmapped :-( */
      if(Os.isWindows)
        forceUnmap(mmap)
      try {
        raf.setLength(roundedNewSize)
        mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
        _maxEntries = mmap.limit / entrySize
        mmap.position(position)
      } finally {
        CoreUtils.swallow(raf.close())
      }
    }
  }

  /**
   * Rename the file that backs this offset index
   *
   * @throws IOException if rename fails
   */
  def renameTo(f: File) {
    try Utils.atomicMoveWithFallback(_file.toPath, f.toPath)
    finally _file = f
  }

  /**
   * Flush the data in the index to disk
   */
  def flush() {
    inLock(lock) {
      mmap.force()
    }
  }

  /**
   * Delete this index file
   */
  def delete(): Boolean = {
    info(s"Deleting index ${_file.getAbsolutePath}")
    if(Os.isWindows)
      CoreUtils.swallow(forceUnmap(mmap))
    _file.delete()
  }

  /**
   * Trim this segment to fit just the valid entries, deleting all trailing unwritten bytes from
   * the file.
   */
  def trimToValidSize() {
    inLock(lock) {
      resize(entrySize * _entries)
    }
  }

  /**
   * The number of bytes actually used by this index
   */
  def sizeInBytes = entrySize * _entries

  /** Close the index */
  def close() {
    trimToValidSize()
  }

  /**
   * Do a basic sanity check on this index to detect obvious problems
   *
   * @throws IllegalArgumentException if any problems are found
   */
  def sanityCheck(): Unit

  /**
   * Remove all the entries from the index.
   */
  def truncate(): Unit

  /**
   * Remove all entries from the index which have an offset greater than or equal to the given offset.
   * Truncating to an offset larger than the largest in the index has no effect.
   */
  def truncateTo(offset: Long): Unit

  /**
   * Forcefully free the buffer's mmap. We do this only on Windows.
[Review comment (Contributor)]
Maybe remove "forcefully"?

[Reply (Author)]
This is a pre-existing comment. I am not sure whether it is really a "forceful" free, so I just left it as is.
   */
  protected def forceUnmap(m: MappedByteBuffer) {
    try {
[Review comment (Contributor)]
Again not a big deal, but we can use pattern matching to be more Scala-like:

try {
  m match {
    case dm: sun.nio.ch.DirectBuffer => dm.cleaner().clean()
  }
} catch {
  case t: Throwable => error("Error when freeing index buffer", t)
}

[Review comment (Member)]
It is true that pattern matching is more Scala-like. The suggested code is missing a case for when the first pattern doesn't match, though.

      m match {
        case buffer: DirectBuffer =>
          val bufferCleaner = buffer.cleaner()
          /* cleaner can be null if the mapped region has size 0 */
          if (bufferCleaner != null)
            bufferCleaner.clean()
        case _ =>
      }
    } catch {
      case t: Throwable => error("Error when freeing index buffer", t)
    }
  }

  /**
   * Execute the given function in a lock, only if we are running on Windows. We do this
   * because Windows won't let us resize a file while it is mmapped. As a result we have to force unmap it
   * and this requires synchronizing reads.
   */
  protected def maybeLock[T](lock: Lock)(fun: => T): T = {
    if(Os.isWindows)
      lock.lock()
    try {
      fun
    } finally {
      if(Os.isWindows)
        lock.unlock()
    }
  }
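
A hypothetical call site, to show the intent: lookups pay the lock cost only on Windows, where resize() must unmap and remap the buffer underneath concurrent readers. The lookup method and the KEY search below are illustrative, not part of this patch:

  // Hypothetical subclass method: duplicate the shared buffer so we never
  // mutate its position, and take the lock only where Windows requires it.
  def lookup(target: Long): Option[IndexEntry] = maybeLock(lock) {
    val idx = mmap.duplicate()
    val slot = indexSlotFor(idx, target, IndexSearchType.KEY)
    if (slot == -1) None else Some(parseEntry(idx, slot))
  }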

  /**
   * Parse an entry in the index.
   *
   * @param buffer the buffer of this memory mapped index.
   * @param n the slot
   * @return the index entry stored in the given slot.
   */
  protected def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry

  /**
   * Find the slot in which the largest entry less than or equal to the given target key or value is stored.
   * The comparison is made using the `IndexEntry.compareTo()` method.
   *
   * @param idx The index buffer
   * @param target The index key to look for
   * @return The slot found, or -1 if the least entry in the index is larger than the target key or the index is empty
   */
  protected def indexSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): Int = {
    // check if the index is empty
    if(_entries == 0)
      return -1

    // check if the target offset is smaller than the least offset
[Review comment (Contributor)]
When searching for an offset, the passed-in value is the absolute offset, whereas the offsets in the index are relative. So we used to translate the absolute offset to the relative one in indexSlotFor(). That logic seems to be lost in the new code.

[Reply (Author)]
That logic has been moved to parseEntry() in OffsetIndex and TimeIndex. This is to make indexSlotFor() entry format agnostic.
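
To illustrate the reply above, a simplified sketch of how that translation can live in the subclass's parseEntry. It assumes OffsetIndex's 8-byte entry layout (4-byte relative offset, then 4-byte physical position) and is not the verbatim patch code:

  // Sketch: each entry stores an offset relative to baseOffset plus a
  // physical file position. Rebasing onto baseOffset here lets the generic
  // indexSlotFor compare absolute offsets without knowing the layout.
  override protected def parseEntry(buffer: ByteBuffer, n: Int): OffsetPosition = {
    val relativeOffset = buffer.getInt(n * entrySize)
    val physicalPosition = buffer.getInt(n * entrySize + 4)
    OffsetPosition(baseOffset + relativeOffset, physicalPosition)
  }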

    if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
      return -1

    // binary search for the entry
    var lo = 0
    var hi = _entries - 1
    while(lo < hi) {
      val mid = ceil(hi/2.0 + lo/2.0).toInt
[Review comment (Member)]
This is a bit odd. Why not the standard int mid = (low + high) >>> 1?
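
For comparison, a sketch of an overflow-safe integer midpoint. One wrinkle the standard one-liner would need to keep: the loop assigns lo = mid on a less-than result, so it needs the ceiling midpoint (which the Double arithmetic above provides) to guarantee progress; the +1 preserves that, and the unsigned shift keeps a wrapped sum correct:

      // Hypothetical replacement for the Double-based midpoint:
      // ceil((lo + hi) / 2) computed as floor((lo + hi + 1) / 2). The
      // unsigned shift (>>>) reads a sum that wrapped past Int.MaxValue as
      // the intended positive value, and the ceiling keeps lo = mid moving.
      val mid = (lo + hi + 1) >>> 1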

      val found = parseEntry(idx, mid)
      val compareResult = compareIndexEntry(found, target, searchEntity)
      if(compareResult > 0)
        hi = mid - 1
      else if(compareResult < 0)
        lo = mid
      else
        return mid
    }
    lo
  }

  private def compareIndexEntry(indexEntry: IndexEntry, target: Long, searchEntity: IndexSearchEntity): Int = {
    searchEntity match {
      case IndexSearchType.KEY => indexEntry.indexKey.compareTo(target)
      case IndexSearchType.VALUE => indexEntry.indexValue.compareTo(target)
    }
  }

  /**
   * Round a number to the greatest exact multiple of the given factor less than the given number.
   * E.g. roundDownToExactMultiple(67, 8) == 64
   */
  private def roundDownToExactMultiple(number: Int, factor: Int) = factor * (number / factor)

}

object IndexSearchType extends Enumeration {
  type IndexSearchEntity = Value
  val KEY, VALUE = Value
}
78 changes: 75 additions & 3 deletions core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -117,7 +117,13 @@ class FileMessageSet private[kafka](@volatile var file: File,
    new FileMessageSet(file,
                       channel,
                       start = this.start + position,
-                      end = math.min(this.start + position + size, sizeInBytes()))
+                      end = {
+                        // Handle the integer overflow
+                        if (this.start + position + size < 0)
[Review comment (Contributor)]
Hmm, this may not be reliable. Could we just compare this to Int.MaxValue as a Long?

[Reply (Author)]
The assumption here is that this.start + position is a positive integer, and size is also a positive integer. When you say not reliable, do you mean this.start + position can also overflow?

[Review comment (Contributor)]
Right, could this.start + position overflow to a negative value and then + size bring it back to positive?

[Reply (Author)]
Theoretically it is possible. But for a log segment it seems impossible, because this.start would be 0, so it is essentially position + size. I don't know whether we ever call read() on a sliced FileMessageSet.

+                          sizeInBytes()
+                        else
+                          math.min(this.start + position + size, sizeInBytes())
+                      })
  }
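
For comparison, a sketch of the reviewer's Long-based suggestion, assuming this.start, position, and size are all non-negative Ints. Widening to Long means no intermediate sum can wrap, so no special-case branch is needed:

    // Hypothetical alternative: compute the bound in Long, then clamp to the
    // set's size (an Int), so narrowing back to Int is always safe.
    val endLong = this.start.toLong + position + size
    val end = math.min(endLong, sizeInBytes().toLong).toInt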

@@ -126,7 +132,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
  /**
   * @param targetOffset The offset to search for.
   * @param startingPosition The starting position in the file to begin searching from.
   */
-  def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
+  def searchForOffset(targetOffset: Long, startingPosition: Int): OffsetPosition = {
    var position = startingPosition
    val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
    val size = sizeInBytes()
@@ -135,7 +141,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
      channel.read(buffer, position)
      if(buffer.hasRemaining)
        throw new IllegalStateException("Failed to read complete buffer for targetOffset %d startPosition %d in %s"
          .format(targetOffset, startingPosition, file.getAbsolutePath))
      buffer.rewind()
      val offset = buffer.getLong()
      if(offset >= targetOffset)
@@ -148,6 +154,72 @@ class FileMessageSet private[kafka](@volatile var file: File,
    null
  }

  /**
   * Search forward for the message whose timestamp is greater than or equal to the target timestamp.
   *
   * The search will stop immediately when it sees a message in a format version before 0.10.0. This is to avoid
   * scanning the entire log when all the messages are still in the old format.
   *
   * @param targetTimestamp The timestamp to search for.
   * @param startingPosition The starting position to search.
   * @return None, if no message exists at or after the starting position.
   *         Some(the_next_offset_to_read) otherwise.
   */
  def searchForTimestamp(targetTimestamp: Long, startingPosition: Int): Option[Long] = {
    var maxTimestampChecked = Message.NoTimestamp
    var lastOffsetChecked = -1L
    val messagesToSearch = read(startingPosition, sizeInBytes)
    for (messageAndOffset <- messagesToSearch) {
      val message = messageAndOffset.message
      lastOffsetChecked = messageAndOffset.offset
      // Stop searching once we see a message format before 0.10.0.
      // This is equivalent to treating a message without a timestamp as having the largest timestamp.
      // We do this to avoid scanning the entire log if no message has a timestamp.
      if (message.magic == Message.MagicValue_V0)
        return Some(messageAndOffset.offset)
      else if (message.timestamp >= targetTimestamp) {
        // We found a message
        message.compressionCodec match {
          case NoCompressionCodec =>
            return Some(messageAndOffset.offset)
          case _ =>
            // Iterate over the inner messages to get the exact offset.
            for (innerMessage <- ByteBufferMessageSet.deepIterator(messageAndOffset)) {
              val timestamp = innerMessage.message.timestamp
              if (timestamp >= targetTimestamp)
                return Some(innerMessage.offset)
            }
            throw new IllegalStateException(s"The message set (max timestamp = ${message.timestamp}, max offset = ${messageAndOffset.offset})" +
              s" should contain target timestamp $targetTimestamp but it does not.")
        }
      } else
        maxTimestampChecked = math.max(maxTimestampChecked, message.timestamp)
    }

    if (lastOffsetChecked >= 0)
      Some(lastOffsetChecked + 1)
    else
      None
  }
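
A hypothetical caller, to make the return contract concrete: the Some value is the next offset to read, and None means nothing exists at or after the starting position (messageSet below is an illustrative val, not part of the patch):

  // Hypothetical usage: find the first offset whose timestamp is at or past
  // the target, scanning from the beginning of this file message set.
  messageSet.searchForTimestamp(targetTimestamp = 1462845600000L, startingPosition = 0) match {
    case Some(offset) => println(s"Start fetching from offset $offset")
    case None => println("No messages at or after the starting position")
  }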

  /**
   * Return the largest timestamp of the messages after a given position in this file message set.
   * @param startingPosition The starting position.
   * @return The largest timestamp of the messages after the given position.
   */
  def largestTimestampAfter(startingPosition: Int): TimestampOffset = {
    var maxTimestamp = Message.NoTimestamp
    var offsetOfMaxTimestamp = -1L
    val messagesToSearch = read(startingPosition, Int.MaxValue)
    for (messageAndOffset <- messagesToSearch) {
      if (messageAndOffset.message.timestamp > maxTimestamp) {
        maxTimestamp = messageAndOffset.message.timestamp
        offsetOfMaxTimestamp = messageAndOffset.offset
      }
    }
    TimestampOffset(maxTimestamp, offsetOfMaxTimestamp)
  }

  /**
   * Write some of this set to the given channel.
   * @param destChannel The channel to write to.