@@ -22,7 +22,7 @@ import java.util.Date
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.types.StructType

@@ -130,6 +130,7 @@ case class CatalogTable(
     createTime: Long = System.currentTimeMillis,
     lastAccessTime: Long = -1,
     properties: Map[String, String] = Map.empty,
+    stats: Option[Statistics] = None,
[Review comment] @cloud-fan (Contributor), Sep 2, 2016:
    nit: we should also update toString to include stats.

[Review comment] @wzhfy (Contributor, Author), Sep 2, 2016:
    @cloud-fan and also simpleString in LogicalRelation?

[Review comment] (Contributor):
    LogicalRelation doesn't need to be updated, I think.

     viewOriginalText: Option[String] = None,
     viewText: Option[String] = None,
     comment: Option[String] = None,
@@ -190,6 +191,7 @@ case class CatalogTable(
     viewText.map("View: " + _).getOrElse(""),
     comment.map("Comment: " + _).getOrElse(""),
     if (properties.nonEmpty) s"Properties: $tableProperties" else "",
+    if (stats.isDefined) s"Statistics: ${stats.get}" else "",
     s"$storage")

   output.filter(_.nonEmpty).mkString("CatalogTable(\n\t", "\n\t", ")")
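For illustration, roughly how the new line would render in CatalogTable.toString, combining it with the Statistics.toString defined later in this diff (the stats values are invented):

    // with stats = Some(Statistics(sizeInBytes = 1024, rowCount = Some(10))),
    // the rendered output gains one entry:
    //   Statistics: Statistics(sizeInBytes=1024, rowCount=10, isBroadcastable=false)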
----------------------------------------------------------------------

@@ -31,6 +31,19 @@ package org.apache.spark.sql.catalyst.plans.logical
  *
  * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
  *                    defaults to the product of children's `sizeInBytes`.
+ * @param rowCount Estimated number of rows.
  * @param isBroadcastable If true, output is small enough to be used in a broadcast join.
  */
-case class Statistics(sizeInBytes: BigInt, isBroadcastable: Boolean = false)
+case class Statistics(
+    sizeInBytes: BigInt,
+    rowCount: Option[BigInt] = None,
+    isBroadcastable: Boolean = false) {
+  override def toString: String = {
+    val output =
+      Seq(s"sizeInBytes=$sizeInBytes",
+        if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "",
+        s"isBroadcastable=$isBroadcastable"
+      )
+    output.filter(_.nonEmpty).mkString("Statistics(", ", ", ")")
+  }
+}
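A quick sketch of what this toString yields (values invented for illustration):

    Statistics(sizeInBytes = 1024).toString
    // => "Statistics(sizeInBytes=1024, isBroadcastable=false)"
    Statistics(sizeInBytes = 1024, rowCount = Some(42)).toString
    // => "Statistics(sizeInBytes=1024, rowCount=42, isBroadcastable=false)"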
----------------------------------------------------------------------

@@ -590,8 +590,12 @@ class SQLBuilder private (

 object ExtractSQLTable {
   def unapply(plan: LogicalPlan): Option[SQLTable] = plan match {
-    case l @ LogicalRelation(_, _, Some(TableIdentifier(table, Some(database)))) =>
-      Some(SQLTable(database, table, l.output.map(_.withQualifier(None))))
+    case l @ LogicalRelation(_, _, Some(catalogTable))
+        if catalogTable.identifier.database.isDefined =>
+      Some(SQLTable(
+        catalogTable.identifier.database.get,
+        catalogTable.identifier.table,
+        l.output.map(_.withQualifier(None))))

     case relation: CatalogRelation =>
       val m = relation.catalogTable
----------------------------------------------------------------------

@@ -99,9 +99,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         ctx.identifier.getText.toLowerCase == "noscan") {
[Review comment] (Contributor):
    Except noscan, are there some other options we may support in the future?

[Review comment] (Contributor):
    Yeah, there are. AFAIK we will also support column-level statistics.

[Review comment] @wzhfy (Contributor, Author):
    @cloud-fan noscan won't scan files; it only collects statistics like total size. Without noscan, we will also collect other stats like row count and column-level stats.

       AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier).toString)
     } else {
-      // Always just run the no scan analyze. We should fix this and implement full analyze
-      // command in the future.
-      AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier).toString)
+      AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier).toString, noscan = false)
     }
   }

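Net effect of the parser change, sketched with a placeholder table name t:

    // ANALYZE TABLE t COMPUTE STATISTICS NOSCAN  =>  AnalyzeTableCommand("t")                  (noscan keeps its default, true)
    // ANALYZE TABLE t COMPUTE STATISTICS         =>  AnalyzeTableCommand("t", noscan = false)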
----------------------------------------------------------------------

@@ -21,19 +21,18 @@ import scala.util.control.NonFatal

 import org.apache.hadoop.fs.{FileSystem, Path}

-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.execution.datasources.LogicalRelation


 /**
  * Analyzes the given table in the current database to generate statistics, which will be
  * used in query optimizations.
- *
- * Right now, it only supports Hive tables and it only updates the size of a Hive table
- * in the Hive metastore.
  */
-case class AnalyzeTableCommand(tableName: String) extends RunnableCommand {
+case class AnalyzeTableCommand(tableName: String, noscan: Boolean = true) extends RunnableCommand {

   override def run(sparkSession: SparkSession): Seq[Row] = {
[Review comment] (Member):
    Not related to this PR, but it looks like AnalyzeTableCommand doesn't handle the possible NoSuchTableException caused by sessionState.catalog.lookupRelation. It would be better to handle it and provide an error message.

     val sessionState = sparkSession.sessionState

@@ -71,8 +70,6 @@ case class AnalyzeTableCommand(tableName: String) extends RunnableCommand {
           size
         }

-        val tableParameters = catalogTable.properties
-        val oldTotalSize = tableParameters.get("totalSize").map(_.toLong).getOrElse(0L)
         val newTotalSize =
           catalogTable.storage.locationUri.map { p =>
             val path = new Path(p)

@@ -88,24 +85,47 @@ case class AnalyzeTableCommand(tableName: String) extends RunnableCommand {
             }
           }.getOrElse(0L)

-        // Update the Hive metastore if the total size of the table is different than the size
-        // recorded in the Hive metastore.
-        // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
-        if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
-          sessionState.catalog.alterTable(
-            catalogTable.copy(
-              properties = relation.catalogTable.properties +
-                (AnalyzeTableCommand.TOTAL_SIZE_FIELD -> newTotalSize.toString)))
-        }
+        updateTableStats(catalogTable, newTotalSize)

+      // data source tables have been converted into LogicalRelations
+      case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
+        updateTableStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)

       case otherRelation =>
-        throw new AnalysisException(s"ANALYZE TABLE is only supported for Hive tables, " +
-          s"but '${tableIdent.unquotedString}' is a ${otherRelation.nodeName}.")
+        throw new AnalysisException(s"ANALYZE TABLE is not supported for " +
+          s"${otherRelation.nodeName}.")
     }

+    def updateTableStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = {
+      val oldTotalSize = catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
+      val oldRowCount = catalogTable.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
+      var newStats: Option[Statistics] = None
+      if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
+        newStats = Some(Statistics(sizeInBytes = newTotalSize))
[Review comment] (Contributor):
    If we run ANALYZE TABLE first and then run ANALYZE TABLE NOSCAN immediately, the row count will be removed. Does that make sense? The row count is still valid in this case.

[Review comment] @wzhfy (Contributor, Author):
    @cloud-fan When we have row count info, we can't tell whether it is valid or not: we could also run ANALYZE TABLE, then INSERT INTO, then ANALYZE TABLE NOSCAN. So removing it makes sure we only keep correct statistics.

[Review comment] (Contributor):
    OK, makes sense, but we should document this behaviour explicitly.

+      }
+      // We only set rowCount when noscan is false, because otherwise:
+      // 1. when total size is not changed, we don't need to alter the table;
+      // 2. when total size is changed, `oldRowCount` becomes invalid.
+      // This is to make sure that we only record the right statistics.
+      if (!noscan) {
+        val newRowCount = Dataset.ofRows(sparkSession, relation).count()
+        if (newRowCount >= 0 && newRowCount != oldRowCount) {
+          newStats = if (newStats.isDefined) {
+            newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
+          } else {
+            Some(Statistics(sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
+          }
+        }
+      }
+      // Update the metastore if the above statistics of the table are different from those
+      // recorded in the metastore.
+      if (newStats.isDefined) {
+        sessionState.catalog.alterTable(catalogTable.copy(stats = newStats))
+        // Refresh the cached data source table in the catalog.
+        sessionState.catalog.refreshTable(tableIdent)
+      }
+    }
+
     Seq.empty[Row]
   }
 }
-
-object AnalyzeTableCommand {
-  val TOTAL_SIZE_FIELD = "totalSize"
-}
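To make the NOSCAN invalidation semantics from the review thread concrete, a hypothetical command sequence (table name and values invented):

    // ANALYZE TABLE t COMPUTE STATISTICS         -> stats = Statistics(sizeInBytes = s1, rowCount = Some(n1))
    // INSERT INTO t VALUES ...                   -> table contents change; recorded stats are now stale
    // ANALYZE TABLE t COMPUTE STATISTICS NOSCAN  -> sizeInBytes recomputed as s2; if s2 != s1, the command
    //                                               stores Statistics(sizeInBytes = s2) and, because the scan
    //                                               was skipped, drops the stale rowCount instead of keeping n1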
----------------------------------------------------------------------

@@ -209,7 +209,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]

     LogicalRelation(
       dataSource.resolveRelation(),
-      metastoreTableIdentifier = Some(table.identifier))
+      catalogTable = Some(table))
   }

   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
@@ -366,7 +366,8 @@ object DataSourceStrategy extends Strategy with Logging {
       val scan = RowDataSourceScanExec(
         projects.map(_.toAttribute),
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation, UnknownPartitioning(0), metadata, relation.metastoreTableIdentifier)
+        relation.relation, UnknownPartitioning(0), metadata,
+        relation.catalogTable.map(_.identifier))
       filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
     } else {
       // Don't request columns that are only referenced by pushed filters.
@@ -376,7 +377,8 @@ object DataSourceStrategy extends Strategy with Logging {
       val scan = RowDataSourceScanExec(
         requestedColumns,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation, UnknownPartitioning(0), metadata, relation.metastoreTableIdentifier)
+        relation.relation, UnknownPartitioning(0), metadata,
+        relation.catalogTable.map(_.identifier))
       execution.ProjectExec(
         projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
     }
----------------------------------------------------------------------

@@ -112,7 +112,7 @@ object FileSourceStrategy extends Strategy with Logging {
         outputSchema,
         partitionKeyFilters.toSeq,
         pushedDownFilters,
-        table)
+        table.map(_.identifier))

     val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
     val withFilter = afterScanFilter.map(execution.FilterExec(_, scan)).getOrElse(scan)
----------------------------------------------------------------------

@@ -16,8 +16,8 @@
  */
 package org.apache.spark.sql.execution.datasources

-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.sources.BaseRelation
@@ -33,7 +33,7 @@ import org.apache.spark.util.Utils
 case class LogicalRelation(
     relation: BaseRelation,
     expectedOutputAttributes: Option[Seq[Attribute]] = None,
-    metastoreTableIdentifier: Option[TableIdentifier] = None)
+    catalogTable: Option[CatalogTable] = None)
   extends LeafNode with MultiInstanceRelation {

   override val output: Seq[AttributeReference] = {
@@ -72,9 +72,10 @@ case class LogicalRelation(
   // expId can be different but the relation is still the same.
   override lazy val cleanArgs: Seq[Any] = Seq(relation)

-  @transient override lazy val statistics: Statistics = Statistics(
-    sizeInBytes = BigInt(relation.sizeInBytes)
-  )
+  @transient override lazy val statistics: Statistics = {
+    catalogTable.flatMap(_.stats.map(_.copy(sizeInBytes = relation.sizeInBytes))).getOrElse(
+      Statistics(sizeInBytes = relation.sizeInBytes))
+  }

   /** Used to lookup original attribute capitalization */
   val attributeMap: AttributeMap[AttributeReference] = AttributeMap(output.map(o => (o, o)))
@@ -89,7 +90,7 @@ case class LogicalRelation(
     LogicalRelation(
       relation,
       expectedOutputAttributes.map(_.map(_.newInstance())),
-      metastoreTableIdentifier).asInstanceOf[this.type]
+      catalogTable).asInstanceOf[this.type]
   }

   override def refresh(): Unit = relation match {
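The statistics override above prefers catalog statistics (which may carry rowCount) but always takes the physical size from the relation itself; a sketch with invented values:

    // catalogTable.stats = Some(Statistics(sizeInBytes = 100, rowCount = Some(5))),
    // relation.sizeInBytes = 2048
    // => statistics == Statistics(sizeInBytes = 2048, rowCount = Some(5))
    // catalogTable.stats = None
    // => statistics == Statistics(sizeInBytes = 2048)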
----------------------------------------------------------------------

@@ -252,11 +252,11 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
       case relation: CatalogRelation =>
         val metadata = relation.catalogTable
         preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames)
-      case LogicalRelation(h: HadoopFsRelation, _, identifier) =>
-        val tblName = identifier.map(_.quotedString).getOrElse("unknown")
+      case LogicalRelation(h: HadoopFsRelation, _, catalogTable) =>
+        val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
         preprocess(i, tblName, h.partitionSchema.map(_.name))
-      case LogicalRelation(_: InsertableRelation, _, identifier) =>
-        val tblName = identifier.map(_.quotedString).getOrElse("unknown")
+      case LogicalRelation(_: InsertableRelation, _, catalogTable) =>
+        val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
         preprocess(i, tblName, Nil)
       case other => i
     }
----------------------------------------------------------------------

@@ -192,7 +192,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
    * Right now, it only supports catalog tables and it only updates the size of a catalog table
    * in the external catalog.
    */
-  def analyze(tableName: String): Unit = {
-    AnalyzeTableCommand(tableName).run(sparkSession)
+  def analyze(tableName: String, noscan: Boolean = true): Unit = {
+    AnalyzeTableCommand(tableName, noscan).run(sparkSession)
   }
 }
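A usage sketch for this programmatic entry point; since SessionState is private[sql], this assumes code living in the org.apache.spark.sql package, with spark an active SparkSession and "tbl" an existing table:

    spark.sessionState.analyze("tbl")                  // like ANALYZE ... NOSCAN: updates size only
    spark.sessionState.analyze("tbl", noscan = false)  // full analyze: size plus row count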
----------------------------------------------------------------------

sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala (26 additions, 0 deletions)

@@ -18,6 +18,7 @@
 package org.apache.spark.sql

 import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, Join, LocalLimit}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._

@@ -75,4 +76,29 @@ class StatisticsSuite extends QueryTest with SharedSQLContext {
     }
   }

+  test("test table-level statistics for data source table created in InMemoryCatalog") {
+    def checkTableStats(tableName: String, expectedRowCount: Option[BigInt]): Unit = {
+      val df = sql(s"SELECT * FROM $tableName")
+      val relations = df.queryExecution.analyzed.collect { case rel: LogicalRelation =>
+        assert(rel.catalogTable.isDefined)
+        assert(rel.catalogTable.get.stats.flatMap(_.rowCount) === expectedRowCount)
+        rel
+      }
+      assert(relations.size === 1)
+    }
+
+    val tableName = "tbl"
+    withTable(tableName) {
+      sql(s"CREATE TABLE $tableName(i INT, j STRING) USING parquet")
+      Seq(1 -> "a", 2 -> "b").toDF("i", "j").write.mode("overwrite").insertInto("tbl")
+
+      // noscan won't count the number of rows
+      sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS noscan")
+      checkTableStats(tableName, expectedRowCount = None)
+
+      // without noscan, we count the number of rows
+      sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
+      checkTableStats(tableName, expectedRowCount = Some(2))
+    }
+  }
 }