Spark 24914 working #3

Closed
wants to merge 7 commits
@@ -17,13 +17,16 @@

package org.apache.spark.sql.catalyst.catalog

import java.math.RoundingMode.UP
import java.net.URI
import java.time.ZoneOffset
import java.util.Date

import scala.collection.mutable
import scala.util.control.NonFatal

import com.google.common.math.DoubleMath.roundToBigInteger

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
@@ -378,31 +381,37 @@ object CatalogTable {
*/
case class CatalogStatistics(
sizeInBytes: BigInt,
deserFactor: Option[Int] = None,
rowCount: Option[BigInt] = None,
colStats: Map[String, CatalogColumnStat] = Map.empty) {

/**
* Convert [[CatalogStatistics]] to [[Statistics]], and match column stats to attributes based
* on column names.
*/
def toPlanStats(planOutput: Seq[Attribute], cboEnabled: Boolean): Statistics = {
def toPlanStats(planOutput: Seq[Attribute], cboEnabled: Boolean, deserFactorDistortion: Double)
: Statistics = {
if (cboEnabled && rowCount.isDefined) {
val attrStats = AttributeMap(planOutput
.flatMap(a => colStats.get(a.name).map(a -> _.toPlanStat(a.name, a.dataType))))
// Estimate size as number of rows * row size.
val size = EstimationUtils.getOutputSize(planOutput, rowCount.get, attrStats)
Statistics(sizeInBytes = size, rowCount = rowCount, attributeStats = attrStats)
} else {
// When CBO is disabled or the table doesn't have other statistics, we apply the size-only
// estimation strategy and only propagate sizeInBytes in statistics.
Statistics(sizeInBytes = sizeInBytes)
// When CBO is disabled or the table doesn't have other statistics, we apply the file size
// based estimation strategy and only propagate sizeInBytes in statistics.
val size = deserFactor.map { factor =>
BigInt(roundToBigInteger(sizeInBytes.doubleValue * deserFactorDistortion * factor, UP))
}.getOrElse(sizeInBytes)
Statistics(sizeInBytes = size)
}
}

/** Readable string representation for the CatalogStatistics. */
def simpleString: String = {
val rowCountString = if (rowCount.isDefined) s", ${rowCount.get} rows" else ""
s"$sizeInBytes bytes$rowCountString"
val rowCountString = rowCount.map(c => s", $c rows").getOrElse("")
val deserFactorString = deserFactor.map(f => s", deserfactor=$f ").getOrElse("")
s"$sizeInBytes bytes$rowCountString$deserFactorString"
}
}
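
To make the new else-branch of toPlanStats concrete, here is a minimal, stand-alone sketch (not part of the diff) of the scaling it performs when CBO is disabled: the on-disk size is multiplied by the stored factor and the configured distortion, then rounded up. The helper name scaledPlanSize is made up, and java.math.BigDecimal stands in for Guava's DoubleMath.roundToBigInteger(_, RoundingMode.UP) used in the patch.

import java.math.{BigDecimal => JBigDecimal, RoundingMode}

// Hypothetical helper mirroring the file-size based estimation branch above.
def scaledPlanSize(sizeInBytes: BigInt, deserFactor: Option[Int], distortion: Double): BigInt =
  deserFactor.map { factor =>
    // Round up so the estimate never understates the deserialized size.
    BigInt(new JBigDecimal(sizeInBytes.doubleValue * distortion * factor)
      .setScale(0, RoundingMode.UP)
      .toBigInteger)
  }.getOrElse(sizeInBytes)

// Example: a 10 MiB on-disk footprint with factor 4 and the default distortion of 1.0
// is planned as 41943040 bytes (~40 MiB) of in-memory data.
scaledPlanSize(BigInt(10L * 1024 * 1024), Some(4), 1.0)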

@@ -631,7 +640,7 @@ case class HiveTableRelation(
)

override def computeStats(): Statistics = {
tableMeta.stats.map(_.toPlanStats(output, conf.cboEnabled))
tableMeta.stats.map(_.toPlanStats(output, conf.cboEnabled, conf.deserFactorDistortion))
.orElse(tableStats)
.getOrElse {
throw new IllegalStateException("table stats must be specified.")
@@ -1336,6 +1336,29 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val DESERIALIZATION_FACTOR_CALC_ENABLED =
buildConf("spark.sql.statistics.deserFactor.calc.enabled")
.doc("Enables the calculation of the deserialization factor as a table statistic. " +
"This factor is calculated for columnar storage formats as a ratio of actual data size " +
"to raw file size. Spark uses this ratio to scale up the estimated size, which leads to " +
"better estimate of in-memory data size and improves the query optimization (i.e., join " +
"strategy). Spark stores a ratio, rather than the data size, so that the table can grow " +
"without having to recompute statistics. In case of partitioned table the maximum of " +
"these factors is taken. When the factor is already calculated (and stored in the meta " +
"store) but the calculation is disabled in a subsequent ANALYZE TABLE (by setting this " +
"config to false) then the old factor will be applied as this factor can be removed only " +
"by TRUNCATE or a DROP table.")
.booleanConf
.createWithDefault(false)

val DESERIALIZATION_FACTOR_EXTRA_DISTORTION =
buildConf("spark.sql.statistics.deserFactor.distortion")
.doc("Distortion value used as an extra multiplier at the application of the " +
"deserialization factor making one capable to modify the computed table size even after " +
"the deserialization factor is calculated and stored in the meta store.")
.doubleConf
.createWithDefault(1.0)
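
As a usage sketch (not part of the diff, assuming an active SparkSession named spark; the table name sales is made up), the two configs above would typically be set before re-running ANALYZE TABLE so that the factor is computed and stored with the table statistics:

// Illustrative only: enable factor calculation, then recompute table statistics.
spark.conf.set("spark.sql.statistics.deserFactor.calc.enabled", "true")
// Optional: damp or boost the effect of the stored factor at planning time (default 1.0).
spark.conf.set("spark.sql.statistics.deserFactor.distortion", "0.8")
spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS")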

val CBO_ENABLED =
buildConf("spark.sql.cbo.enabled")
.doc("Enables CBO for estimation of plan statistics when set true.")
@@ -2360,6 +2383,10 @@ class SQLConf extends Serializable with Logging {

def autoSizeUpdateEnabled: Boolean = getConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED)

def deserFactorStatCalcEnabled: Boolean = getConf(SQLConf.DESERIALIZATION_FACTOR_CALC_ENABLED)

def deserFactorDistortion: Double = getConf(SQLConf.DESERIALIZATION_FACTOR_EXTRA_DISTORTION)

def joinReorderEnabled: Boolean = getConf(SQLConf.JOIN_REORDER_ENABLED)

def joinReorderDPThreshold: Int = getConf(SQLConf.JOIN_REORDER_DP_THRESHOLD)
@@ -111,7 +111,7 @@ case class AnalyzeColumnCommand(
throw new AnalysisException("ANALYZE TABLE is not supported on views.")
}
} else {
val sizeInBytes = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
val sizeWithDeserFactor = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
val relation = sparkSession.table(tableIdent).logicalPlan
val columnsToAnalyze = getColumnsToAnalyze(tableIdent, relation, columnNames, allColumns)

@@ -126,7 +126,8 @@

// We also update table-level stats in order to keep them consistent with column-level stats.
val statistics = CatalogStatistics(
sizeInBytes = sizeInBytes,
sizeInBytes = sizeWithDeserFactor.sizeInBytes,
deserFactor = sizeWithDeserFactor.deserFactor,
rowCount = Some(rowCount),
// Newly computed column stats should override the existing ones.
colStats = tableMeta.stats.map(_.colStats).getOrElse(Map.empty) ++ newColCatalogStats)
@@ -107,10 +107,11 @@ case class AnalyzePartitionCommand(
// Update the metastore if newly computed statistics are different from those
// recorded in the metastore.
val newPartitions = partitions.flatMap { p =>
val newTotalSize = CommandUtils.calculateLocationSize(
sessionState, tableMeta.identifier, p.storage.locationUri)
val totalSizeWithDeserFact = CommandUtils.calculateLocationSize(
sparkSession.sessionState, tableMeta.identifier, p.storage.locationUri)
val newRowCount = rowCounts.get(p.spec)
val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount)
val newStats =
CommandUtils.compareAndGetNewStats(tableMeta.stats, totalSizeWithDeserFact, newRowCount)
newStats.map(_ => p.copy(stats = newStats))
}

@@ -22,21 +22,27 @@ import java.net.URI
import scala.collection.mutable
import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogColumnStat, CatalogStatistics, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, InMemoryFileIndex}
import org.apache.spark.sql.execution.datasources.orc.OrcUtils
import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
import org.apache.spark.sql.internal.{SessionState, SQLConf}
import org.apache.spark.sql.types._

case class SizeInBytesWithDeserFactor(
sizeInBytes: BigInt,
deserFactor: Option[Int])

object CommandUtils extends Logging {

@@ -45,42 +51,93 @@ object CommandUtils extends Logging {
val catalog = sparkSession.sessionState.catalog
if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
val newTable = catalog.getTableMetadata(table.identifier)
val newSize = CommandUtils.calculateTotalSize(sparkSession, newTable)
val newStats = CatalogStatistics(sizeInBytes = newSize)
val oldDeserFactor = newTable.stats.flatMap(_.deserFactor)
val newSizeWithDeserFactor = CommandUtils.calculateTotalSize(sparkSession, newTable)
val newStats = CatalogStatistics(
sizeInBytes = newSizeWithDeserFactor.sizeInBytes,
deserFactor = newSizeWithDeserFactor.deserFactor.orElse(oldDeserFactor))
catalog.alterTableStats(table.identifier, Some(newStats))
} else if (table.stats.nonEmpty) {
catalog.alterTableStats(table.identifier, None)
}
}

def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = {
def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable)
: SizeInBytesWithDeserFactor = {
val sessionState = spark.sessionState
if (catalogTable.partitionColumnNames.isEmpty) {
calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri)
calculateLocationSize(
spark.sessionState,
catalogTable.identifier,
catalogTable.storage.locationUri)
} else {
// Calculate table size as a sum of the visible partitions. See SPARK-21079
val partitions = sessionState.catalog.listPartitions(catalogTable.identifier)
if (spark.sessionState.conf.parallelFileListingInStatsComputation) {
val paths = partitions.map(x => new Path(x.storage.locationUri.get))
val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
val pathFilter = new PathFilter with Serializable {
override def accept(path: Path): Boolean = isDataPath(path, stagingDir)
val sizeWithDeserFactorsForPartitions =
if (spark.sessionState.conf.parallelFileListingInStatsComputation) {
val paths = partitions.map(x => new Path(x.storage.locationUri.get))
val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
val pathFilter = new PathFilter with Serializable {
override def accept(path: Path): Boolean = isDataPath(path, stagingDir)
}
val calcDeserFactEnabled = sessionState.conf.deserFactorStatCalcEnabled
val hadoopConf = sessionState.newHadoopConf()
val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(
paths, hadoopConf, pathFilter, spark, areRootPaths = true)
fileStatusSeq.flatMap { case (_, fileStatuses) =>
fileStatuses.map(sizeInBytesWithDeserFactor(calcDeserFactEnabled, hadoopConf, _))
}
} else {
partitions.map { tablePartition =>
calculateLocationSize(
spark.sessionState,
catalogTable.identifier,
tablePartition.storage.locationUri)
}
}
val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(
paths, sessionState.newHadoopConf(), pathFilter, spark, areRootPaths = true)
fileStatusSeq.flatMap(_._2.map(_.getLen)).sum
sumSizeWithMaxDeserializationFactor(sizeWithDeserFactorsForPartitions)
}
}

def sumSizeWithMaxDeserializationFactor(
sizesWithFactors: Seq[SizeInBytesWithDeserFactor]): SizeInBytesWithDeserFactor = {
val definedFactors = sizesWithFactors.filter(_.deserFactor.isDefined).map(_.deserFactor.get)
SizeInBytesWithDeserFactor(
sizesWithFactors.map(_.sizeInBytes).sum,
if (definedFactors.isEmpty) None else Some(definedFactors.max))
}
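
A stand-alone sketch (not from the patch; SizeWithFactor and combine are stand-ins for SizeInBytesWithDeserFactor and the method above) of the aggregation rule: byte sizes add up across partitions, while the factor kept is the largest defined one, so a single highly compressed partition drives the whole-table factor.

case class SizeWithFactor(sizeInBytes: BigInt, deserFactor: Option[Int])

def combine(parts: Seq[SizeWithFactor]): SizeWithFactor =
  SizeWithFactor(
    parts.map(_.sizeInBytes).sum,
    parts.flatMap(_.deserFactor).reduceOption(_ max _))  // None when no factor was computed

// combine(Seq(SizeWithFactor(100, Some(3)), SizeWithFactor(200, None), SizeWithFactor(50, Some(5))))
// returns SizeWithFactor(350, Some(5))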

def sizeInBytesWithDeserFactor(
calcDeserFactEnabled: Boolean,
hadoopConf: Configuration,
fileStatus: FileStatus): SizeInBytesWithDeserFactor = {
assert(fileStatus.isFile)
val factor = if (calcDeserFactEnabled) {
val rawSize = if (fileStatus.getPath.getName.endsWith(".parquet")) {
Some(ParquetUtils.rawSize(hadoopConf, fileStatus.getPath))
} else if (fileStatus.getPath.getName.endsWith(".orc")) {
Some(OrcUtils.rawSize(hadoopConf, fileStatus.getPath))
} else {
partitions.map { p =>
calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri)
}.sum
None
}

rawSize.map { rawSize =>
// The deserialization factor is the quotient of the raw byte size (uncompressed data
// size) and the on-disk file size, rounded up to the next integer.
val divAndRemain = rawSize /% BigInt(fileStatus.getLen)
if (divAndRemain._2.signum == 1) divAndRemain._1.toInt + 1 else divAndRemain._1.toInt
}
} else {
None
}

SizeInBytesWithDeserFactor(fileStatus.getLen, factor)
}
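
As a worked check (not part of the diff; the helper deserFactor and the sizes below are made up), the rounding above is a ceiling division of the raw, uncompressed size by the on-disk length:

// Ceiling division of the raw (uncompressed) size by the on-disk size, as in the method above.
def deserFactor(rawSize: BigInt, fileLen: Long): Int = {
  val (quotient, remainder) = rawSize /% BigInt(fileLen)
  if (remainder.signum == 1) quotient.toInt + 1 else quotient.toInt
}

// A 10 MiB ORC file whose columns decompress to 33 MiB of raw data gets factor 4
// (33 MiB / 10 MiB = 3 remainder 3 MiB, rounded up), so its contribution to the
// table size estimate is scaled from 10 MiB to roughly 40 MiB.
deserFactor(BigInt(33L * 1024 * 1024), 10L * 1024 * 1024)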

def calculateLocationSize(
sessionState: SessionState,
identifier: TableIdentifier,
locationUri: Option[URI]): Long = {
locationUri: Option[URI]): SizeInBytesWithDeserFactor = {
// This method is mainly based on
// org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
// in Hive 0.13 (except that we do not use fs.getContentSummary).
@@ -90,50 +147,51 @@ object CommandUtils extends Logging {
// Seems fs.getContentSummary returns wrong table size on Jenkins. So we use
// countFileSize to count the table size.
val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
val hadoopConf = sessionState.newHadoopConf()

def getPathSize(fs: FileSystem, path: Path): Long = {
val calcDeserFactEnabled = sessionState.conf.deserFactorStatCalcEnabled
def getSumSizeInBytesWithDeserFactor(fs: FileSystem, path: Path): SizeInBytesWithDeserFactor = {
val fileStatus = fs.getFileStatus(path)
val size = if (fileStatus.isDirectory) {
fs.listStatus(path)
.map { status =>
if (isDataPath(status.getPath, stagingDir)) {
getPathSize(fs, status.getPath)
} else {
0L
}
}.sum
if (fileStatus.isDirectory) {
val fileSizesWithDeserFactor = fs.listStatus(path).map { status =>
if (isDataPath(status.getPath, stagingDir)) {
getSumSizeInBytesWithDeserFactor(fs, status.getPath)
} else {
SizeInBytesWithDeserFactor(0L, None)
}
}
sumSizeWithMaxDeserializationFactor(fileSizesWithDeserFactor)
} else {
fileStatus.getLen
sizeInBytesWithDeserFactor(calcDeserFactEnabled, hadoopConf, fileStatus)
}

size
}

val startTime = System.nanoTime()
logInfo(s"Starting to calculate the total file size under path $locationUri.")
val size = locationUri.map { p =>
val fileSizesWithDeserFactor = locationUri.map { p =>
val path = new Path(p)
try {
val fs = path.getFileSystem(sessionState.newHadoopConf())
getPathSize(fs, path)
val fs = path.getFileSystem(hadoopConf)
getSumSizeInBytesWithDeserFactor(fs, path)
} catch {
case NonFatal(e) =>
logWarning(
s"Failed to get the size of table ${identifier.table} in the " +
s"database ${identifier.database} because of ${e.toString}", e)
0L
SizeInBytesWithDeserFactor(0L, None)
}
}.getOrElse(0L)
}.getOrElse(SizeInBytesWithDeserFactor(0L, None))
val durationInMs = (System.nanoTime() - startTime) / (1000 * 1000)
logInfo(s"It took $durationInMs ms to calculate the total file size under path $locationUri.")

size
fileSizesWithDeserFactor
}

def compareAndGetNewStats(
oldStats: Option[CatalogStatistics],
newTotalSize: BigInt,
newSizeWithDeserFactor: SizeInBytesWithDeserFactor,
newRowCount: Option[BigInt]): Option[CatalogStatistics] = {
val newTotalSize = newSizeWithDeserFactor.sizeInBytes
val oldTotalSize = oldStats.map(_.sizeInBytes).getOrElse(BigInt(-1))
val oldRowCount = oldStats.flatMap(_.rowCount).getOrElse(BigInt(-1))
var newStats: Option[CatalogStatistics] = None
@@ -144,15 +202,26 @@
// 1. when total size is not changed, we don't need to alter the table;
// 2. when total size is changed, `oldRowCount` becomes invalid.
// This is to make sure that we only record the right statistics.
if (newRowCount.isDefined) {
if (newRowCount.get >= 0 && newRowCount.get != oldRowCount) {
newStats = if (newStats.isDefined) {
newStats.map(_.copy(rowCount = newRowCount))
} else {
Some(CatalogStatistics(sizeInBytes = oldTotalSize, rowCount = newRowCount))
}
newRowCount.foreach { rowCount =>
if (rowCount >= 0 && rowCount != oldRowCount) {
newStats = newStats
.map(_.copy(rowCount = newRowCount))
.orElse(Some(CatalogStatistics(sizeInBytes = oldTotalSize, rowCount = newRowCount)))
}
}
val oldDeserFactor = oldStats.flatMap(_.deserFactor)
val newDeserFactor = newSizeWithDeserFactor.deserFactor.orElse(oldDeserFactor)
if (oldDeserFactor != newDeserFactor || newStats.isDefined) {
newDeserFactor.foreach { _ =>
newStats = newStats
.map(_.copy(deserFactor = newDeserFactor))
.orElse(Some(CatalogStatistics(
sizeInBytes = oldTotalSize,
deserFactor = newDeserFactor,
rowCount = None)))
}
}

newStats
}
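
A hedged walk-through of the factor carry-over rule added above (Stats and mergeFactor are stand-ins, not the patch's CatalogStatistics): a run that computes no factor never erases a previously stored one; only a newly computed value replaces it, matching the orElse(oldDeserFactor) calls in this file.

case class Stats(sizeInBytes: BigInt, deserFactor: Option[Int], rowCount: Option[BigInt])

// Keep the old factor unless a new one was computed in this ANALYZE run.
def mergeFactor(oldStats: Option[Stats], newFactor: Option[Int]): Option[Int] =
  newFactor.orElse(oldStats.flatMap(_.deserFactor))

// mergeFactor(Some(Stats(100, Some(3), None)), None)    // => Some(3): old factor survives
// mergeFactor(Some(Stats(100, Some(3), None)), Some(5)) // => Some(5): new factor wins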
