Skip to content

Commit

Permalink
refactor write step
Browse files Browse the repository at this point in the history
  • Loading branch information
jackylk authored and ravipesala committed Apr 5, 2017
1 parent bd044c2 commit 8cca0af
Show file tree
Hide file tree
Showing 18 changed files with 237 additions and 288 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ public final class CarbonCommonConstants {
/**
* DOUBLE_VALUE_MEASURE
*/
public static final char SUM_COUNT_VALUE_MEASURE = 'n';
public static final char DOUBLE_MEASURE = 'n';
/**
* BYTE_VALUE_MEASURE
*/
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,54 +17,22 @@

package org.apache.carbondata.core.datastore.impl.data.compressed;

import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.NodeMeasureDataStore;
import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder;
import org.apache.carbondata.core.util.ValueCompressionUtil;

public abstract class AbstractHeavyCompressedDoubleArrayDataStore
implements NodeMeasureDataStore //NodeMeasureDataStore<double[]>
{

private LogService LOGGER =
LogServiceFactory.getLogService(AbstractHeavyCompressedDoubleArrayDataStore.class.getName());

/**
* values.
*/
protected ValueCompressionHolder[] values;

/**
* compressionModel.
*/
protected WriterCompressModel compressionModel;

/**
* type
*/
private char[] type;

/**
* AbstractHeavyCompressedDoubleArrayDataStore constructor.
*
* @param compressionModel
*/
public AbstractHeavyCompressedDoubleArrayDataStore(WriterCompressModel compressionModel) {
this.compressionModel = compressionModel;
if (null != compressionModel) {
this.type = compressionModel.getType();
values =
new ValueCompressionHolder[compressionModel.getValueCompressionHolder().length];
}
}
public class HeavyCompressedDoubleArrayDataStore {

// this method first invokes encoding routine to encode the data chunk,
// followed by invoking compression routine for preparing the data chunk for writing.
@Override public byte[][] getWritableMeasureDataArray(CarbonWriteDataHolder[] dataHolder) {
public static byte[][] encodeMeasureDataArray(
WriterCompressModel compressionModel,
CarbonWriteDataHolder[] dataHolder) {
char[] type = compressionModel.getType();
ValueCompressionHolder[] values =
new ValueCompressionHolder[compressionModel.getValueCompressionHolder().length];
byte[][] returnValue = new byte[values.length][];
for (int i = 0; i < compressionModel.getValueCompressionHolder().length; i++) {
values[i] = compressionModel.getValueCompressionHolder()[i];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ public static DataChunk3 getDataChunk3(List<NodeHolder> nodeHolderList,

public static byte[] serializeEncodeMetaUsingByteBuffer(ValueEncoderMeta valueEncoderMeta) {
ByteBuffer buffer = null;
if (valueEncoderMeta.getType() == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
if (valueEncoderMeta.getType() == CarbonCommonConstants.DOUBLE_MEASURE) {
buffer = ByteBuffer.allocate(
(CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
+ 3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1427,7 +1427,7 @@ public static ValueEncoderMeta deserializeEncoderMetaNew(byte[] encodeMeta) {
ValueEncoderMeta valueEncoderMeta = new ValueEncoderMeta();
valueEncoderMeta.setType(measureType);
switch (measureType) {
case CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE:
case CarbonCommonConstants.DOUBLE_MEASURE:
valueEncoderMeta.setMaxValue(buffer.getDouble());
valueEncoderMeta.setMinValue(buffer.getDouble());
valueEncoderMeta.setUniqueValue(buffer.getDouble());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,13 @@ public static char getAggType(DataType dataType) {
case LONG:
return CarbonCommonConstants.BIG_INT_MEASURE;
default:
return CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE;
return CarbonCommonConstants.DOUBLE_MEASURE;
}
}

// bytes of 0 in BigDecimal
public static final byte[] zeroBigDecimalBytes = bigDecimalToByte(BigDecimal.valueOf(0));

/**
* This method will convert a big decimal value to bytes
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private int addRow(Object[] row, long address) {
for (int mesCount = 0; mesCount < measureSize; mesCount++) {
Object value = row[mesCount + dimensionSize];
if (null != value) {
if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
Double val = (Double) value;
CarbonUnsafe.unsafe.putDouble(baseObject, address + size, val);
size += 8;
Expand Down Expand Up @@ -183,7 +183,7 @@ public Object[] getRow(long address, Object[] rowToFill) {

for (int mesCount = 0; mesCount < measureSize; mesCount++) {
if (isSet(nullSetWords, mesCount)) {
if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
Double val = CarbonUnsafe.unsafe.getDouble(baseObject, address + size);
size += 8;
rowToFill[dimensionSize + mesCount] = val;
Expand Down Expand Up @@ -254,7 +254,7 @@ public void fillRow(long address, DataOutputStream stream) throws IOException {

for (int mesCount = 0; mesCount < measureSize; mesCount++) {
if (isSet(nullSetWords, mesCount)) {
if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
double val = CarbonUnsafe.unsafe.getDouble(baseObject, address + size);
size += 8;
stream.writeDouble(val);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ private Object[] getRowFromStream() throws CarbonSortKeyAndGroupByException {

for (int mesCount = 0; mesCount < measureCount; mesCount++) {
if (UnsafeCarbonRowPage.isSet(words, mesCount)) {
if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
row[dimensionCount + mesCount] = stream.readDouble();
} else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
row[dimensionCount + mesCount] = stream.readLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ private void writeDataTofile(Object[] row) throws CarbonSortKeyAndGroupByExcepti
for (int mesCount = 0; mesCount < measureSize; mesCount++) {
Object value = row[mesCount + dimensionSize];
if (null != value) {
if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
Double val = (Double) value;
rowData.putDouble(size, val);
size += 8;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ private void writeDataTofile(Object[] row) throws CarbonSortKeyAndGroupByExcepti
if (aggType[counter] == CarbonCommonConstants.BYTE_VALUE_MEASURE) {
Double val = (Double) NonDictionaryUtil.getMeasure(fieldIndex, row);
stream.writeDouble(val);
} else if (aggType[counter] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
} else if (aggType[counter] == CarbonCommonConstants.DOUBLE_MEASURE) {
Double val = (Double) NonDictionaryUtil.getMeasure(fieldIndex, row);
stream.writeDouble(val);
} else if (aggType[counter] == CarbonCommonConstants.BIG_INT_MEASURE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ private void writeData(Object[][] recordHolderList, int entryCountLocal, File fi
Object value = row[mesCount + dimColCount];
if (null != value) {
stream.write((byte) 1);
if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
Double val = (Double) value;
stream.writeDouble(val);
} else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ private Object[] getRowFromStream() throws CarbonSortKeyAndGroupByException {
// read measure values
for (int i = 0; i < this.measureCount; i++) {
if (stream.readByte() == 1) {
if (aggType[i] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
if (aggType[i] == CarbonCommonConstants.DOUBLE_MEASURE) {
measures[index++] = stream.readDouble();
} else if (aggType[i] == CarbonCommonConstants.BIG_INT_MEASURE) {
measures[index++] = stream.readLong();
Expand Down
Loading

0 comments on commit 8cca0af

Please sign in to comment.