-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-17072] [SQL] support table-level statistics generation and storing into/loading from metastore #14712
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
ac88f25
a3af25a
001ef88
a2387ee
dad43f7
4b4358f
cb6ab95
10f6e07
eb8b9c0
ac332ec
0fb2149
bb0861e
61f6c27
6d6e482
2db1fbf
9c27071
7e39a86
aef78d4
c7cc55f
56ec68e
9715770
aa438c4
b6c655a
b946df0
5d6e559
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -99,9 +99,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { | |
| ctx.identifier.getText.toLowerCase == "noscan") { | ||
|
||
| AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier).toString) | ||
| } else { | ||
| // Always just run the no scan analyze. We should fix this and implement full analyze | ||
| // command in the future. | ||
| AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier).toString) | ||
| AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier).toString, noscan = false) | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,19 +21,18 @@ import scala.util.control.NonFatal | |
|
|
||
| import org.apache.hadoop.fs.{FileSystem, Path} | ||
|
|
||
| import org.apache.spark.sql.{AnalysisException, Row, SparkSession} | ||
| import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession} | ||
| import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases | ||
| import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} | ||
| import org.apache.spark.sql.catalyst.plans.logical.Statistics | ||
| import org.apache.spark.sql.execution.datasources.LogicalRelation | ||
|
|
||
|
|
||
| /** | ||
| * Analyzes the given table in the current database to generate statistics, which will be | ||
| * used in query optimizations. | ||
| * | ||
| * Right now, it only supports Hive tables and it only updates the size of a Hive table | ||
| * in the Hive metastore. | ||
| */ | ||
| case class AnalyzeTableCommand(tableName: String) extends RunnableCommand { | ||
| case class AnalyzeTableCommand(tableName: String, noscan: Boolean = true) extends RunnableCommand { | ||
|
|
||
| override def run(sparkSession: SparkSession): Seq[Row] = { | ||
|
||
| val sessionState = sparkSession.sessionState | ||
|
|
@@ -71,8 +70,6 @@ case class AnalyzeTableCommand(tableName: String) extends RunnableCommand { | |
| size | ||
| } | ||
|
|
||
| val tableParameters = catalogTable.properties | ||
| val oldTotalSize = tableParameters.get("totalSize").map(_.toLong).getOrElse(0L) | ||
| val newTotalSize = | ||
| catalogTable.storage.locationUri.map { p => | ||
| val path = new Path(p) | ||
|
|
@@ -88,24 +85,47 @@ case class AnalyzeTableCommand(tableName: String) extends RunnableCommand { | |
| } | ||
| }.getOrElse(0L) | ||
|
|
||
| // Update the Hive metastore if the total size of the table is different than the size | ||
| // recorded in the Hive metastore. | ||
| // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats(). | ||
| if (newTotalSize > 0 && newTotalSize != oldTotalSize) { | ||
| sessionState.catalog.alterTable( | ||
| catalogTable.copy( | ||
| properties = relation.catalogTable.properties + | ||
| (AnalyzeTableCommand.TOTAL_SIZE_FIELD -> newTotalSize.toString))) | ||
| } | ||
| updateTableStats(catalogTable, newTotalSize) | ||
|
|
||
| // data source tables have been converted into LogicalRelations | ||
| case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => | ||
| updateTableStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) | ||
|
|
||
| case otherRelation => | ||
| throw new AnalysisException(s"ANALYZE TABLE is only supported for Hive tables, " + | ||
| s"but '${tableIdent.unquotedString}' is a ${otherRelation.nodeName}.") | ||
| throw new AnalysisException(s"ANALYZE TABLE is not supported for " + | ||
| s"${otherRelation.nodeName}.") | ||
| } | ||
|
|
||
| def updateTableStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { | ||
| val oldTotalSize = catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(0L) | ||
| val oldRowCount = catalogTable.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L) | ||
| var newStats: Option[Statistics] = None | ||
| if (newTotalSize > 0 && newTotalSize != oldTotalSize) { | ||
| newStats = Some(Statistics(sizeInBytes = newTotalSize)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. if we run
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @cloud-fan When we have row count info, we can't tell whether the row count is valid or not - we can also run ANALYZE TABLE, and run
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok makes sense, but we should document this behaviour explicitly. |
||
| } | ||
| // We only set rowCount when noscan is false, because otherwise: | ||
| // 1. when total size is not changed, we don't need to alter the table; | ||
| // 2. when total size is changed, `oldRowCount` becomes invalid. | ||
| // This is to make sure that we only record the right statistics. | ||
| if (!noscan) { | ||
| val newRowCount = Dataset.ofRows(sparkSession, relation).count() | ||
| if (newRowCount >= 0 && newRowCount != oldRowCount) { | ||
| newStats = if (newStats.isDefined) { | ||
| newStats.map(_.copy(rowCount = Some(BigInt(newRowCount)))) | ||
| } else { | ||
| Some(Statistics(sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount)))) | ||
| } | ||
| } | ||
| } | ||
| // Update the metastore if the above statistics of the table are different from those | ||
| // recorded in the metastore. | ||
| if (newStats.isDefined) { | ||
| sessionState.catalog.alterTable(catalogTable.copy(stats = newStats)) | ||
| // Refresh the cached data source table in the catalog. | ||
| sessionState.catalog.refreshTable(tableIdent) | ||
| } | ||
| } | ||
|
|
||
| Seq.empty[Row] | ||
| } | ||
| } | ||
|
|
||
| object AnalyzeTableCommand { | ||
| val TOTAL_SIZE_FIELD = "totalSize" | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we should also update
`toString` to include `stats`.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan and also
`simpleString` in `LogicalRelation`?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`LogicalRelation` doesn't need to be updated, I think.