Merge branch 'ESPARK-102' into 'spark_2.1'
[ESPARK-102] Fix inaccurate size estimation for Hive partition data

Fix inaccurate size estimation for Hive partition data
resolve apache#102 

See merge request !56
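
In essence, the patch makes partition-size estimation stats-first: read the Hive metastore statistics already stored in each partition's parameters (StatsSetupConst.RAW_DATA_SIZE, then StatsSetupConst.TOTAL_SIZE), and only scan HDFS when both are missing and spark.sql.statistics.fallBackToHdfs is enabled. Below is a minimal standalone sketch of that per-partition decision, assuming only hadoop-common and hive-common on the classpath; PartitionSizeEstimator and estimatePartitionSize are illustrative names, not part of this commit.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.StatsSetupConst

import scala.util.Try

// Hypothetical helper, not from the patch: it mirrors the ordering the
// patched rule uses for each pruned partition.
object PartitionSizeEstimator {

  /**
   * Estimates the size of a single Hive partition in bytes.
   * Prefers metastore statistics (rawDataSize, then totalSize) and only
   * falls back to an HDFS directory scan when both stats are missing and
   * the fallback is enabled; returns None when the size is unknowable.
   */
  def estimatePartitionSize(
      parameters: Map[String, String], // CatalogTablePartition.parameters
      locationUri: Option[String],     // CatalogTablePartition.storage.locationUri
      fallBackToHdfs: Boolean,         // conf.fallBackToHdfsForStatsEnabled
      hadoopConf: Configuration): Option[Long] = {

    // A stat only counts if it parses as a positive long.
    def stat(key: String): Option[Long] =
      parameters.get(key).flatMap(v => Try(v.toLong).toOption).filter(_ > 0)

    stat(StatsSetupConst.RAW_DATA_SIZE)          // "rawDataSize": uncompressed size
      .orElse(stat(StatsSetupConst.TOTAL_SIZE))  // "totalSize": size on disk
      .orElse {
        if (fallBackToHdfs) {
          locationUri.map { loc =>
            val path = new Path(loc)
            // Walks the partition directory: one potentially expensive
            // NameNode call per partition, hence the stats-first order.
            path.getFileSystem(hadoopConf).getContentSummary(path).getLength
          }
        } else {
          None // caller treats this as "size unknown"
        }
      }
  }
}
```

The rule in the diff applies the same ordering inside its partition loop, accumulating per-partition sizes and breaking out early once the running total exceeds the threshold.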
cenyuhai committed Sep 12, 2017
2 parents 4102f20 + dcfc803 commit f0eb740
Showing 1 changed file with 24 additions and 9 deletions.
@@ -22,15 +22,14 @@ import java.io.IOException
import scala.util.control.Breaks.{break, breakable}

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.StatsSetupConst

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeSet, PredicateHelper, Rand}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable

case class DeterminePartitionedTableStats(sparkSession: SparkSession)
  extends Rule[LogicalPlan] with PredicateHelper {
@@ -39,7 +38,6 @@ case class DeterminePartitionedTableStats(sparkSession: SparkSession)
    case filter@Filter(condition, relation: MetastoreRelation)
        if DDLUtils.isHiveTable(relation.catalogTable) &&
          !relation.catalogTable.partitionColumnNames.isEmpty &&
          sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled &&
          sparkSession.sessionState.conf.metastorePartitionPruning =>
      val predicates = splitConjunctivePredicates(condition)
      val partitionSet = AttributeSet(relation.partitionKeys)
@@ -56,13 +54,30 @@ case class DeterminePartitionedTableStats(sparkSession: SparkSession)
        var sizeInBytes = 0L
        var hasError = false
        val partitions = prunedPartitions.filter(p => p.storage.locationUri.isDefined)
          .map(p => new Path(p.storage.locationUri.get))
        val hadoopConf = sparkSession.sparkContext.hadoopConfiguration
        var i = 0
        breakable {
          partitions.foreach { partition =>
            try {
              val fs = partition.getFileSystem(hadoopConf)
              sizeInBytes += fs.getContentSummary(partition).getLength
              i = i + 1
              val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE)
              val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE)
              if ((rawDataSize.isDefined && rawDataSize.get.toLong > 0) ||
                  (totalSize.isDefined && totalSize.get.toLong > 0)) {
                if (rawDataSize.isDefined && rawDataSize.get.toLong > 0) {
                  sizeInBytes += rawDataSize.get.toLong
                } else {
                  sizeInBytes += totalSize.get.toLong
                }
              } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
                val path = new Path(partition.storage.locationUri.get)
                val fs = path.getFileSystem(hadoopConf)
                sizeInBytes += fs.getContentSummary(path).getLength
              } else {
                hasError = true
                sizeInBytes = 0
                break()
              }
              if (sizeInBytes > threshold) {
                break()
              }
@@ -73,7 +88,7 @@
            }
          }
        }

        sizeInBytes = sizeInBytes * i / partitions.size
        if (hasError && sizeInBytes == 0) {
          sizeInBytes = sparkSession.sessionState.conf.defaultSizeInBytes
        }
@@ -93,7 +108,7 @@ case class MergeSmallFiles(sparkSession: SparkSession) extends Rule[LogicalPlan]
    case InsertIntoTable(table: MetastoreRelation, partition,
        child, overwrite, ifNotExists) if !child.isInstanceOf[Sort] &&
        !child.children.exists(a => a.isInstanceOf[RepartitionByExpression] &&
        !a.isInstanceOf[Repartition]) =>
        !a.isInstanceOf[Repartition]) && !table.databaseName.contains("temp") =>
      val rand = Alias(new Rand(), "_nondeterministic")()
      val newProjected = Project(child.output :+ rand, child)
      val mergeFileStage = RepartitionByExpression(Seq(rand.toAttribute), newProjected, None)
@@ -102,7 +117,7 @@ case class MergeSmallFiles(sparkSession: SparkSession) extends Rule[LogicalPlan]
    case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" &&
        !query.isInstanceOf[Sort] &&
        !query.children.exists(a => a.isInstanceOf[RepartitionByExpression]
        && !a.isInstanceOf[Repartition]) =>
        && !a.isInstanceOf[Repartition]) && !tableDesc.database.contains("temp") =>
      CreateTable(tableDesc, mode, Some(RepartitionByExpression(Seq(new Rand()), query, None)))
  }
}
