@@ -45,16 +45,15 @@
 @NotThreadSafe
 public class VarByteChunkSVForwardIndexWriterV4 implements VarByteChunkWriter {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(VarByteChunkSVForwardIndexWriterV4.class);
-
   public static final int VERSION = 4;
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(VarByteChunkSVForwardIndexWriterV4.class);
   private static final String DATA_BUFFER_SUFFIX = ".buf";
 
   private final File _dataBuffer;
   private final RandomAccessFile _output;
   private final FileChannel _dataChannel;
   private final ByteBuffer _chunkBuffer;
+  private final ByteBuffer _compressionBuffer;
   private final ChunkCompressor _chunkCompressor;
 
   private int _docIdOffset = 0;
@@ -69,6 +68,8 @@ public VarByteChunkSVForwardIndexWriterV4(File file, ChunkCompressionType compre
     _dataChannel = new RandomAccessFile(_dataBuffer, "rw").getChannel();
     _chunkCompressor = ChunkCompressorFactory.getCompressor(compressionType, true);
     _chunkBuffer = ByteBuffer.allocateDirect(chunkSize).order(ByteOrder.LITTLE_ENDIAN);
+    _compressionBuffer = ByteBuffer.allocateDirect(_chunkCompressor.maxCompressedSize(chunkSize))
+        .order(ByteOrder.LITTLE_ENDIAN);
     // reserve space for numDocs
     _chunkBuffer.position(Integer.BYTES);
     writeHeader(_chunkCompressor.compressionType(), chunkSize);
@@ -179,12 +180,24 @@ private void writeChunk() {
   }
 
   private void write(ByteBuffer buffer, boolean huge) {
-    int maxCompressedSize = _chunkCompressor.maxCompressedSize(buffer.limit());
-    ByteBuffer target = null;
+    ByteBuffer mapped = null;
+    final int compressedSize;
     try {
-      target = _dataChannel.map(FileChannel.MapMode.READ_WRITE, _chunkOffset, maxCompressedSize)
-          .order(ByteOrder.LITTLE_ENDIAN);
-      int compressedSize = _chunkCompressor.compress(buffer, target);
+      if (huge) {
+        // the compression buffer isn't guaranteed to be large enough for huge chunks,
+        // so use mmap and compress directly into the file if this ever happens
+        int maxCompressedSize = _chunkCompressor.maxCompressedSize(buffer.limit());
+        mapped = _dataChannel.map(FileChannel.MapMode.READ_WRITE, _chunkOffset, maxCompressedSize)
+            .order(ByteOrder.LITTLE_ENDIAN);
+        compressedSize = _chunkCompressor.compress(buffer, mapped);
+        _dataChannel.position(_chunkOffset + compressedSize);
+      } else {
+        compressedSize = _chunkCompressor.compress(buffer, _compressionBuffer);
+        int written = 0;
+        while (written < compressedSize) {
+          written += _dataChannel.write(_compressionBuffer);
+        }
+      }
       // reverse bytes here because the file writes BE and we want to read the metadata LE
       _output.writeInt(Integer.reverseBytes(_docIdOffset | (huge ? 0x80000000 : 0)));
       _output.writeInt(Integer.reverseBytes((int) (_chunkOffset & 0xFFFFFFFFL)));
@@ -195,7 +208,11 @@ private void write(ByteBuffer buffer, boolean huge) {
       LOGGER.error("Exception caught while compressing/writing data chunk", e);
       throw new RuntimeException(e);
     } finally {
-      CleanerUtil.cleanQuietly(target);
+      if (mapped != null) {
+        CleanerUtil.cleanQuietly(mapped);
+      } else {
+        _compressionBuffer.clear();
+      }
     }
   }
 
@@ -223,6 +240,7 @@ public void close()
     }
     _dataChannel.close();
     _output.close();
+    CleanerUtil.cleanQuietly(_compressionBuffer);
     CleanerUtil.cleanQuietly(_chunkBuffer);
     FileUtils.deleteQuietly(_dataBuffer);
   }
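The change above swaps a per-chunk FileChannel.map call for a single reusable direct compression buffer, mapping the file only for "huge" chunks that might exceed that buffer. Below is a minimal sketch of that pattern, not the Pinot implementation: ReusableCompressionSink, writeChunk, and the stand-in compress helper are hypothetical names, and the real writer additionally records per-chunk metadata (reversing bytes before RandomAccessFile.writeInt so the big-endian DataOutput produces little-endian values on disk) and releases mapped buffers via CleanerUtil.

// Illustrative sketch only: one direct buffer sized for the worst-case compressed
// chunk is reused for every write; oversized inputs fall back to mmap.
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;

final class ReusableCompressionSink implements AutoCloseable {
  private final FileChannel _channel;
  private final ByteBuffer _compressionBuffer; // allocated once, cleared after every chunk
  private long _offset;

  ReusableCompressionSink(File file, int maxCompressedChunkSize) throws IOException {
    _channel = new RandomAccessFile(file, "rw").getChannel();
    _compressionBuffer = ByteBuffer.allocateDirect(maxCompressedChunkSize)
        .order(ByteOrder.LITTLE_ENDIAN);
  }

  // Stand-in for a real compressor: copies the payload into dst and returns its size.
  private static int compress(ByteBuffer src, ByteBuffer dst) {
    int size = src.remaining();
    dst.put(src.duplicate());
    dst.flip();
    return size;
  }

  void writeChunk(ByteBuffer chunk) throws IOException {
    if (chunk.remaining() > _compressionBuffer.capacity()) {
      // "huge" chunk: map the destination region and compress straight into the file
      ByteBuffer mapped = _channel.map(FileChannel.MapMode.READ_WRITE, _offset, chunk.remaining())
          .order(ByteOrder.LITTLE_ENDIAN);
      _offset += compress(chunk, mapped);
    } else {
      // common case: reuse the same direct buffer, so there is no map/unmap per chunk
      int compressedSize = compress(chunk, _compressionBuffer);
      int written = 0;
      while (written < compressedSize) {
        written += _channel.write(_compressionBuffer, _offset + written);
      }
      _offset += compressedSize;
      _compressionBuffer.clear();
    }
  }

  @Override
  public void close() throws IOException {
    _channel.close();
  }
}

Sizing the reusable buffer with maxCompressedSize(chunkSize), as the constructor change does, bounds its memory up front while avoiding the map/unmap churn of the old path; the mmap branch preserves correctness for the rare chunk that cannot fit.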