diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionDataChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionDataChunk.java
index 0998a0fbe38..5dbd6d1959a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionDataChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionDataChunk.java
@@ -16,8 +16,6 @@
  */
 package org.apache.carbondata.core.datastore.chunk.impl;
 
-import java.util.Arrays;
-
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory;
 import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory.DimensionStoreType;
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
index a1acce40cea..64e9b45dc48 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
@@ -82,10 +82,8 @@ public CompressedMeasureChunkFileBasedReaderV1(final BlockletInfo blockletInfo,
   @Override public MeasureRawColumnChunk readRawMeasureChunk(FileHolder fileReader, int blockIndex)
       throws IOException {
     DataChunk dataChunk = measureColumnChunks.get(blockIndex);
-    ByteBuffer buffer =null;
-    buffer = fileReader
-        .readByteBuffer(filePath, dataChunk.getDataPageOffset(),
-            dataChunk.getDataPageLength());
+    ByteBuffer buffer = fileReader
+        .readByteBuffer(filePath, dataChunk.getDataPageOffset(), dataChunk.getDataPageLength());
     MeasureRawColumnChunk rawColumnChunk = new MeasureRawColumnChunk(blockIndex, buffer, 0,
         dataChunk.getDataPageLength(), this);
     rawColumnChunk.setFileReader(fileReader);
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
index 72662304a87..41a7850b52a 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
@@ -307,6 +307,8 @@ object CompareTest {
     // do GC and sleep for some time before running next table
     System.gc()
     Thread.sleep(1000)
+    System.gc()
+    Thread.sleep(1000)
     val carbonResult: Array[(Double, Int)] = runQueries(spark, carbonTableName("3"))
     // check result by comparing output from parquet and carbon
     parquetResult.zipWithIndex.foreach { case (result, index) =>
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 9db5ace32e5..ffff956631f 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -73,7 +73,7 @@ class VectorizedCarbonRecordReader extends RecordReader {
 
   /**
   * The default config on whether columnarBatch should be offheap.
   */
-  private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP;
+  private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.OFF_HEAP;
 
   private QueryModel queryModel;
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index d94489be63f..ed3db3a954c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -35,6 +35,13 @@ case class CarbonDictionaryCatalystDecoder(
   // the output should be updated with converted datatype, it is need for limit+sort plan.
   override val output: Seq[Attribute] =
     CarbonDictionaryDecoder.convertOutput(child.output, relations, profile, aliasMap)
+
+  // Whether it is required to add to plan.
+  def requiredToAdd: Boolean = {
+    CarbonDictionaryDecoder.isRequiredToDecode(
+      CarbonDictionaryDecoder.getDictionaryColumnMapping(
+        child.output, relations, profile, aliasMap))
+  }
 }
 
 abstract class CarbonProfile(attributes: Seq[Attribute]) extends Serializable {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 543da6f1ad7..05afb982500 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -57,28 +57,8 @@ case class CarbonDictionaryDecoder(
     child.outputPartitioning
   }
 
-  val getDictionaryColumnIds: Array[(String, ColumnIdentifier, CarbonDimension)] = {
-    child.output.map { attribute =>
-      val attr = aliasMap.getOrElse(attribute, attribute)
-      val relation = relations.find(p => p.contains(attr))
-      if (relation.isDefined && CarbonDictionaryDecoder.canBeDecoded(attr, profile)) {
-        val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
-        val carbonDimension =
-          carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
-        if (carbonDimension != null &&
-            carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
-            !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
-            !carbonDimension.isComplex) {
-          (carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
-            carbonDimension)
-        } else {
-          (null, null, null)
-        }
-      } else {
-        (null, null, null)
-      }
-    }.toArray
-  }
+  val getDictionaryColumnIds: Array[(String, ColumnIdentifier, CarbonDimension)] =
+    CarbonDictionaryDecoder.getDictionaryColumnMapping(child.output, relations, profile, aliasMap)
 
   override def doExecute(): RDD[InternalRow] = {
     attachTree(this, "execute") {
@@ -88,7 +68,7 @@ case class CarbonDictionaryDecoder(
         (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
       }.toMap
 
-      if (isRequiredToDecode) {
+      if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) {
         val dataTypes = child.output.map { attr => attr.dataType }
         child.execute().mapPartitions { iter =>
           val cacheProvider: CacheProvider = CacheProvider.getInstance
@@ -142,7 +122,7 @@ case class CarbonDictionaryDecoder(
         (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
       }.toMap
 
-      if (isRequiredToDecode) {
+      if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) {
         val cacheProvider: CacheProvider = CacheProvider.getInstance
         val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
           cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storePath)
@@ -255,13 +235,6 @@ case class CarbonDictionaryDecoder(
     child.asInstanceOf[CodegenSupport].produce(ctx, this)
   }
 
-  private def isRequiredToDecode = {
-    getDictionaryColumnIds.find(p => p._1 != null) match {
-      case Some(value) => true
-      case _ => false
-    }
-  }
-
   private def getDictionary(atiMap: Map[String, AbsoluteTableIdentifier],
       cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary]) = {
     val dicts: Seq[Dictionary] = getDictionaryColumnIds.map { f =>
@@ -391,6 +364,39 @@ object CarbonDictionaryDecoder {
         .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
     }
   }
+
+  def getDictionaryColumnMapping(output: Seq[Attribute],
+      relations: Seq[CarbonDecoderRelation],
+      profile: CarbonProfile,
+      aliasMap: CarbonAliasDecoderRelation): Array[(String, ColumnIdentifier, CarbonDimension)] = {
+    output.map { attribute =>
+      val attr = aliasMap.getOrElse(attribute, attribute)
+      val relation = relations.find(p => p.contains(attr))
+      if (relation.isDefined && CarbonDictionaryDecoder.canBeDecoded(attr, profile)) {
+        val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
+        val carbonDimension =
+          carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
+        if (carbonDimension != null &&
+            carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
+            !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
+            !carbonDimension.isComplex) {
+          (carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
+            carbonDimension)
+        } else {
+          (null, null, null)
+        }
+      } else {
+        (null, null, null)
+      }
+    }.toArray
+  }
+
+  def isRequiredToDecode(colIdents: Array[(String, ColumnIdentifier, CarbonDimension)]): Boolean = {
+    colIdents.find(p => p._1 != null) match {
+      case Some(value) => true
+      case _ => false
+    }
+  }
 }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index 36478b4b124..a0dfde00aa5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -609,9 +609,14 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
     }
     // Remove unnecessary decoders
     val finalPlan = transFormedPlan transform {
-      case CarbonDictionaryCatalystDecoder(_, profile, _, false, child)
-        if profile.isInstanceOf[IncludeProfile] && profile.isEmpty =>
-        child
+      case decoder@ CarbonDictionaryCatalystDecoder(_, profile, _, isOuter, child) =>
+        if (!isOuter && profile.isInstanceOf[IncludeProfile] && profile.isEmpty) {
+          child
+        } else if (!decoder.requiredToAdd) {
+          child
+        } else {
+          decoder
+        }
     }
     finalPlan
   }