Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.perf;

import java.io.File;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkSVForwardIndexWriter;
import org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBytePower2ChunkSVForwardIndexReader;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.data.FieldSpec;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.infra.Blackhole;


@State(Scope.Benchmark)
public class BenchmarkFixedByteSVForwardIndexReader {

  private static final File INDEX_DIR =
      new File(FileUtils.getTempDirectory(), "BenchmarkFixedByteSVForwardIndexReader");

  @Param("10000")
  int _blockSize;

  @Param("1000")
  int _numBlocks;

  private int[] _docIds;
  private double[] _doubleBuffer;
  private long[] _longBuffer;
  private FixedByteChunkSVForwardIndexReader _compressedReader;
  private FixedBytePower2ChunkSVForwardIndexReader _compressedPow2Reader;

  /**
   * Builds two LZ4-compressed fixed-byte forward indexes over the same random long values —
   * one written with format version 3 and one with version 4 (power-of-2 chunk sizes) — then
   * opens a reader for each so the benchmarks can compare their random-access performance.
   */
  @Setup(Level.Trial)
  public void setup()
      throws IOException {
    FileUtils.forceMkdir(INDEX_DIR);
    int totalDocs = _numBlocks * _blockSize;
    File v3File = new File(INDEX_DIR, UUID.randomUUID().toString());
    File v4File = new File(INDEX_DIR, UUID.randomUUID().toString());
    _doubleBuffer = new double[_blockSize];
    _longBuffer = new long[_blockSize];
    // Write identical random data through both writer versions so timing differences come
    // from the read path only.
    try (FixedByteChunkSVForwardIndexWriter writer = new FixedByteChunkSVForwardIndexWriter(v3File,
        ChunkCompressionType.LZ4, totalDocs, 1000, Long.BYTES, 3);
        FixedByteChunkSVForwardIndexWriter pow2Writer = new FixedByteChunkSVForwardIndexWriter(v4File,
            ChunkCompressionType.LZ4, totalDocs, 1000, Long.BYTES, 4)) {
      for (int doc = 0; doc < totalDocs; doc++) {
        long value = ThreadLocalRandom.current().nextLong();
        writer.putLong(value);
        pow2Writer.putLong(value);
      }
    }
    _compressedReader =
        new FixedByteChunkSVForwardIndexReader(PinotDataBuffer.loadBigEndianFile(v3File), FieldSpec.DataType.LONG);
    _compressedPow2Reader =
        new FixedBytePower2ChunkSVForwardIndexReader(PinotDataBuffer.loadBigEndianFile(v4File),
            FieldSpec.DataType.LONG);
    _docIds = new int[_blockSize];
  }

  /** Removes the temporary index files created by {@link #setup()}. */
  @TearDown(Level.Trial)
  public void teardown()
      throws IOException {
    FileUtils.deleteDirectory(INDEX_DIR);
  }

  @Benchmark
  public void readCompressedDoublesNonContiguousV3(Blackhole bh)
      throws IOException {
    readCompressedDoublesNonContiguous(bh, _compressedReader);
  }

  @Benchmark
  public void readCompressedDoublesNonContiguousV4(Blackhole bh)
      throws IOException {
    readCompressedDoublesNonContiguous(bh, _compressedPow2Reader);
  }

  @Benchmark
  public void readCompressedLongsNonContiguousV3(Blackhole bh)
      throws IOException {
    readCompressedLongsNonContiguous(bh, _compressedReader);
  }

  @Benchmark
  public void readCompressedLongsNonContiguousV4(Blackhole bh)
      throws IOException {
    readCompressedLongsNonContiguous(bh, _compressedPow2Reader);
  }

  /**
   * Reads longs at stride-2 doc ids. Each batch of doc ids spans two adjacent chunks, which is
   * why the outer loop only covers half the blocks.
   */
  private void readCompressedLongsNonContiguous(Blackhole bh, ForwardIndexReader<ChunkReaderContext> reader)
      throws IOException {
    int batchSize = _docIds.length;
    try (ChunkReaderContext context = reader.createContext()) {
      for (int block = 0; block < _numBlocks / 2; block++) {
        int base = block * _blockSize;
        for (int i = 0; i < batchSize; i++) {
          _docIds[i] = base + (i << 1);
        }
        for (int i = 0; i < batchSize; i++) {
          _longBuffer[i] = reader.getLong(_docIds[i], context);
        }
        bh.consume(_longBuffer);
      }
    }
  }

