Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,10 @@
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.compression.ChunkCompressor;
import org.apache.pinot.segment.spi.memory.CleanerUtil;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -47,10 +44,10 @@ public abstract class BaseChunkSVForwardIndexWriter implements Closeable {
private static final int FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V1V2 = Integer.BYTES;
private static final int FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V3 = Long.BYTES;

private final File _file;
private final FileChannel _dataChannel;
private final ByteBuffer _header;
protected final FileChannel _dataFile;
protected ByteBuffer _header;
protected final ByteBuffer _chunkBuffer;
protected final ByteBuffer _compressedBuffer;
protected final ChunkCompressor _chunkCompressor;

protected int _chunkSize;
Expand All @@ -74,15 +71,14 @@ protected BaseChunkSVForwardIndexWriter(File file, ChunkCompressionType compress
int numDocsPerChunk, int chunkSize, int sizeOfEntry, int version)
throws IOException {
Preconditions.checkArgument(version == DEFAULT_VERSION || version == CURRENT_VERSION);
_file = file;
_headerEntryChunkOffsetSize = getHeaderEntryChunkOffsetSize(version);
_dataOffset = headerSize(totalDocs, numDocsPerChunk, _headerEntryChunkOffsetSize);
_chunkSize = chunkSize;
_chunkCompressor = ChunkCompressorFactory.getCompressor(compressionType);
_headerEntryChunkOffsetSize = getHeaderEntryChunkOffsetSize(version);
_dataOffset = writeHeader(compressionType, totalDocs, numDocsPerChunk, sizeOfEntry, version);
_chunkBuffer = ByteBuffer.allocateDirect(chunkSize);
_dataChannel = new RandomAccessFile(file, "rw").getChannel();
_header = _dataChannel.map(FileChannel.MapMode.READ_WRITE, 0, _dataOffset);
writeHeader(compressionType, totalDocs, numDocsPerChunk, sizeOfEntry, version);
int maxCompressedChunkSize = _chunkCompressor.maxCompressedSize(_chunkSize); // may exceed original chunk size
_compressedBuffer = ByteBuffer.allocateDirect(maxCompressedChunkSize);
_dataFile = new RandomAccessFile(file, "rw").getChannel();
}

public static int getHeaderEntryChunkOffsetSize(int version) {
Expand All @@ -106,13 +102,10 @@ public void close()
writeChunk();
}

if (CleanerUtil.UNMAP_SUPPORTED) {
CleanerUtil.getCleaner().freeBuffer(_header);
}

// we will have overmapped by (maxCompressedSize - actualCompressedSize) for the most recent chunk
_dataChannel.truncate(_dataOffset);
_dataChannel.close();
// Write the header and close the file.
_header.flip();
_dataFile.write(_header, 0);
_dataFile.close();
}

/**
Expand All @@ -123,10 +116,14 @@ public void close()
* @param numDocsPerChunk Number of documents per chunk
* @param sizeOfEntry Size of each entry
* @param version Version of file
* @return Size of header
*/
private void writeHeader(ChunkCompressionType compressionType, int totalDocs, int numDocsPerChunk, int sizeOfEntry,
private int writeHeader(ChunkCompressionType compressionType, int totalDocs, int numDocsPerChunk, int sizeOfEntry,
int version) {
int numChunks = (totalDocs + numDocsPerChunk - 1) / numDocsPerChunk;
int headerSize = (7 * Integer.BYTES) + (numChunks * _headerEntryChunkOffsetSize);

_header = ByteBuffer.allocateDirect(headerSize);

int offset = 0;
_header.putInt(version);
Expand Down Expand Up @@ -154,11 +151,8 @@ private void writeHeader(ChunkCompressionType compressionType, int totalDocs, in
int dataHeaderStart = offset + Integer.BYTES;
_header.putInt(dataHeaderStart);
}
}

private static int headerSize(int totalDocs, int numDocsPerChunk, int headerEntryChunkOffsetSize) {
int numChunks = (totalDocs + numDocsPerChunk - 1) / numDocsPerChunk;
return (7 * Integer.BYTES) + (numChunks * headerEntryChunkOffsetSize);
return headerSize;
}

/**
Expand All @@ -172,15 +166,13 @@ private static int headerSize(int totalDocs, int numDocsPerChunk, int headerEntr
*
*/
protected void writeChunk() {
int sizeWritten;
int sizeToWrite;
_chunkBuffer.flip();

int maxCompressedSize = _chunkCompressor.maxCompressedSize(_chunkBuffer.limit());
// compress directly into the mapped output rather than keeping a large buffer to compress into
try (PinotDataBuffer compressedBuffer = PinotDataBuffer.mapFile(_file, false, _dataOffset,
maxCompressedSize, ByteOrder.BIG_ENDIAN, "forward index chunk")) {
ByteBuffer view = compressedBuffer.toDirectByteBuffer(0, maxCompressedSize);
sizeWritten = _chunkCompressor.compress(_chunkBuffer, view);
try {
sizeToWrite = _chunkCompressor.compress(_chunkBuffer, _compressedBuffer);
_dataFile.write(_compressedBuffer, _dataOffset);
_compressedBuffer.clear();
} catch (IOException e) {
LOGGER.error("Exception caught while compressing/writing data chunk", e);
throw new RuntimeException(e);
Expand All @@ -192,7 +184,7 @@ protected void writeChunk() {
_header.putLong(_dataOffset);
}

_dataOffset += sizeWritten;
_dataOffset += sizeToWrite;

_chunkBuffer.clear();
}
Expand Down