Skip to content

Commit

Permalink
remove unnecessary projection
Browse files Browse the repository at this point in the history
fix
  • Loading branch information
jackylk authored and chenliang613 committed Nov 26, 2016
1 parent e05c0d5 commit 51b2354
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ import java.util
import scala.collection.JavaConverters._

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{expressions, TableIdentifier}
import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, _}
import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, QueryPlanner}
import org.apache.spark.sql.catalyst.plans.logical.{Filter => LogicalFilter, LogicalPlan}
Expand All @@ -39,7 +40,6 @@ import org.apache.spark.sql.types.IntegerType
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException


class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {

override def strategies: Seq[Strategy] = getStrategies
Expand Down Expand Up @@ -88,35 +88,49 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase
// Check out any expressions are there in project list. if they are present then we need to
// decode them as well.

val projectSet = AttributeSet(projectList.flatMap(_.references))
val scan = CarbonScan(projectSet.toSeq,
relation.carbonRelation,
predicates)(sqlContext)
val filterSet = AttributeSet(predicates.flatMap(_.references))

val scan = CarbonScan(projectSet.toSeq, relation.carbonRelation, predicates)(sqlContext)
projectList.map {
case attr: AttributeReference =>
case Alias(attr: AttributeReference, _) =>
case others =>
others.references.map{f =>
others.references.map { f =>
val dictionary = relation.carbonRelation.metaData.dictionaryMap.get(f.name)
if (dictionary.isDefined && dictionary.get) {
scan.attributesNeedToDecode.add(f.asInstanceOf[AttributeReference])
}
}
}
if (scan.attributesNeedToDecode.size() > 0) {
val decoder = getCarbonDecoder(logicalRelation,
sc,
tableName,
scan.attributesNeedToDecode.asScala.toSeq,
scan)
if (scan.unprocessedExprs.nonEmpty) {
val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And)
Project(projectList, filterCondToAdd.map(Filter(_, decoder)).getOrElse(decoder))
val scanWithDecoder =
if (scan.attributesNeedToDecode.size() > 0) {
val decoder = getCarbonDecoder(logicalRelation,
sc,
tableName,
scan.attributesNeedToDecode.asScala.toSeq,
scan)
if (scan.unprocessedExprs.nonEmpty) {
val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And)
filterCondToAdd.map(Filter(_, decoder)).getOrElse(decoder)
} else {
decoder
}
} else {
Project(projectList, decoder)
scan
}

if (projectList.map(_.toAttribute) == scan.attributesRaw &&
projectSet.size == projectList.size &&
filterSet.subsetOf(projectSet)) {
// copied from spark pruneFilterProjectRaw
// When it is possible to just use column pruning to get the right projection and
// when the columns of this projection are enough to evaluate all filter conditions,
// just do a scan with no extra project.
scanWithDecoder
} else {
Project(projectList, scan)
Project(projectList, scanWithDecoder)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.carbondata.spark.testsuite.aggquery

import org.apache.spark.sql.Row
import org.apache.spark.sql.common.util.CarbonHiveContext._
import org.apache.spark.sql.common.util.QueryTest
import org.scalatest.BeforeAndAfterAll
Expand All @@ -37,6 +36,7 @@ class AllDataTypesTestCaseAggregate extends QueryTest with BeforeAndAfterAll {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
sql("DROP TABLE IF EXISTS alldatatypestableAGG")
sql("DROP TABLE IF EXISTS alldatatypescubeAGG_hive")
sql(
"CREATE TABLE alldatatypestableAGG (empno int, empname String, designation String, doj " +
"Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname " +
Expand All @@ -53,8 +53,6 @@ class AllDataTypesTestCaseAggregate extends QueryTest with BeforeAndAfterAll {
"int,utilization int,salary int)row format delimited fields terminated by ','")
sql(
"LOAD DATA LOCAL INPATH './src/test/resources/datawithoutheader.csv' INTO TABLE alldatatypescubeAGG_hive")


}

test(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ class AllDataTypesTestCaseAggregate extends QueryTest with BeforeAndAfterAll {
clean
val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
.getCanonicalPath

sql("drop table if exists Carbon_automation_test")
sql("drop table if exists Carbon_automation_hive")
sql("drop table if exists Carbon_automation_test_hive")

sql("create table if not exists Carbon_automation_test (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string, gamePointId int,contractNumber int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('DICTIONARY_INCLUDE'='Latest_MONTH,Latest_DAY,deviceInformationId')");
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class FilterProcessorTestCase extends QueryTest with BeforeAndAfterAll {
)
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "MM-dd-yyyy HH:mm:ss")

sql("CREATE TABLE filterTimestampDataType (ID int, date Timestamp, country String, " +
"name String, phonetype String, serialname String, salary int) " +
"STORED BY 'org.apache.carbondata.format'"
Expand Down Expand Up @@ -129,7 +129,6 @@ class FilterProcessorTestCase extends QueryTest with BeforeAndAfterAll {
sql("load data local inpath './src/test/resources/big_int_Decimal.csv' into table big_int_basicc_Hive_1")
}


test("Is not null filter") {
checkAnswer(
sql("select id from filtertestTablesWithNull " + "where id is not null"),
Expand Down

0 comments on commit 51b2354

Please sign in to comment.