
Commit

fix bug in late decode optimizer and strategy
QiangCai authored and jackylk committed Dec 2, 2016
1 parent 72900c5 commit 0776187
Showing 7 changed files with 533 additions and 130 deletions.
4 changes: 2 additions & 2 deletions conf/dataload.properties.template
@@ -18,14 +18,14 @@

#carbon store path
# you should change to the code path of your local machine
carbon.storelocation=/Users/jackylk/code/incubator-carbondata/examples/spark2/target/store
carbon.storelocation=/home/david/Documents/incubator-carbondata/examples/spark2/target/store

#true: use kettle to load data
#false: use new flow to load data
use_kettle=true

# you should change to the code path of your local machine
carbon.kettle.home=/Users/jackylk/code/incubator-carbondata/processing/carbonplugins
carbon.kettle.home=/home/david/Documents/incubator-carbondata/processing/carbonplugins

#csv delimiter character
delimiter=,
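For context, not part of the commit: these keys drive the spark2 example's data load, i.e. where the Carbon store is written, whether the legacy Kettle flow or the new load flow is used, and the CSV delimiter. Below is a minimal Scala sketch of reading those keys, assuming the template has been copied to conf/dataload.properties; the object name is illustrative.

import java.io.FileInputStream
import java.util.Properties

// Illustrative only: read the keys used by the spark2 example's data load.
object DataLoadConfig {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    val in = new FileInputStream("conf/dataload.properties")
    try props.load(in) finally in.close()

    val storeLocation = props.getProperty("carbon.storelocation")
    val useKettle = props.getProperty("use_kettle", "true").toBoolean
    val delimiter = props.getProperty("delimiter", ",")
    println(s"store=$storeLocation, useKettle=$useKettle, delimiter='$delimiter'")
  }
}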
20 changes: 10 additions & 10 deletions examples/spark2/src/main/resources/data.csv
@@ -1,11 +1,11 @@
shortField,intField,bigintField,doubleField,stringField,timestampField
1, 10, 100, 48.4, spark, 2015/4/23
5, 17, 140, 43.4, spark, 2015/7/27
1, 11, 100, 44.4, flink, 2015/5/23
1, 10, 150, 43.4, spark, 2015/7/24
1, 10, 100, 47.4, spark, 2015/7/23
3, 14, 160, 43.4, hive, 2015/7/26
2, 10, 100, 43.4, impala, 2015/7/23
1, 10, 100, 43.4, spark, 2015/5/23
4, 16, 130, 42.4, impala, 2015/7/23
1, 10, 100, 43.4, spark, 2015/7/23
1,10,100,48.4,spark,2015/4/23
5,17,140,43.4,spark,2015/7/27
1,11,100,44.4,flink,2015/5/23
1,10,150,43.4,spark,2015/7/24
1,10,100,47.4,spark,2015/7/23
3,14,160,43.4,hive,2015/7/26
2,10,100,43.4,impala,2015/7/23
1,10,100,43.4,spark,2015/5/23
4,16,130,42.4,impala,2015/7/23
1,10,100,43.4,spark,2015/7/23
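Not part of the commit: the sample rows now have no space after each comma. One plausible reason for the trim is that strict integral parsing of the typed columns (shortField, intField, bigintField) rejects leading whitespace, while double parsing tolerates it. A small self-contained Scala check, illustrative only:

import scala.util.Try

// Illustrative only: strict integral parsing rejects leading spaces; double parsing trims them.
object WhitespaceCheck {
  def main(args: Array[String]): Unit = {
    println(Try(" 10".toShort))    // Failure(java.lang.NumberFormatException: ...)
    println(Try("10".toShort))     // Success(10)
    println(Try(" 48.4".toDouble)) // Success(48.4): Double.parseDouble ignores surrounding whitespace
  }
}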
@@ -26,7 +26,7 @@ object CarbonExample {

def main(args: Array[String]): Unit = {
// to run the example, plz change this path to your local machine path
val rootPath = "/Users/jackylk/code/incubator-carbondata"
val rootPath = "/home/david/Documents/incubator-carbondata"
val spark = SparkSession
.builder()
.master("local")
@@ -38,10 +38,10 @@ object CarbonExample {
spark.sparkContext.setLogLevel("WARN")

// Drop table
spark.sql("DROP TABLE IF EXISTS carbon_table")
spark.sql("DROP TABLE IF EXISTS csv_table")

// Create table
// spark.sql("DROP TABLE IF EXISTS carbon_table")
// spark.sql("DROP TABLE IF EXISTS csv_table")
//
// // Create table
spark.sql(
s"""
| CREATE TABLE carbon_table(
@@ -96,14 +96,26 @@ object CarbonExample {
FROM carbon_table
""").show

// spark.sql("""
// SELECT sum(intField), stringField
// FROM carbon_table
// GROUP BY stringField
// """).show
spark.sql("""
SELECT *
FROM carbon_table where length(stringField) = 5
""").show

spark.sql("""
SELECT sum(intField), stringField
FROM carbon_table
GROUP BY stringField
""").show

spark.sql(
"""
|select t1.*, t2.*
|from carbon_table t1, carbon_table t2
|where t1.stringField = t2.stringField
""".stripMargin).show

// Drop table
spark.sql("DROP TABLE IF EXISTS carbon_table")
spark.sql("DROP TABLE IF EXISTS csv_table")
// spark.sql("DROP TABLE IF EXISTS carbon_table")
// spark.sql("DROP TABLE IF EXISTS csv_table")
}
}
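For context, not in the diff itself: the example now also runs a length() filter, a GROUP BY aggregate, and a self-join on stringField, all of which touch the dictionary-encoded string column. The GROUP BY result can be checked against the ten sample rows of data.csv above with plain Scala, assuming all ten rows load cleanly; the object name is illustrative.

// Illustrative only: (shortField, intField, stringField) triples copied from data.csv above.
object ExpectedGroupBy {
  def main(args: Array[String]): Unit = {
    val rows = Seq(
      (1, 10, "spark"), (5, 17, "spark"), (1, 11, "flink"), (1, 10, "spark"),
      (1, 10, "spark"), (3, 14, "hive"), (2, 10, "impala"), (1, 10, "spark"),
      (4, 16, "impala"), (1, 10, "spark"))
    // same shape as: SELECT sum(intField), stringField FROM carbon_table GROUP BY stringField
    val sums = rows.groupBy(_._3).map { case (name, group) => name -> group.map(_._2).sum }
    println(sums) // spark -> 67, flink -> 11, hive -> 14, impala -> 26 (ordering may vary)
  }
}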
@@ -41,7 +41,7 @@ case class CarbonDatasourceHadoopRelation(
paths: Array[String],
parameters: Map[String, String],
tableSchema: Option[StructType])
extends BaseRelation with PrunedFilteredScan {
extends BaseRelation {

lazy val absIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head)
lazy val carbonTable = SchemaReader.readCarbonTableFromStore(absIdentifier)
@@ -59,7 +59,7 @@ case class CarbonDatasourceHadoopRelation(

override def schema: StructType = tableSchema.getOrElse(carbonRelation.schema)

override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
val job = new Job(new JobConf())
val conf = new Configuration(job.getConfiguration)
val filterExpression: Option[Expression] = filters.flatMap { filter =>
@@ -74,5 +74,5 @@ case class CarbonDatasourceHadoopRelation(
new CarbonScanRDD[Row](sqlContext.sparkContext, projection, filterExpression.orNull,
absIdentifier, carbonTable)
}

override def unhandledFilters(filters: Array[Filter]): Array[Filter] = new Array[Filter](0)
}
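For context, not part of the commit: with PrunedFilteredScan removed, Spark's generic data-source strategy no longer plans scans for this relation; buildScan stays as a plain method, presumably invoked from CarbonData's own strategy, and unhandledFilters now reports that no pushed filter needs to be re-evaluated by Spark. A self-contained stand-in for that contract follows; these names are hypothetical and are not Spark or CarbonData classes.

// Illustrative stand-in only, not Spark or CarbonData code.
object FilterContractSketch {
  sealed trait Filter
  final case class EqualTo(column: String, value: Any) extends Filter

  trait Relation {
    // Filters the engine must still apply itself; an empty result means "the source handled them all".
    def unhandledFilters(filters: Array[Filter]): Array[Filter]
  }

  object CarbonLikeRelation extends Relation {
    override def unhandledFilters(filters: Array[Filter]): Array[Filter] = Array.empty[Filter]
  }

  def main(args: Array[String]): Unit = {
    val pushed: Array[Filter] = Array(EqualTo("stringField", "spark"))
    // true: the engine can trust the source and skip re-filtering the returned rows
    println(CarbonLikeRelation.unhandledFilters(pushed).isEmpty)
  }
}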
@@ -17,7 +17,7 @@

package org.apache.spark.sql

import org.apache.spark.TaskContext
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors.attachTree
@@ -33,8 +33,7 @@ import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, ColumnIdentif
import org.apache.carbondata.core.carbon.metadata.datatype.DataType
import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
import org.apache.carbondata.core.carbon.querystatistics._
import org.apache.carbondata.core.util.{CarbonTimeStatisticsFactory, DataTypeUtil}
import org.apache.carbondata.core.util.DataTypeUtil
import org.apache.carbondata.spark.CarbonAliasDecoderRelation

/**
@@ -220,3 +219,149 @@ case class CarbonDictionaryDecoder(
}

}




class CarbonDecoderRDD(relations: Seq[CarbonDecoderRelation],
profile: CarbonProfile,
aliasMap: CarbonAliasDecoderRelation,
prev: RDD[Row],
output: Seq[Attribute])
extends RDD[Row](prev) {

def canBeDecoded(attr: Attribute): Boolean = {
profile match {
case ip: IncludeProfile if ip.attributes.nonEmpty =>
ip.attributes
.exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
case ep: ExcludeProfile =>
!ep.attributes
.exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
case _ => true
}
}

def convertCarbonToSparkDataType(carbonDimension: CarbonDimension,
relation: CarbonRelation): types.DataType = {
carbonDimension.getDataType match {
case DataType.STRING => StringType
case DataType.SHORT => ShortType
case DataType.INT => IntegerType
case DataType.LONG => LongType
case DataType.DOUBLE => DoubleType
case DataType.BOOLEAN => BooleanType
case DataType.DECIMAL =>
val scale: Int = carbonDimension.getColumnSchema.getScale
val precision: Int = carbonDimension.getColumnSchema.getPrecision
if (scale == 0 && precision == 0) {
DecimalType(18, 2)
} else {
DecimalType(precision, scale)
}
case DataType.TIMESTAMP => TimestampType
case DataType.STRUCT =>
CarbonMetastoreTypes
.toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")
case DataType.ARRAY =>
CarbonMetastoreTypes
.toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
}
}

val getDictionaryColumnIds = {
val dictIds: Array[(String, ColumnIdentifier, DataType)] = output.map { a =>
val attr = aliasMap.getOrElse(a, a)
val relation = relations.find(p => p.contains(attr))
if(relation.isDefined && canBeDecoded(attr)) {
val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
val carbonDimension =
carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
if (carbonDimension != null &&
carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
!carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
(carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
carbonDimension.getDataType)
} else {
(null, null, null)
}
} else {
(null, null, null)
}

}.toArray
dictIds
}

override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
val storepath = CarbonEnv.get.carbonMetastore.storePath
val absoluteTableIdentifiers = relations.map { relation =>
val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
(carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
}.toMap

val cacheProvider: CacheProvider = CacheProvider.getInstance
val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storepath)
val dicts: Seq[Dictionary] = getDictionary(absoluteTableIdentifiers,
forwardDictionaryCache)
val dictIndex = dicts.zipWithIndex.filter(x => x._1 != null).map(x => x._2)
// add a task completion listener to clear dictionary that is a decisive factor for
// LRU eviction policy
val dictionaryTaskCleaner = TaskContext.get
dictionaryTaskCleaner.addTaskCompletionListener(context =>
dicts.foreach { dictionary =>
if (null != dictionary) {
dictionary.clear
}
}
)
val iter = firstParent[Row].iterator(split, context)
new Iterator[Row] {
var flag = true
var total = 0L

override final def hasNext: Boolean = iter.hasNext

override final def next(): Row = {
val startTime = System.currentTimeMillis()
val data = iter.next().asInstanceOf[GenericRow].toSeq.toArray
dictIndex.foreach { index =>
if ( data(index) != null) {
data(index) = DataTypeUtil.getDataBasedOnDataType(dicts(index)
.getDictionaryValueForKey(data(index).asInstanceOf[Int]),
getDictionaryColumnIds(index)._3)
}
}
new GenericRow(data)
}
}
}

private def isRequiredToDecode = {
getDictionaryColumnIds.find(p => p._1 != null) match {
case Some(value) => true
case _ => false
}
}

private def getDictionary(atiMap: Map[String, AbsoluteTableIdentifier],
cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary]) = {
val dicts: Seq[Dictionary] = getDictionaryColumnIds.map { f =>
if (f._2 != null) {
try {
cache.get(new DictionaryColumnUniqueIdentifier(
atiMap(f._1).getCarbonTableIdentifier,
f._2, f._3))
} catch {
case _: Throwable => null
}
} else {
null
}
}
dicts
}

override protected def getPartitions: Array[Partition] = firstParent[Row].partitions
}
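For context, not part of the commit's code: CarbonDecoderRDD wraps the child RDD, looks up the forward dictionary for each dictionary-encoded output column, replaces the surrogate integer keys with their original values as rows stream through, and clears the dictionaries on task completion so the LRU cache can evict them. A dependency-free Scala sketch of the same late-decode idea, with all names illustrative:

// Illustrative only: filters and joins run on compact Int surrogate keys;
// the original String values are restored only at the very end, mirroring
// what CarbonDecoderRDD.compute does row by row.
object LateDecodeSketch {
  def main(args: Array[String]): Unit = {
    // forward dictionary: surrogate key -> original value
    val dictionary = Map(1 -> "spark", 2 -> "flink", 3 -> "hive", 4 -> "impala")

    // rows as (intField, encoded stringField)
    val encodedRows = Seq((10, 1), (17, 1), (11, 2), (14, 3), (10, 4))

    // work happens on the encoded form ...
    val filtered = encodedRows.filter { case (_, key) => key == 1 }

    // ... and decoding is deferred to the last step
    val decoded = filtered.map { case (intField, key) => (intField, dictionary(key)) }
    println(decoded) // List((10,spark), (17,spark))
  }
}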