[CARBONDATA-861] Improvements in query #709

Closed
wants to merge 7 commits into from
@@ -27,12 +27,12 @@ public interface FileHolder {
* and length (number of bytes) to be read
*
* @param filePath fully qualified file path
* @param byteBuffer
* @param offset reading start position,
* @param length number of bytes to be read
* @return ByteBuffer
* @throws IOException
*/
void readByteBuffer(String filePath, ByteBuffer byteBuffer, long offset, int length)
ByteBuffer readByteBuffer(String filePath, long offset, int length)
throws IOException;
/**
* This method will be used to read the byte array from file based on offset
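For reference, the revised contract moves buffer allocation into the FileHolder implementation: the caller passes only the path, offset and length and receives a filled buffer back. Below is a minimal stand-alone sketch of that contract, assuming a plain local-file read via java.io.RandomAccessFile; the concrete CarbonData FileHolder implementations are not shown in this diff, and the class name here is illustrative only.

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;

// Hypothetical local-file reader illustrating the new readByteBuffer contract.
class LocalReadSketch {
  ByteBuffer readByteBuffer(String filePath, long offset, int length) throws IOException {
    byte[] data = new byte[length];
    try (RandomAccessFile file = new RandomAccessFile(filePath, "r")) {
      file.seek(offset);                // move to the requested start position
      file.readFully(data, 0, length);  // read exactly 'length' bytes
    }
    // wrap() yields a heap-backed buffer, so callers can use buffer.array() directly
    return ByteBuffer.wrap(data);
  }
}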
@@ -16,8 +16,6 @@
*/
package org.apache.carbondata.core.datastore.chunk.impl;

import java.util.Arrays;

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory;
import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory.DimensionStoreType;
@@ -109,14 +107,9 @@ public VariableLengthDimensionDataChunk(byte[] dataChunks, int[] invertedIndex,
int vectorOffset = columnVectorInfo.vectorOffset;
int len = offset + columnVectorInfo.size;
for (int i = offset; i < len; i++) {
byte[] value = dataChunkStore.getRow(i);
// Considering only String case now as we support only
// string in no dictionary case at present.
if (value == null || Arrays.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, value)) {
vector.putNull(vectorOffset++);
} else {
vector.putBytes(vectorOffset++, value);
}
dataChunkStore.fillRow(i, vector, vectorOffset++);
}
return column + 1;
}
@@ -138,14 +131,9 @@ public VariableLengthDimensionDataChunk(byte[] dataChunks, int[] invertedIndex,
int vectorOffset = columnVectorInfo.vectorOffset;
int len = offset + columnVectorInfo.size;
for (int i = offset; i < len; i++) {
byte[] value = dataChunkStore.getRow(rowMapping[i]);
// Considering only String case now as we support only
// string in no dictionary case at present.
if (value == null || Arrays.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, value)) {
vector.putNull(vectorOffset++);
} else {
vector.putBytes(vectorOffset++, value);
}
dataChunkStore.fillRow(rowMapping[i], vector, vectorOffset++);
}
return column + 1;
}
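The null and default-value handling deleted above is not lost; it is delegated to the dimension chunk store through the new fillRow call. A rough sketch of how a variable-length store could reproduce the removed behaviour is shown below. The concrete store class is not part of this excerpt, so treat this as illustrative; it assumes java.util.Arrays, CarbonCommonConstants and CarbonColumnVector are in scope, exactly as in the code removed above.

// Illustrative only: mirrors the logic removed from VariableLengthDimensionDataChunk.
@Override public void fillRow(int rowId, CarbonColumnVector vector, int vectorRow) {
  byte[] value = getRow(rowId);
  // only the String no-dictionary case is handled, as in the removed code
  if (value == null
      || Arrays.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, value)) {
    vector.putNull(vectorRow);
  } else {
    vector.putBytes(vectorRow, value);
  }
}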
@@ -85,12 +85,10 @@ public CompressedDimensionChunkFileBasedReaderV1(final BlockletInfo blockletInfo
@Override public DimensionRawColumnChunk readRawDimensionChunk(FileHolder fileReader,
int blockletIndex) throws IOException {
DataChunk dataChunk = dimensionColumnChunk.get(blockletIndex);
ByteBuffer buffer =
ByteBuffer.allocateDirect(dataChunk.getDataPageLength());
ByteBuffer buffer = null;
synchronized (fileReader) {
fileReader.readByteBuffer(filePath, buffer,
dataChunk.getDataPageOffset(),
dataChunk.getDataPageLength());
buffer = fileReader
.readByteBuffer(filePath, dataChunk.getDataPageOffset(), dataChunk.getDataPageLength());
}
DimensionRawColumnChunk rawColumnChunk = new DimensionRawColumnChunk(blockletIndex, buffer, 0,
dataChunk.getDataPageLength(), this);
@@ -110,10 +108,8 @@ public CompressedDimensionChunkFileBasedReaderV1(final BlockletInfo blockletInfo
FileHolder fileReader = dimensionRawColumnChunk.getFileReader();

ByteBuffer rawData = dimensionRawColumnChunk.getRawData();
rawData.position(dimensionRawColumnChunk.getOffSet());
byte[] data = new byte[dimensionRawColumnChunk.getLength()];
rawData.get(data);
dataPage = COMPRESSOR.unCompressByte(data);
dataPage = COMPRESSOR.unCompressByte(rawData.array(), dimensionRawColumnChunk.getOffSet(),
dimensionRawColumnChunk.getLength());

// if row id block is present then read the row id chunk and uncompress it
DataChunk dataChunk = dimensionColumnChunk.get(blockIndex);
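The copy-elimination pattern introduced above recurs throughout the V2/V3 dimension readers and the measure readers that follow: rather than positioning the raw buffer and copying each page into a temporary byte[], the readers hand the compressor the buffer's backing array plus an offset and length. In schematic form, with variable names simplified from the surrounding diff:

// before: one extra allocation and copy per page
byte[] data = new byte[length];
rawData.position(offset);
rawData.get(data);
dataPage = COMPRESSOR.unCompressByte(data);

// after: decompress directly from the heap buffer's backing array
dataPage = COMPRESSOR.unCompressByte(rawData.array(), offset, length);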
@@ -66,9 +66,10 @@ public DimensionRawColumnChunk readRawDimensionChunk(FileHolder fileReader, int
long currentDimensionOffset = dimensionChunksOffset.get(blockletIndex);
length = (int) (dimensionChunksOffset.get(blockletIndex + 1) - currentDimensionOffset);
}
ByteBuffer buffer = ByteBuffer.allocateDirect(length);
ByteBuffer buffer = null;
synchronized (fileReader) {
fileReader.readByteBuffer(filePath, buffer, dimensionChunksOffset.get(blockletIndex), length);
buffer =
fileReader.readByteBuffer(filePath, dimensionChunksOffset.get(blockletIndex), length);
}
DimensionRawColumnChunk rawColumnChunk =
new DimensionRawColumnChunk(blockletIndex, buffer, 0, length, this);
@@ -92,10 +93,9 @@ public DimensionRawColumnChunk readRawDimensionChunk(FileHolder fileReader, int
protected DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileHolder fileReader,
int startColumnBlockletIndex, int endColumnBlockletIndex) throws IOException {
long currentDimensionOffset = dimensionChunksOffset.get(startColumnBlockletIndex);
ByteBuffer buffer = ByteBuffer.allocateDirect(
(int) (dimensionChunksOffset.get(endColumnBlockletIndex + 1) - currentDimensionOffset));
ByteBuffer buffer = null;
synchronized (fileReader) {
fileReader.readByteBuffer(filePath, buffer, currentDimensionOffset,
buffer = fileReader.readByteBuffer(filePath, currentDimensionOffset,
(int) (dimensionChunksOffset.get(endColumnBlockletIndex + 1) - currentDimensionOffset));
}
DimensionRawColumnChunk[] dataChunks =
@@ -132,8 +132,7 @@ public DimensionColumnDataChunk convertToDimensionChunk(
dimensionColumnChunk.data_page_length + dimensionColumnChunk.rle_page_length
+ dimensionColumnChunk.rowid_page_length;
synchronized (dimensionRawColumnChunk.getFileReader()) {
rawData = ByteBuffer.allocateDirect(totalDimensionDataLength);
dimensionRawColumnChunk.getFileReader().readByteBuffer(filePath, rawData,
rawData = dimensionRawColumnChunk.getFileReader().readByteBuffer(filePath,
dimensionChunksOffset.get(blockIndex) + dimensionChunksLength.get(blockIndex),
totalDimensionDataLength);
}
@@ -143,11 +142,9 @@ public DimensionColumnDataChunk convertToDimensionChunk(
copySourcePoint += dimensionChunksLength.get(blockIndex);
}

byte[] data = new byte[dimensionColumnChunk.data_page_length];
rawData.position(copySourcePoint);
rawData.get(data);
// first read the data and uncompressed it
dataPage = COMPRESSOR.unCompressByte(data, 0, dimensionColumnChunk.data_page_length);
dataPage = COMPRESSOR
.unCompressByte(rawData.array(), copySourcePoint, dimensionColumnChunk.data_page_length);
copySourcePoint += dimensionColumnChunk.data_page_length;
// if row id block is present then read the row id chunk and uncompress it
if (hasEncoding(dimensionColumnChunk.encoders, Encoding.INVERTED_INDEX)) {
@@ -88,11 +88,10 @@ public DimensionRawColumnChunk readRawDimensionChunk(FileHolder fileReader,
} else {
length = (int) (dimensionChunksOffset.get(blockletColumnIndex + 1) - currentDimensionOffset);
}
// allocate the buffer
ByteBuffer buffer = ByteBuffer.allocateDirect(length);
ByteBuffer buffer = null;
// read the data from carbon data file
synchronized (fileReader) {
fileReader.readByteBuffer(filePath, buffer, currentDimensionOffset, length);
buffer = fileReader.readByteBuffer(filePath, currentDimensionOffset, length);
}
// get the data chunk which will have all the details about the data pages
DataChunk3 dataChunk = CarbonUtil.readDataChunk3(buffer, 0, length);
@@ -148,11 +147,10 @@ protected DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileHolder fil
// column we can subtract the offset of start column offset with
// end column+1 offset and get the total length.
long currentDimensionOffset = dimensionChunksOffset.get(startBlockletColumnIndex);
ByteBuffer buffer = ByteBuffer.allocateDirect(
(int) (dimensionChunksOffset.get(endBlockletColumnIndex + 1) - currentDimensionOffset));
ByteBuffer buffer = null;
// read the data from carbon data file
synchronized (fileReader) {
fileReader.readByteBuffer(filePath, buffer, currentDimensionOffset,
buffer = fileReader.readByteBuffer(filePath, currentDimensionOffset,
(int) (dimensionChunksOffset.get(endBlockletColumnIndex + 1) - currentDimensionOffset));
}
// create raw chunk for each dimension column
@@ -218,11 +216,9 @@ protected DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileHolder fil
// data chunk length + page offset
int copySourcePoint = dimensionRawColumnChunk.getOffSet() + dimensionChunksLength
.get(dimensionRawColumnChunk.getBlockletId()) + dataChunk3.getPage_offset().get(pageNumber);
byte[] data = new byte[dimensionColumnChunk.data_page_length];
rawData.position(copySourcePoint);
rawData.get(data);
// first read the data and uncompressed it
dataPage = COMPRESSOR.unCompressByte(data, 0, dimensionColumnChunk.data_page_length);
dataPage = COMPRESSOR
.unCompressByte(rawData.array(), copySourcePoint, dimensionColumnChunk.data_page_length);
copySourcePoint += dimensionColumnChunk.data_page_length;
// if row id block is present then read the row id chunk and uncompress it
if (hasEncoding(dimensionColumnChunk.encoders, Encoding.INVERTED_INDEX)) {
@@ -82,11 +82,8 @@ public CompressedMeasureChunkFileBasedReaderV1(final BlockletInfo blockletInfo,
@Override public MeasureRawColumnChunk readRawMeasureChunk(FileHolder fileReader, int blockIndex)
throws IOException {
DataChunk dataChunk = measureColumnChunks.get(blockIndex);
ByteBuffer buffer =
ByteBuffer.allocateDirect(dataChunk.getDataPageLength());
fileReader
.readByteBuffer(filePath, buffer, dataChunk.getDataPageOffset(),
dataChunk.getDataPageLength());
ByteBuffer buffer = fileReader
.readByteBuffer(filePath, dataChunk.getDataPageOffset(), dataChunk.getDataPageLength());
MeasureRawColumnChunk rawColumnChunk = new MeasureRawColumnChunk(blockIndex, buffer, 0,
dataChunk.getDataPageLength(), this);
rawColumnChunk.setFileReader(fileReader);
@@ -104,15 +101,12 @@ public MeasureColumnDataChunk convertToMeasureChunk(MeasureRawColumnChunk measur
ReaderCompressModel compressModel = ValueCompressionUtil.getReaderCompressModel(meta);

ValueCompressionHolder values = compressModel.getValueCompressionHolder();
byte[] dataPage = new byte[measureRawColumnChunk.getLength()];
ByteBuffer rawData = measureRawColumnChunk.getRawData();
rawData.position(measureRawColumnChunk.getOffSet());
rawData.get(dataPage);

// unCompress data
values.uncompress(compressModel.getConvertedDataType(), dataPage, 0,
dataChunk.getDataPageLength(), compressModel.getMantissa(),
compressModel.getMaxValue(), numberOfRows);
values.uncompress(compressModel.getConvertedDataType(), rawData.array(),
measureRawColumnChunk.getOffSet(), dataChunk.getDataPageLength(),
compressModel.getMantissa(), compressModel.getMaxValue(), numberOfRows);

CarbonReadDataHolder measureDataHolder = new CarbonReadDataHolder(values);

@@ -58,10 +58,10 @@ public CompressedMeasureChunkFileBasedReaderV2(final BlockletInfo blockletInfo,
long currentMeasureOffset = measureColumnChunkOffsets.get(blockIndex);
dataLength = (int) (measureColumnChunkOffsets.get(blockIndex + 1) - currentMeasureOffset);
}
ByteBuffer buffer = ByteBuffer.allocateDirect(dataLength);
ByteBuffer buffer = null;
synchronized (fileReader) {
fileReader
.readByteBuffer(filePath, buffer, measureColumnChunkOffsets.get(blockIndex), dataLength);
buffer = fileReader
.readByteBuffer(filePath, measureColumnChunkOffsets.get(blockIndex), dataLength);
}
MeasureRawColumnChunk rawColumnChunk =
new MeasureRawColumnChunk(blockIndex, buffer, 0, dataLength, this);
@@ -85,10 +85,9 @@ public CompressedMeasureChunkFileBasedReaderV2(final BlockletInfo blockletInfo,
protected MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileHolder fileReader,
int startColumnBlockletIndex, int endColumnBlockletIndex) throws IOException {
long currentMeasureOffset = measureColumnChunkOffsets.get(startColumnBlockletIndex);
ByteBuffer buffer = ByteBuffer.allocateDirect(
(int) (measureColumnChunkOffsets.get(endColumnBlockletIndex + 1) - currentMeasureOffset));
ByteBuffer buffer = null;
synchronized (fileReader) {
fileReader.readByteBuffer(filePath, buffer, currentMeasureOffset,
buffer = fileReader.readByteBuffer(filePath, currentMeasureOffset,
(int) (measureColumnChunkOffsets.get(endColumnBlockletIndex + 1) - currentMeasureOffset));
}
MeasureRawColumnChunk[] dataChunks =
@@ -121,8 +120,7 @@ public MeasureColumnDataChunk convertToMeasureChunk(MeasureRawColumnChunk measur
measureColumnChunk =
CarbonUtil.readDataChunk(rawData, copyPoint, measureColumnChunkLength.get(blockIndex));
synchronized (measureRawColumnChunk.getFileReader()) {
rawData = ByteBuffer.allocateDirect(measureColumnChunk.data_page_length);
measureRawColumnChunk.getFileReader().readByteBuffer(filePath, rawData,
rawData = measureRawColumnChunk.getFileReader().readByteBuffer(filePath,
measureColumnChunkOffsets.get(blockIndex) + measureColumnChunkLength.get(blockIndex),
measureColumnChunk.data_page_length);
}
@@ -139,11 +137,8 @@ public MeasureColumnDataChunk convertToMeasureChunk(MeasureRawColumnChunk measur
WriterCompressModel compressionModel = CarbonUtil.getValueCompressionModel(valueEncodeMeta);

ValueCompressionHolder values = compressionModel.getValueCompressionHolder()[0];
byte[] data = new byte[measureColumnChunk.data_page_length];
rawData.position(copyPoint);
rawData.get(data);
// uncompress
values.uncompress(compressionModel.getConvertedDataType()[0], data, 0,
values.uncompress(compressionModel.getConvertedDataType()[0], rawData.array(), copyPoint,
measureColumnChunk.data_page_length, compressionModel.getMantissa()[0],
compressionModel.getMaxValue()[0], numberOfRows);

@@ -72,33 +72,33 @@ public CompressedMeasureChunkFileBasedReaderV3(BlockletInfo blockletInfo, String
* @param blockIndex blocklet index of the column in carbon data file
* @return measure raw chunk
*/
@Override public MeasureRawColumnChunk readRawMeasureChunk(FileHolder fileReader, int blockIndex)
throws IOException {
@Override public MeasureRawColumnChunk readRawMeasureChunk(FileHolder fileReader,
int blockletColumnIndex) throws IOException {
int dataLength = 0;
// to calculate the length of the data to be read
// column other than last column we can subtract the offset of current column with
// next column and get the total length.
// but for last column we need to use lastDimensionOffset which is the end position
// of the last dimension, we can subtract current dimension offset from lastDimesionOffset
if (measureColumnChunkOffsets.size() - 1 == blockIndex) {
dataLength = (int) (measureOffsets - measureColumnChunkOffsets.get(blockIndex));
if (measureColumnChunkOffsets.size() - 1 == blockletColumnIndex) {
dataLength = (int) (measureOffsets - measureColumnChunkOffsets.get(blockletColumnIndex));
} else {
dataLength = (int) (measureColumnChunkOffsets.get(blockIndex + 1) - measureColumnChunkOffsets
.get(blockIndex));
dataLength =
(int) (measureColumnChunkOffsets.get(blockletColumnIndex + 1) - measureColumnChunkOffsets
.get(blockletColumnIndex));
}
// allocate the buffer
ByteBuffer buffer = ByteBuffer.allocateDirect(dataLength);
ByteBuffer buffer = null;
// read the data from carbon data file
synchronized (fileReader) {
fileReader
.readByteBuffer(filePath, buffer, measureColumnChunkOffsets.get(blockIndex), dataLength);
buffer = fileReader
.readByteBuffer(filePath, measureColumnChunkOffsets.get(blockletColumnIndex), dataLength);
}
// get the data chunk which will have all the details about the data pages
DataChunk3 dataChunk =
CarbonUtil.readDataChunk3(buffer, 0, measureColumnChunkLength.get(blockIndex));
CarbonUtil.readDataChunk3(buffer, 0, measureColumnChunkLength.get(blockletColumnIndex));
// creating a raw chunks instance and filling all the details
MeasureRawColumnChunk rawColumnChunk =
new MeasureRawColumnChunk(blockIndex, buffer, 0, dataLength, this);
new MeasureRawColumnChunk(blockletColumnIndex, buffer, 0, dataLength, this);
int numberOfPages = dataChunk.getPage_length().size();
byte[][] maxValueOfEachPage = new byte[numberOfPages][];
byte[][] minValueOfEachPage = new byte[numberOfPages][];
@@ -148,11 +148,10 @@ protected MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileHolder fileRea
// column we can subtract the offset of start column offset with
// end column+1 offset and get the total length.
long currentMeasureOffset = measureColumnChunkOffsets.get(startColumnBlockletIndex);
ByteBuffer buffer = ByteBuffer.allocateDirect(
(int) (measureColumnChunkOffsets.get(endColumnBlockletIndex + 1) - currentMeasureOffset));
ByteBuffer buffer = null;
// read the data from carbon data file
synchronized (fileReader) {
fileReader.readByteBuffer(filePath, buffer, currentMeasureOffset,
buffer = fileReader.readByteBuffer(filePath, currentMeasureOffset,
(int) (measureColumnChunkOffsets.get(endColumnBlockletIndex + 1) - currentMeasureOffset));
}
// create raw chunk for each measure column
@@ -224,11 +223,8 @@ protected MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileHolder fileRea
WriterCompressModel compressionModel = CarbonUtil.getValueCompressionModel(valueEncodeMeta);
ValueCompressionHolder values = compressionModel.getValueCompressionHolder()[0];
// uncompress
byte[] data = new byte[measureColumnChunk.data_page_length];
ByteBuffer rawData = measureRawColumnChunk.getRawData();
rawData.position(copyPoint);
rawData.get(data);
values.uncompress(compressionModel.getConvertedDataType()[0], data, 0,
values.uncompress(compressionModel.getConvertedDataType()[0], rawData.array(), copyPoint,
measureColumnChunk.data_page_length, compressionModel.getMantissa()[0],
compressionModel.getMaxValue()[0], measureRawColumnChunk.getRowCount()[pageNumber]);
CarbonReadDataHolder measureDataHolder = new CarbonReadDataHolder(values);
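One detail all of these reader changes rely on: rawData.array() is only usable when readByteBuffer returns a heap-backed buffer. A buffer obtained from ByteBuffer.allocateDirect, as the removed code allocated, has no accessible backing array on the standard JDK, so hasArray() is false and array() throws UnsupportedOperationException. A tiny stand-alone check of that JDK behaviour, for reference:

import java.nio.ByteBuffer;

// Demonstrates the heap vs. direct backing-array difference the readers depend on.
public class BufferArrayCheck {
  public static void main(String[] args) {
    ByteBuffer heap = ByteBuffer.wrap(new byte[16]);
    System.out.println(heap.hasArray());      // true  -> heap.array() is usable

    ByteBuffer direct = ByteBuffer.allocateDirect(16);
    System.out.println(direct.hasArray());    // false -> direct.array() would throw
  }
}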
@@ -17,6 +17,8 @@

package org.apache.carbondata.core.datastore.chunk.store;

import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;

/**
* Interface responsibility is to store dimension data in memory.
* storage can be on heap or offheap.
@@ -41,6 +43,13 @@ public interface DimensionDataChunkStore {
*/
byte[] getRow(int rowId);

/**
* Below method will be used to fill the row to vector
* based on row id passed
*
* @param rowId row id of the row in the chunk store
* @param vector column vector to be filled
* @param vectorRow position in the vector at which the value is filled
*/
void fillRow(int rowId, CarbonColumnVector vector, int vectorRow);

/**
* Below method will be used to fill the row values to buffer array
*
@@ -18,6 +18,7 @@
package org.apache.carbondata.core.datastore.chunk.store.impl.safe;

import org.apache.carbondata.core.datastore.chunk.store.DimensionDataChunkStore;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;

/**
* Responsibility is to store dimension data
@@ -120,4 +121,7 @@ public SafeAbsractDimensionDataChunkStore(boolean isInvertedIdex) {
throw new UnsupportedOperationException("Operation not supported");
}

@Override public void fillRow(int rowId, CarbonColumnVector vector, int vectorRow) {
throw new UnsupportedOperationException("Operation not supported");
}
}