Skip to content

Commit

Permalink
Removed unnecessary plan from optimized plan.
Browse files Browse the repository at this point in the history
  • Loading branch information
ravipesala committed Mar 30, 2017
1 parent 1d93999 commit 53676bb
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package org.apache.carbondata.core.datastore.chunk.impl;

import java.util.Arrays;

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory;
import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory.DimensionStoreType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,8 @@ public CompressedMeasureChunkFileBasedReaderV1(final BlockletInfo blockletInfo,
@Override public MeasureRawColumnChunk readRawMeasureChunk(FileHolder fileReader, int blockIndex)
throws IOException {
DataChunk dataChunk = measureColumnChunks.get(blockIndex);
ByteBuffer buffer =null;
buffer = fileReader
.readByteBuffer(filePath, dataChunk.getDataPageOffset(),
dataChunk.getDataPageLength());
ByteBuffer buffer = fileReader
.readByteBuffer(filePath, dataChunk.getDataPageOffset(), dataChunk.getDataPageLength());
MeasureRawColumnChunk rawColumnChunk = new MeasureRawColumnChunk(blockIndex, buffer, 0,
dataChunk.getDataPageLength(), this);
rawColumnChunk.setFileReader(fileReader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ object CompareTest {
// do GC and sleep for some time before running next table
System.gc()
Thread.sleep(1000)
System.gc()
Thread.sleep(1000)
val carbonResult: Array[(Double, Int)] = runQueries(spark, carbonTableName("3"))
// check result by comparing output from parquet and carbon
parquetResult.zipWithIndex.foreach { case (result, index) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class VectorizedCarbonRecordReader extends RecordReader<Void, Object> {
/**
* The default config on whether columnarBatch should be offheap.
*/
private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP;
private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.OFF_HEAP;

private QueryModel queryModel;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ case class CarbonDictionaryCatalystDecoder(
// the output should be updated with converted datatype, it is need for limit+sort plan.
override val output: Seq[Attribute] =
CarbonDictionaryDecoder.convertOutput(child.output, relations, profile, aliasMap)

// Whether it is required to add to plan.
def requiredToAdd: Boolean = {
CarbonDictionaryDecoder.isRequiredToDecode(
CarbonDictionaryDecoder.getDictionaryColumnMapping(
child.output, relations, profile, aliasMap))
}
}

abstract class CarbonProfile(attributes: Seq[Attribute]) extends Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,28 +57,8 @@ case class CarbonDictionaryDecoder(
child.outputPartitioning
}

val getDictionaryColumnIds: Array[(String, ColumnIdentifier, CarbonDimension)] = {
child.output.map { attribute =>
val attr = aliasMap.getOrElse(attribute, attribute)
val relation = relations.find(p => p.contains(attr))
if (relation.isDefined && CarbonDictionaryDecoder.canBeDecoded(attr, profile)) {
val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
val carbonDimension =
carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
if (carbonDimension != null &&
carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
!carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
!carbonDimension.isComplex) {
(carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
carbonDimension)
} else {
(null, null, null)
}
} else {
(null, null, null)
}
}.toArray
}
val getDictionaryColumnIds: Array[(String, ColumnIdentifier, CarbonDimension)] =
CarbonDictionaryDecoder.getDictionaryColumnMapping(child.output, relations, profile, aliasMap)

override def doExecute(): RDD[InternalRow] = {
attachTree(this, "execute") {
Expand All @@ -88,7 +68,7 @@ case class CarbonDictionaryDecoder(
(carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
}.toMap

if (isRequiredToDecode) {
if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) {
val dataTypes = child.output.map { attr => attr.dataType }
child.execute().mapPartitions { iter =>
val cacheProvider: CacheProvider = CacheProvider.getInstance
Expand Down Expand Up @@ -142,7 +122,7 @@ case class CarbonDictionaryDecoder(
(carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
}.toMap

if (isRequiredToDecode) {
if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) {
val cacheProvider: CacheProvider = CacheProvider.getInstance
val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storePath)
Expand Down Expand Up @@ -255,13 +235,6 @@ case class CarbonDictionaryDecoder(
child.asInstanceOf[CodegenSupport].produce(ctx, this)
}

private def isRequiredToDecode = {
getDictionaryColumnIds.find(p => p._1 != null) match {
case Some(value) => true
case _ => false
}
}

private def getDictionary(atiMap: Map[String, AbsoluteTableIdentifier],
cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary]) = {
val dicts: Seq[Dictionary] = getDictionaryColumnIds.map { f =>
Expand Down Expand Up @@ -391,6 +364,39 @@ object CarbonDictionaryDecoder {
.toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
}
}

def getDictionaryColumnMapping(output: Seq[Attribute],
relations: Seq[CarbonDecoderRelation],
profile: CarbonProfile,
aliasMap: CarbonAliasDecoderRelation): Array[(String, ColumnIdentifier, CarbonDimension)] = {
output.map { attribute =>
val attr = aliasMap.getOrElse(attribute, attribute)
val relation = relations.find(p => p.contains(attr))
if (relation.isDefined && CarbonDictionaryDecoder.canBeDecoded(attr, profile)) {
val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
val carbonDimension =
carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
if (carbonDimension != null &&
carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
!carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
!carbonDimension.isComplex) {
(carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
carbonDimension)
} else {
(null, null, null)
}
} else {
(null, null, null)
}
}.toArray
}

def isRequiredToDecode(colIdents: Array[(String, ColumnIdentifier, CarbonDimension)]): Boolean = {
colIdents.find(p => p._1 != null) match {
case Some(value) => true
case _ => false
}
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,9 +609,14 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
}
// Remove unnecessary decoders
val finalPlan = transFormedPlan transform {
case CarbonDictionaryCatalystDecoder(_, profile, _, false, child)
if profile.isInstanceOf[IncludeProfile] && profile.isEmpty =>
child
case decoder@ CarbonDictionaryCatalystDecoder(_, profile, _, isOuter, child) =>
if (!isOuter && profile.isInstanceOf[IncludeProfile] && profile.isEmpty) {
child
} else if (!decoder.requiredToAdd) {
child
} else {
decoder
}
}
finalPlan
}
Expand Down

0 comments on commit 53676bb

Please sign in to comment.