
[SPARK-33687][SQL] Support analyze all tables in a specific database #30648


Closed · wants to merge 7 commits
2 changes: 2 additions & 0 deletions docs/_data/menu-sql.yaml
Original file line number Diff line number Diff line change
@@ -198,6 +198,8 @@
subitems:
- text: ANALYZE TABLE
url: sql-ref-syntax-aux-analyze-table.html
- text: ANALYZE TABLES
url: sql-ref-syntax-aux-analyze-tables.html
- text: CACHE
url: sql-ref-syntax-aux-cache.html
subitems:
6 changes: 5 additions & 1 deletion docs/sql-ref-syntax-aux-analyze-table.md
@@ -50,7 +50,7 @@ ANALYZE TABLE table_identifier [ partition_spec ]
* If no analyze option is specified, `ANALYZE TABLE` collects the table's number of rows and size in bytes.
* **NOSCAN**

Collects only the table's size in bytes ( which does not require scanning the entire table ).
Collects only the table's size in bytes (which does not require scanning the entire table).
* **FOR COLUMNS col [ , ... ] `|` FOR ALL COLUMNS**

Collects column statistics for each column specified, or alternatively for every column, as well as table statistics.
@@ -122,3 +122,7 @@ DESC EXTENDED students;
| histogram| NULL|
+--------------+----------+
```

### Related Statements

* [ANALYZE TABLES](sql-ref-syntax-aux-analyze-tables.html)
110 changes: 110 additions & 0 deletions docs/sql-ref-syntax-aux-analyze-tables.md
@@ -0,0 +1,110 @@
---
layout: global
title: ANALYZE TABLES
displayTitle: ANALYZE TABLES
license: |
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---

### Description

The `ANALYZE TABLES` statement collects statistics about all the tables in a specified database, which the query optimizer uses to find a better query execution plan.

### Syntax

```sql
ANALYZE TABLES [ { FROM | IN } database_name ] COMPUTE STATISTICS [ NOSCAN ]
```

### Parameters

* **{ FROM `|` IN } database_name**

Specifies the name of the database to be analyzed. If no database is specified, `ANALYZE TABLES` analyzes all tables in the current database that the current user has permission to analyze.

* **[ NOSCAN ]**

Collects only the table's size in bytes (which does not require scanning the entire table).

### Examples

```sql
CREATE DATABASE school_db;
USE school_db;

CREATE TABLE teachers (name STRING, teacher_id INT);
INSERT INTO teachers VALUES ('Tom', 1), ('Jerry', 2);

CREATE TABLE students (name STRING, student_id INT, age SHORT);
INSERT INTO students VALUES ('Mark', 111111, 10), ('John', 222222, 11);

ANALYZE TABLES IN school_db COMPUTE STATISTICS NOSCAN;

DESC EXTENDED teachers;
+--------------------+--------------------+-------+
| col_name| data_type|comment|
+--------------------+--------------------+-------+
| name| string| null|
| teacher_id| int| null|
| ...| ...| ...|
| Provider| parquet| |
| Statistics| 1382 bytes| |
| ...| ...| ...|
+--------------------+--------------------+-------+

DESC EXTENDED students;
+--------------------+--------------------+-------+
| col_name| data_type|comment|
+--------------------+--------------------+-------+
| name| string| null|
| student_id| int| null|
| age| smallint| null|
| ...| ...| ...|
| Statistics| 1828 bytes| |
| ...| ...| ...|
+--------------------+--------------------+-------+

ANALYZE TABLES COMPUTE STATISTICS;

DESC EXTENDED teachers;
+--------------------+--------------------+-------+
| col_name| data_type|comment|
+--------------------+--------------------+-------+
| name| string| null|
| teacher_id| int| null|
| ...| ...| ...|
| Provider| parquet| |
| Statistics| 1382 bytes, 2 rows| |
| ...| ...| ...|
+--------------------+--------------------+-------+

DESC EXTENDED students;
+--------------------+--------------------+-------+
| col_name| data_type|comment|
+--------------------+--------------------+-------+
| name| string| null|
| student_id| int| null|
| age| smallint| null|
| ...| ...| ...|
| Provider| parquet| |
| Statistics| 1828 bytes, 2 rows| |
| ...| ...| ...|
+--------------------+--------------------+-------+
```

### Related Statements

* [ANALYZE TABLE](sql-ref-syntax-aux-analyze-table.html)
1 change: 1 addition & 0 deletions docs/sql-ref-syntax-aux-analyze.md
@@ -20,3 +20,4 @@ license: |
---

* [ANALYZE TABLE statement](sql-ref-syntax-aux-analyze-table.html)
* [ANALYZE TABLES statement](sql-ref-syntax-aux-analyze-tables.html)
Member: Please update the index page, too.

1 change: 1 addition & 0 deletions docs/sql-ref-syntax.md
@@ -77,6 +77,7 @@ Spark SQL is Apache Spark's module for working with structured data. The SQL Syn
* [ADD FILE](sql-ref-syntax-aux-resource-mgmt-add-file.html)
* [ADD JAR](sql-ref-syntax-aux-resource-mgmt-add-jar.html)
* [ANALYZE TABLE](sql-ref-syntax-aux-analyze-table.html)
* [ANALYZE TABLES](sql-ref-syntax-aux-analyze-tables.html)
* [CACHE TABLE](sql-ref-syntax-aux-cache-cache-table.html)
* [CLEAR CACHE](sql-ref-syntax-aux-cache-clear-cache.html)
* [DESCRIBE DATABASE](sql-ref-syntax-aux-describe-database.html)
@@ -134,6 +134,8 @@ statement
(AS? query)? #replaceTable
| ANALYZE TABLE multipartIdentifier partitionSpec? COMPUTE STATISTICS
(identifier | FOR COLUMNS identifierSeq | FOR ALL COLUMNS)? #analyze
| ANALYZE TABLES ((FROM | IN) multipartIdentifier)? COMPUTE STATISTICS
(identifier)? #analyzeTables
@maropu (Member), Dec 7, 2020:

If `identifier` is only for NOSCAN, how about defining a new ANTLR token for that?

Member (Author):

`analyze` also uses this `identifier`; how about doing it in a separate PR?

Member:

sgtm

| ALTER TABLE multipartIdentifier
ADD (COLUMN | COLUMNS)
columns=qualifiedColTypeWithPositionList #addTableColumns
@@ -859,6 +859,8 @@ class Analyzer(override val catalogManager: CatalogManager)
s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
case s @ ShowViews(UnresolvedNamespace(Seq()), _, _) =>
s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
case a @ AnalyzeTables(UnresolvedNamespace(Seq()), _) =>
a.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
case UnresolvedNamespace(Seq()) =>
ResolvedNamespace(currentCatalog, Seq.empty[String])
case UnresolvedNamespace(CatalogAndNamespace(catalog, ns)) =>
@@ -3654,6 +3654,25 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
}
}

/**
* Create an [[AnalyzeTables]].
* Example SQL for analyzing all tables in default database:
* {{{
* ANALYZE TABLES IN default COMPUTE STATISTICS;
* }}}
*/
override def visitAnalyzeTables(ctx: AnalyzeTablesContext): LogicalPlan = withOrigin(ctx) {
if (ctx.identifier != null &&
ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
throw new ParseException(s"Expected `NOSCAN` instead of `${ctx.identifier.getText}`",
ctx.identifier())
}
val multiPart = Option(ctx.multipartIdentifier).map(visitMultipartIdentifier)
AnalyzeTables(
UnresolvedNamespace(multiPart.getOrElse(Seq.empty[String])),
noScan = ctx.identifier != null)
}
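The trailing `(identifier)?` in the grammar rule accepts any identifier after `COMPUTE STATISTICS`, so `visitAnalyzeTables` has to validate it by hand. A minimal standalone sketch of that check (the `parseNoScan` helper is hypothetical, and it throws `IllegalArgumentException` where the real builder throws `ParseException` with the parser context):

```scala
import java.util.Locale

// Hypothetical model of the NOSCAN check in visitAnalyzeTables: the grammar
// matched an arbitrary identifier, so accept only a case-insensitive "noscan"
// and reject everything else with a parse-style error.
def parseNoScan(identifier: Option[String]): Boolean = identifier match {
  case None => false // no trailing identifier: full scan, row counts collected
  case Some(id) if id.toLowerCase(Locale.ROOT) == "noscan" => true
  case Some(id) =>
    throw new IllegalArgumentException(s"Expected `NOSCAN` instead of `$id`")
}
```

This mirrors the behavior exercised by the `DDLParserSuite` test below: `NOSCAN` toggles `noScan = true`, and any other word (e.g. `xxxx`) is a parse error.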

/**
* Create a [[RepairTable]].
*
@@ -655,6 +655,8 @@ case class AnalyzeTable(
override def children: Seq[LogicalPlan] = child :: Nil
}

/**
* The logical plan of the ANALYZE TABLES command.
*/
case class AnalyzeTables(
namespace: LogicalPlan,
noScan: Boolean) extends Command {
override def children: Seq[LogicalPlan] = Seq(namespace)
}

/**
* The logical plan of the ANALYZE TABLE FOR COLUMNS command.
*/
@@ -1873,6 +1873,15 @@ class DDLParserSuite extends AnalysisTest {
"Expected `NOSCAN` instead of `xxxx`")
}

test("SPARK-33687: analyze tables statistics") {
comparePlans(parsePlan("ANALYZE TABLES IN a.b.c COMPUTE STATISTICS"),
AnalyzeTables(UnresolvedNamespace(Seq("a", "b", "c")), noScan = false))
comparePlans(parsePlan("ANALYZE TABLES FROM a COMPUTE STATISTICS NOSCAN"),
AnalyzeTables(UnresolvedNamespace(Seq("a")), noScan = true))
intercept("ANALYZE TABLES IN a.b.c COMPUTE STATISTICS xxxx",
"Expected `NOSCAN` instead of `xxxx`")
}

test("analyze table column statistics") {
intercept("ANALYZE TABLE a.b.c COMPUTE STATISTICS FOR COLUMNS", "")

@@ -373,6 +373,9 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
AnalyzePartitionCommand(ident.asTableIdentifier, partitionSpec, noScan)
}

case AnalyzeTables(DatabaseInSessionCatalog(db), noScan) =>
AnalyzeTablesCommand(Some(db), noScan)

case AnalyzeColumn(ResolvedV1TableOrViewIdentifier(ident), columnNames, allColumns) =>
AnalyzeColumnCommand(ident.asTableIdentifier, columnNames, allColumns)

@@ -17,49 +17,19 @@

package org.apache.spark.sql.execution.command

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTableType


/**
* Analyzes the given table to generate statistics, which will be used in query optimizations.
*/
case class AnalyzeTableCommand(
tableIdent: TableIdentifier,
noscan: Boolean = true) extends RunnableCommand {
noScan: Boolean = true) extends RunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
val sessionState = sparkSession.sessionState
val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
if (tableMeta.tableType == CatalogTableType.VIEW) {
// Analyzes a catalog view if the view is cached
val table = sparkSession.table(tableIdent.quotedString)
val cacheManager = sparkSession.sharedState.cacheManager
if (cacheManager.lookupCachedData(table.logicalPlan).isDefined) {
if (!noscan) {
// To collect table stats, materializes an underlying columnar RDD
table.count()
}
} else {
throw new AnalysisException("ANALYZE TABLE is not supported on views.")
}
} else {
// Compute stats for the whole table
val newTotalSize = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
val newRowCount =
if (noscan) None else Some(BigInt(sparkSession.table(tableIdentWithDB).count()))

// Update the metastore if the above statistics of the table are different from those
// recorded in the metastore.
val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount)
if (newStats.isDefined) {
sessionState.catalog.alterTableStats(tableIdentWithDB, newStats)
}
}

CommandUtils.analyzeTable(sparkSession, tableIdent, noScan)
Seq.empty[Row]
}
}
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.command

import scala.util.control.NonFatal

import org.apache.spark.sql.{Row, SparkSession}


/**
* Analyzes all tables in the given database to generate statistics.
*/
case class AnalyzeTablesCommand(
databaseName: Option[String],
noScan: Boolean) extends RunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val db = databaseName.getOrElse(catalog.getCurrentDatabase)
catalog.listTables(db).foreach { tbl =>
try {
CommandUtils.analyzeTable(sparkSession, tbl, noScan)
} catch {
case NonFatal(e) =>
logWarning(s"Failed to analyze table ${tbl.table} in the " +
s"database $db because of ${e.toString}", e)
}
}
Seq.empty[Row]
}
}
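The try/catch around `CommandUtils.analyzeTable` is what lets the command survive a single bad table: a failure is logged and the loop moves on. That error-isolation strategy can be sketched without Spark as follows (the `analyzeAllTables` helper and `analyzeOne` callback are hypothetical stand-ins for the catalog loop and `CommandUtils.analyzeTable`):

```scala
import scala.util.control.NonFatal

// Hypothetical sketch of AnalyzeTablesCommand's per-table error isolation:
// analyze every table, collect (rather than rethrow) non-fatal failures so
// the remaining tables in the database are still analyzed.
def analyzeAllTables(tables: Seq[String], analyzeOne: String => Unit): Seq[String] = {
  val failed = Seq.newBuilder[String]
  tables.foreach { table =>
    try analyzeOne(table)
    catch {
      case NonFatal(_) => failed += table // the real command logs a warning here
    }
  }
  failed.result()
}
```

Using `NonFatal` matters: fatal errors such as `OutOfMemoryError` still propagate instead of being swallowed table by table.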
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical._
@@ -199,6 +199,41 @@
newStats
}

def analyzeTable(
sparkSession: SparkSession,
tableIdent: TableIdentifier,
noScan: Boolean): Unit = {
val sessionState = sparkSession.sessionState
val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
if (tableMeta.tableType == CatalogTableType.VIEW) {
// Analyzes a catalog view if the view is cached
val table = sparkSession.table(tableIdent.quotedString)
val cacheManager = sparkSession.sharedState.cacheManager
if (cacheManager.lookupCachedData(table.logicalPlan).isDefined) {
if (!noScan) {
// To collect table stats, materializes an underlying columnar RDD
table.count()
}
} else {
throw new AnalysisException("ANALYZE TABLE is not supported on views.")
}
} else {
// Compute stats for the whole table
val newTotalSize = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
val newRowCount =
if (noScan) None else Some(BigInt(sparkSession.table(tableIdentWithDB).count()))

// Update the metastore if the above statistics of the table are different from those
// recorded in the metastore.
val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount)
if (newStats.isDefined) {
sessionState.catalog.alterTableStats(tableIdentWithDB, newStats)
}
}
}
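The metastore-update guard at the end of `analyzeTable` hinges on `compareAndGetNewStats` returning `Some` only when the freshly computed statistics differ from what the metastore already records. A simplified model of that contract (the `Stats` case class and `newStatsIfChanged` are hypothetical stand-ins; the real `CatalogStatistics` comparison involves more fields and the actual merge rules in `CommandUtils` may differ):

```scala
// Hypothetical stand-in for CatalogStatistics: size on disk plus an optional
// row count (absent under NOSCAN, when the table is not scanned).
case class Stats(sizeInBytes: BigInt, rowCount: Option[BigInt])

// Return the new stats only if they differ from the recorded ones, so the
// caller can skip the alterTableStats metastore write when nothing changed.
// Under NOSCAN (newRowCount = None) the previously recorded row count is kept.
def newStatsIfChanged(
    old: Option[Stats],
    newSize: BigInt,
    newRowCount: Option[BigInt]): Option[Stats] = {
  val candidate = Stats(newSize, newRowCount.orElse(old.flatMap(_.rowCount)))
  if (old.contains(candidate)) None else Some(candidate)
}
```

The design choice this illustrates: avoiding a metastore write on every `ANALYZE` keeps repeated no-op analyses cheap, which matters once `ANALYZE TABLES` runs the check for every table in a database.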

/**
* Compute stats for the given columns.
* @return (row count, map from column name to CatalogColumnStats)