  /**
   * Reads doubles at stride-2 doc ids. Same access pattern as the long variant; the underlying
   * data is 8 bytes wide either way, so the stored bits are simply reinterpreted.
   */
  private void readCompressedDoublesNonContiguous(Blackhole bh, ForwardIndexReader<ChunkReaderContext> reader)
      throws IOException {
    int batchSize = _docIds.length;
    try (ChunkReaderContext context = reader.createContext()) {
      for (int block = 0; block < _numBlocks / 2; block++) {
        int base = block * _blockSize;
        for (int i = 0; i < batchSize; i++) {
          _docIds[i] = base + (i << 1);
        }
        for (int i = 0; i < batchSize; i++) {
          _doubleBuffer[i] = reader.getDouble(_docIds[i], context);
        }
        bh.consume(_doubleBuffer);
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReaderV4;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
Expand Down Expand Up @@ -200,7 +200,7 @@ public void readV3(V3State state, Blackhole bh)
try (PinotDataBuffer buffer = PinotDataBuffer.loadBigEndianFile(state._file);
VarByteChunkSVForwardIndexReader reader =
new VarByteChunkSVForwardIndexReader(buffer, FieldSpec.DataType.BYTES);
BaseChunkSVForwardIndexReader.ChunkReaderContext context = reader.createContext()) {
ChunkReaderContext context = reader.createContext()) {
for (int i = 0; i < state._records; i++) {
bh.consume(reader.getBytes(i, context));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,14 @@ public abstract class BaseChunkSVForwardIndexWriter implements Closeable {
* @param chunkSize Size of chunk
* @param sizeOfEntry Size of entry (in bytes), max size for variable byte implementation.
* @param version version of File
* @param fixed if the data type is fixed width (required for version validation)
* @throws IOException if the file isn't found or can't be mapped
*/
protected BaseChunkSVForwardIndexWriter(File file, ChunkCompressionType compressionType, int totalDocs,
int numDocsPerChunk, int chunkSize, int sizeOfEntry, int version)
int numDocsPerChunk, int chunkSize, int sizeOfEntry, int version, boolean fixed)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need this extra fixed here for the version validation. Var-length V4 won't use this writer, but I don't think we should validate that here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be calamitous if this were used for variable length data by mistake, given that V4 for variable length data has a different layout. The only resolution would be to delete the data. So I felt it was important to validate.

throws IOException {
Preconditions.checkArgument(version == DEFAULT_VERSION || version == CURRENT_VERSION);
Preconditions.checkArgument(version == DEFAULT_VERSION || version == CURRENT_VERSION
|| (fixed && version == 4));
_chunkSize = chunkSize;
_chunkCompressor = ChunkCompressorFactory.getCompressor(compressionType);
_headerEntryChunkOffsetSize = getHeaderEntryChunkOffsetSize(version);
Expand All @@ -87,6 +89,7 @@ public static int getHeaderEntryChunkOffsetSize(int version) {
case 2:
return FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V1V2;
case 3:
case 4:
return FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V3;
default:
throw new IllegalStateException("Invalid version: " + version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ public class FixedByteChunkSVForwardIndexWriter extends BaseChunkSVForwardIndexW
public FixedByteChunkSVForwardIndexWriter(File file, ChunkCompressionType compressionType, int totalDocs,
int numDocsPerChunk, int sizeOfEntry, int writerVersion)
throws IOException {
super(file, compressionType, totalDocs, numDocsPerChunk, (sizeOfEntry * numDocsPerChunk), sizeOfEntry,
writerVersion);
super(file, compressionType, totalDocs, normalizeDocsPerChunk(writerVersion, numDocsPerChunk),
(sizeOfEntry * normalizeDocsPerChunk(writerVersion, numDocsPerChunk)), sizeOfEntry,
writerVersion, true);
_chunkDataOffset = 0;
}

Expand Down Expand Up @@ -112,4 +113,12 @@ private void flushChunkIfNeeded() {
writeChunk();
}
}

private static int normalizeDocsPerChunk(int version, int numDocsPerChunk) {
// V4 uses power of 2 chunk sizes for random access efficiency
if (version >= 4 && (numDocsPerChunk & (numDocsPerChunk - 1)) != 0) {
return 1 << (32 - Integer.numberOfLeadingZeros(numDocsPerChunk - 1));
}
return numDocsPerChunk;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public VarByteChunkSVForwardIndexWriter(File file, ChunkCompressionType compress
super(file, compressionType, totalDocs, numDocsPerChunk,
numDocsPerChunk * (CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE + lengthOfLongestEntry),
// chunkSize
lengthOfLongestEntry, writerVersion);
lengthOfLongestEntry, writerVersion, false);

_chunkHeaderOffset = 0;
_chunkHeaderSize = numDocsPerChunk * CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBytePower2ChunkSVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReaderV4;
Expand Down Expand Up @@ -89,10 +90,12 @@ public ForwardIndexReader<?> newForwardIndexReader(PinotDataBuffer dataBuffer, C
} else {
FieldSpec.DataType storedType = columnMetadata.getDataType().getStoredType();
if (columnMetadata.isSingleValue()) {
int version = dataBuffer.getInt(0);
if (storedType.isFixedWidth()) {
return new FixedByteChunkSVForwardIndexReader(dataBuffer, storedType);
return version >= FixedBytePower2ChunkSVForwardIndexReader.VERSION
? new FixedBytePower2ChunkSVForwardIndexReader(dataBuffer, storedType)
: new FixedByteChunkSVForwardIndexReader(dataBuffer, storedType);
}
int version = dataBuffer.getInt(0);
if (version >= VarByteChunkSVForwardIndexWriterV4.VERSION) {
return new VarByteChunkSVForwardIndexReaderV4(dataBuffer, storedType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
import org.apache.pinot.segment.spi.memory.CleanerUtil;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.slf4j.Logger;
Expand All @@ -37,8 +35,7 @@
/**
* Base implementation for chunk-based single-value raw (non-dictionary-encoded) forward index reader.
*/
public abstract class BaseChunkSVForwardIndexReader
implements ForwardIndexReader<BaseChunkSVForwardIndexReader.ChunkReaderContext> {
public abstract class BaseChunkSVForwardIndexReader implements ForwardIndexReader<ChunkReaderContext> {
private static final Logger LOGGER = LoggerFactory.getLogger(BaseChunkSVForwardIndexReader.class);

protected final PinotDataBuffer _dataBuffer;
Expand Down Expand Up @@ -114,7 +111,10 @@ protected ByteBuffer getChunkBuffer(int docId, ChunkReaderContext context) {
if (context.getChunkId() == chunkId) {
return context.getChunkBuffer();
}
return decompressChunk(chunkId, context);
}

protected ByteBuffer decompressChunk(int chunkId, ChunkReaderContext context) {
int chunkSize;
long chunkPosition = getChunkPosition(chunkId);

Expand Down Expand Up @@ -172,45 +172,4 @@ public void close() {
// NOTE: DO NOT close the PinotDataBuffer here because it is tracked by the caller and might be reused later. The
// caller is responsible of closing the PinotDataBuffer.
}

/**
* Context for the chunk-based forward index readers.
* <p>Information saved in the context can be used by subsequent reads as cache:
* <ul>
* <li>
* Chunk Buffer from the previous read. Useful if the subsequent read is from the same buffer, as it avoids extra
* chunk decompression.
* </li>
* <li>Id for the chunk</li>
* </ul>
*/
public static class ChunkReaderContext implements ForwardIndexReaderContext {
private final ByteBuffer _chunkBuffer;
private int _chunkId;

public ChunkReaderContext(int maxChunkSize) {
_chunkBuffer = ByteBuffer.allocateDirect(maxChunkSize);
_chunkId = -1;
}

public ByteBuffer getChunkBuffer() {
return _chunkBuffer;
}

public int getChunkId() {
return _chunkId;
}

public void setChunkId(int chunkId) {
_chunkId = chunkId;
}

@Override
public void close()
throws IOException {
if (CleanerUtil.UNMAP_SUPPORTED) {
CleanerUtil.getCleaner().freeBuffer(_chunkBuffer);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.segment.index.readers.forward;

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
import org.apache.pinot.segment.spi.memory.CleanerUtil;


/**
 * Reader context for the chunk-based forward index readers.
 * <p>Acts as a single-entry decompression cache across consecutive reads:
 * <ul>
 *   <li>Holds the chunk buffer produced by the previous read, so a follow-up read that lands in
 *       the same chunk skips decompression.</li>
 *   <li>Tracks which chunk id is currently loaded ({@code -1} until the first read).</li>
 * </ul>
 */
public class ChunkReaderContext implements ForwardIndexReaderContext {
  private final ByteBuffer _buffer;
  private int _chunkId;

  /**
   * @param maxChunkSize maximum (decompressed) chunk size in bytes; the backing direct buffer is
   *                     allocated once at this size and reused for every chunk
   */
  public ChunkReaderContext(int maxChunkSize) {
    _buffer = ByteBuffer.allocateDirect(maxChunkSize);
    _chunkId = -1;
  }

  /** Returns the reusable buffer holding the most recently decompressed chunk. */
  public ByteBuffer getChunkBuffer() {
    return _buffer;
  }

  /** Returns the id of the chunk currently cached in the buffer, or {@code -1} if none. */
  public int getChunkId() {
    return _chunkId;
  }

  /** Records which chunk id the buffer now holds. */
  public void setChunkId(int chunkId) {
    _chunkId = chunkId;
  }

  /** Releases the direct buffer eagerly when the platform supports explicit unmapping. */
  @Override
  public void close()
      throws IOException {
    if (CleanerUtil.UNMAP_SUPPORTED) {
      CleanerUtil.getCleaner().freeBuffer(_buffer);
    }
  }
}
Loading