Commit d07fc30

wangyum authored and maropu committed
[SPARK-33687][SQL] Support analyze all tables in a specific database
### What changes were proposed in this pull request?

This PR adds support for analyzing all tables in a specific database:

```g4
ANALYZE TABLES ((FROM | IN) multipartIdentifier)? COMPUTE STATISTICS (identifier)?
```

### Why are the changes needed?

1. Make it easy to analyze all tables in a specific database.
2. PostgreSQL has a similar implementation: https://www.postgresql.org/docs/12/sql-analyze.html.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

The feature was tested by unit tests. The documentation was tested by regenerating it:

| menu-sql.yaml | sql-ref-syntax-aux-analyze-tables.md |
| -- | -- |
| ![image](https://user-images.githubusercontent.com/5399861/109098769-dc33a200-775c-11eb-86b1-55531e5425e0.png) | ![image](https://user-images.githubusercontent.com/5399861/109098841-02594200-775d-11eb-8588-de8da97ec94a.png) |

Closes #30648 from wangyum/SPARK-33687.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
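For orientation (not part of the diff): a minimal end-to-end sketch of the new statement through the public SQL API, assuming a Spark build that includes this change; the database and table names are illustrative.

```scala
import org.apache.spark.sql.SparkSession

object AnalyzeTablesExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("analyze-tables-example")
      .getOrCreate()

    spark.sql("CREATE DATABASE IF NOT EXISTS school_db")
    spark.sql("CREATE TABLE IF NOT EXISTS school_db.teachers (name STRING, teacher_id INT) USING parquet")
    spark.sql("INSERT INTO school_db.teachers VALUES ('Tom', 1), ('Jerry', 2)")

    // Size-only statistics for every table in school_db (no table scan):
    spark.sql("ANALYZE TABLES IN school_db COMPUTE STATISTICS NOSCAN")

    // Size and row-count statistics for every table in the current database:
    spark.sql("USE school_db")
    spark.sql("ANALYZE TABLES COMPUTE STATISTICS")

    // The collected statistics appear in the table's extended description:
    spark.sql("DESC EXTENDED school_db.teachers").show(100, truncate = false)

    spark.stop()
  }
}
```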
1 parent 5a48eb8 commit d07fc30

File tree

15 files changed: +285 −35 lines

docs/_data/menu-sql.yaml

Lines changed: 2 additions & 0 deletions
```diff
@@ -198,6 +198,8 @@
         subitems:
           - text: ANALYZE TABLE
             url: sql-ref-syntax-aux-analyze-table.html
+          - text: ANALYZE TABLES
+            url: sql-ref-syntax-aux-analyze-tables.html
           - text: CACHE
             url: sql-ref-syntax-aux-cache.html
             subitems:
```

docs/sql-ref-syntax-aux-analyze-table.md

Lines changed: 5 additions & 1 deletion
````diff
@@ -50,7 +50,7 @@ ANALYZE TABLE table_identifier [ partition_spec ]
 * If no analyze option is specified, `ANALYZE TABLE` collects the table's number of rows and size in bytes.
 * **NOSCAN**
 
-    Collects only the table's size in bytes ( which does not require scanning the entire table ).
+    Collects only the table's size in bytes (which does not require scanning the entire table).
 * **FOR COLUMNS col [ , ... ] `|` FOR ALL COLUMNS**
 
     Collects column statistics for each column specified, or alternatively for every column, as well as table statistics.
@@ -122,3 +122,7 @@ DESC EXTENDED students name;
 |      histogram|      NULL|
 +--------------+----------+
 ```
+
+### Related Statements
+
+* [ANALYZE TABLES](sql-ref-syntax-aux-analyze-tables.html)
````
docs/sql-ref-syntax-aux-analyze-tables.md

Lines changed: 110 additions & 0 deletions

````diff
@@ -0,0 +1,110 @@
+---
+layout: global
+title: ANALYZE TABLES
+displayTitle: ANALYZE TABLES
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+### Description
+
+The `ANALYZE TABLES` statement collects statistics about all the tables in a specified database to be used by the query optimizer to find a better query execution plan.
+
+### Syntax
+
+```sql
+ANALYZE TABLES [ { FROM | IN } database_name ] COMPUTE STATISTICS [ NOSCAN ]
+```
+
+### Parameters
+
+* **{ FROM `|` IN } database_name**
+
+    Specifies the name of the database to be analyzed. Without a database name, `ANALYZE` collects all tables in the current database that the current user has permission to analyze.
+
+* **[ NOSCAN ]**
+
+    Collects only the table's size in bytes (which does not require scanning the entire table).
+
+### Examples
+
+```sql
+CREATE DATABASE school_db;
+USE school_db;
+
+CREATE TABLE teachers (name STRING, teacher_id INT);
+INSERT INTO teachers VALUES ('Tom', 1), ('Jerry', 2);
+
+CREATE TABLE students (name STRING, student_id INT, age SHORT);
+INSERT INTO students VALUES ('Mark', 111111, 10), ('John', 222222, 11);
+
+ANALYZE TABLES IN school_db COMPUTE STATISTICS NOSCAN;
+
+DESC EXTENDED teachers;
++--------------------+--------------------+-------+
+|            col_name|           data_type|comment|
++--------------------+--------------------+-------+
+|                name|              string|   null|
+|          teacher_id|                 int|   null|
+|                 ...|                 ...|    ...|
+|            Provider|             parquet|       |
+|          Statistics|          1382 bytes|       |
+|                 ...|                 ...|    ...|
++--------------------+--------------------+-------+
+
+DESC EXTENDED students;
++--------------------+--------------------+-------+
+|            col_name|           data_type|comment|
++--------------------+--------------------+-------+
+|                name|              string|   null|
+|          student_id|                 int|   null|
+|                 age|            smallint|   null|
+|                 ...|                 ...|    ...|
+|          Statistics|          1828 bytes|       |
+|                 ...|                 ...|    ...|
++--------------------+--------------------+-------+
+
+ANALYZE TABLES COMPUTE STATISTICS;
+
+DESC EXTENDED teachers;
++--------------------+--------------------+-------+
+|            col_name|           data_type|comment|
++--------------------+--------------------+-------+
+|                name|              string|   null|
+|          teacher_id|                 int|   null|
+|                 ...|                 ...|    ...|
+|            Provider|             parquet|       |
+|          Statistics|  1382 bytes, 2 rows|       |
+|                 ...|                 ...|    ...|
++--------------------+--------------------+-------+
+
+DESC EXTENDED students;
++--------------------+--------------------+-------+
+|            col_name|           data_type|comment|
++--------------------+--------------------+-------+
+|                name|              string|   null|
+|          student_id|                 int|   null|
+|                 age|            smallint|   null|
+|                 ...|                 ...|    ...|
+|            Provider|             parquet|       |
+|          Statistics|  1828 bytes, 2 rows|       |
+|                 ...|                 ...|    ...|
++--------------------+--------------------+-------+
+```
+
+### Related Statements
+
+* [ANALYZE TABLE](sql-ref-syntax-aux-analyze-table.html)
````

docs/sql-ref-syntax-aux-analyze.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -20,3 +20,4 @@ license: |
 ---
 
 * [ANALYZE TABLE statement](sql-ref-syntax-aux-analyze-table.html)
+* [ANALYZE TABLES statement](sql-ref-syntax-aux-analyze-tables.html)
```

docs/sql-ref-syntax.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -77,6 +77,7 @@ Spark SQL is Apache Spark's module for working with structured data. The SQL Syn
 * [ADD FILE](sql-ref-syntax-aux-resource-mgmt-add-file.html)
 * [ADD JAR](sql-ref-syntax-aux-resource-mgmt-add-jar.html)
 * [ANALYZE TABLE](sql-ref-syntax-aux-analyze-table.html)
+* [ANALYZE TABLES](sql-ref-syntax-aux-analyze-tables.html)
 * [CACHE TABLE](sql-ref-syntax-aux-cache-cache-table.html)
 * [CLEAR CACHE](sql-ref-syntax-aux-cache-clear-cache.html)
 * [DESCRIBE DATABASE](sql-ref-syntax-aux-describe-database.html)
```

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 2 additions & 0 deletions
```diff
@@ -134,6 +134,8 @@ statement
         (AS? query)?                                                   #replaceTable
     | ANALYZE TABLE multipartIdentifier partitionSpec? COMPUTE STATISTICS
         (identifier | FOR COLUMNS identifierSeq | FOR ALL COLUMNS)?    #analyze
+    | ANALYZE TABLES ((FROM | IN) multipartIdentifier)? COMPUTE STATISTICS
+        (identifier)?                                                  #analyzeTables
     | ALTER TABLE multipartIdentifier
         ADD (COLUMN | COLUMNS)
         columns=qualifiedColTypeWithPositionList                       #addTableColumns
```
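For reference (not part of the diff; assumes a build containing this commit): the statements the new rule accepts through the Catalyst parser. The trailing `(identifier)?` slot is deliberately loose in the grammar and is validated against `NOSCAN` later, in `AstBuilder`.

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// All parse via the new #analyzeTables alternative:
println(CatalystSqlParser.parsePlan("ANALYZE TABLES COMPUTE STATISTICS"))
println(CatalystSqlParser.parsePlan("ANALYZE TABLES FROM school_db COMPUTE STATISTICS"))
println(CatalystSqlParser.parsePlan("ANALYZE TABLES IN school_db COMPUTE STATISTICS NOSCAN"))

// Grammatically valid but rejected during plan building, since only NOSCAN is allowed:
//   ANALYZE TABLES COMPUTE STATISTICS xxxx
//   => ParseException: Expected `NOSCAN` instead of `xxxx`
```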

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -859,6 +859,8 @@ class Analyzer(override val catalogManager: CatalogManager)
       s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
     case s @ ShowViews(UnresolvedNamespace(Seq()), _, _) =>
       s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
+    case a @ AnalyzeTables(UnresolvedNamespace(Seq()), _) =>
+      a.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
     case UnresolvedNamespace(Seq()) =>
       ResolvedNamespace(currentCatalog, Seq.empty[String])
     case UnresolvedNamespace(CatalogAndNamespace(catalog, ns)) =>
```
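In plain terms (an illustration, not diff content): when `ANALYZE TABLES` names no database, the parser emits an empty `UnresolvedNamespace`, and this rule fills in the session's current catalog and namespace; an explicit database is handled by the generic `UnresolvedNamespace` cases below.

```scala
// Illustration of the rewrite this rule performs (plans shown schematically;
// a current database of school_db is an assumed example):
//
//   "ANALYZE TABLES COMPUTE STATISTICS"
//     parses to:   AnalyzeTables(UnresolvedNamespace(Seq()), noScan = false)
//     resolves to: AnalyzeTables(ResolvedNamespace(currentCatalog, Seq("school_db")), noScan = false)
//
//   "ANALYZE TABLES IN other_db COMPUTE STATISTICS"
//     keeps UnresolvedNamespace(Seq("other_db")), which the generic
//     UnresolvedNamespace cases resolve.
```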

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 19 additions & 0 deletions
```diff
@@ -3654,6 +3654,25 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
     }
   }
 
+  /**
+   * Create an [[AnalyzeTables]].
+   * Example SQL for analyzing all tables in default database:
+   * {{{
+   *   ANALYZE TABLES IN default COMPUTE STATISTICS;
+   * }}}
+   */
+  override def visitAnalyzeTables(ctx: AnalyzeTablesContext): LogicalPlan = withOrigin(ctx) {
+    if (ctx.identifier != null &&
+        ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
+      throw new ParseException(s"Expected `NOSCAN` instead of `${ctx.identifier.getText}`",
+        ctx.identifier())
+    }
+    val multiPart = Option(ctx.multipartIdentifier).map(visitMultipartIdentifier)
+    AnalyzeTables(
+      UnresolvedNamespace(multiPart.getOrElse(Seq.empty[String])),
+      noScan = ctx.identifier != null)
+  }
+
   /**
    * Create a [[RepairTable]].
    *
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 9 additions & 0 deletions
```diff
@@ -660,6 +660,15 @@ case class AnalyzeTable(
   override def children: Seq[LogicalPlan] = child :: Nil
 }
 
+/**
+ * The logical plan of the ANALYZE TABLES command.
+ */
+case class AnalyzeTables(
+    namespace: LogicalPlan,
+    noScan: Boolean) extends Command {
+  override def children: Seq[LogicalPlan] = Seq(namespace)
+}
+
 /**
  * The logical plan of the ANALYZE TABLE FOR COLUMNS command.
  */
```

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala

Lines changed: 9 additions & 0 deletions
```diff
@@ -1873,6 +1873,15 @@ class DDLParserSuite extends AnalysisTest {
       "Expected `NOSCAN` instead of `xxxx`")
   }
 
+  test("SPARK-33687: analyze tables statistics") {
+    comparePlans(parsePlan("ANALYZE TABLES IN a.b.c COMPUTE STATISTICS"),
+      AnalyzeTables(UnresolvedNamespace(Seq("a", "b", "c")), noScan = false))
+    comparePlans(parsePlan("ANALYZE TABLES FROM a COMPUTE STATISTICS NOSCAN"),
+      AnalyzeTables(UnresolvedNamespace(Seq("a")), noScan = true))
+    intercept("ANALYZE TABLES IN a.b.c COMPUTE STATISTICS xxxx",
+      "Expected `NOSCAN` instead of `xxxx`")
+  }
+
   test("analyze table column statistics") {
     intercept("ANALYZE TABLE a.b.c COMPUTE STATISTICS FOR COLUMNS", "")
```

sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala

Lines changed: 3 additions & 0 deletions
```diff
@@ -373,6 +373,9 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         AnalyzePartitionCommand(ident.asTableIdentifier, partitionSpec, noScan)
       }
 
+    case AnalyzeTables(DatabaseInSessionCatalog(db), noScan) =>
+      AnalyzeTablesCommand(Some(db), noScan)
+
     case AnalyzeColumn(ResolvedV1TableOrViewIdentifier(ident), columnNames, allColumns) =>
       AnalyzeColumnCommand(ident.asTableIdentifier, columnNames, allColumns)
 
```

sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala

Lines changed: 3 additions & 33 deletions
```diff
@@ -17,49 +17,19 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 
 
 /**
  * Analyzes the given table to generate statistics, which will be used in query optimizations.
  */
 case class AnalyzeTableCommand(
     tableIdent: TableIdentifier,
-    noscan: Boolean = true) extends RunnableCommand {
+    noScan: Boolean = true) extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val sessionState = sparkSession.sessionState
-    val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
-    val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
-    val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
-    if (tableMeta.tableType == CatalogTableType.VIEW) {
-      // Analyzes a catalog view if the view is cached
-      val table = sparkSession.table(tableIdent.quotedString)
-      val cacheManager = sparkSession.sharedState.cacheManager
-      if (cacheManager.lookupCachedData(table.logicalPlan).isDefined) {
-        if (!noscan) {
-          // To collect table stats, materializes an underlying columnar RDD
-          table.count()
-        }
-      } else {
-        throw new AnalysisException("ANALYZE TABLE is not supported on views.")
-      }
-    } else {
-      // Compute stats for the whole table
-      val newTotalSize = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
-      val newRowCount =
-        if (noscan) None else Some(BigInt(sparkSession.table(tableIdentWithDB).count()))
-
-      // Update the metastore if the above statistics of the table are different from those
-      // recorded in the metastore.
-      val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount)
-      if (newStats.isDefined) {
-        sessionState.catalog.alterTableStats(tableIdentWithDB, newStats)
-      }
-    }
-
+    CommandUtils.analyzeTable(sparkSession, tableIdent, noScan)
     Seq.empty[Row]
   }
 }
```
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTablesCommand.scala

Lines changed: 46 additions & 0 deletions

```diff
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.sql.{Row, SparkSession}
+
+
+/**
+ * Analyzes all tables in the given database to generate statistics.
+ */
+case class AnalyzeTablesCommand(
+    databaseName: Option[String],
+    noScan: Boolean) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    val db = databaseName.getOrElse(catalog.getCurrentDatabase)
+    catalog.listTables(db).foreach { tbl =>
+      try {
+        CommandUtils.analyzeTable(sparkSession, tbl, noScan)
+      } catch {
+        case NonFatal(e) =>
+          logWarning(s"Failed to analyze table ${tbl.table} in the " +
+            s"database $db because of ${e.toString}", e)
+      }
+    }
+    Seq.empty[Row]
+  }
+}
```
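A usage sketch (not part of the diff; an active session and the school_db database are assumed): the command that `ANALYZE TABLES` resolves to in the session catalog can also be invoked directly. Note the per-table `NonFatal` handler above: a table that fails to analyze only logs a warning, so the remaining tables are still processed.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.command.AnalyzeTablesCommand

val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()

// Equivalent to: ANALYZE TABLES IN school_db COMPUTE STATISTICS NOSCAN
AnalyzeTablesCommand(databaseName = Some("school_db"), noScan = true).run(spark)

// Equivalent to: ANALYZE TABLES COMPUTE STATISTICS (current database, with row counts)
AnalyzeTablesCommand(databaseName = None, noScan = false).run(spark)
```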

sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala

Lines changed: 36 additions & 1 deletion
```diff
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -199,6 +199,41 @@ object CommandUtils extends Logging {
     newStats
   }
 
+  def analyzeTable(
+      sparkSession: SparkSession,
+      tableIdent: TableIdentifier,
+      noScan: Boolean): Unit = {
+    val sessionState = sparkSession.sessionState
+    val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+    val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+    val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
+    if (tableMeta.tableType == CatalogTableType.VIEW) {
+      // Analyzes a catalog view if the view is cached
+      val table = sparkSession.table(tableIdent.quotedString)
+      val cacheManager = sparkSession.sharedState.cacheManager
+      if (cacheManager.lookupCachedData(table.logicalPlan).isDefined) {
+        if (!noScan) {
+          // To collect table stats, materializes an underlying columnar RDD
+          table.count()
+        }
+      } else {
+        throw new AnalysisException("ANALYZE TABLE is not supported on views.")
+      }
+    } else {
+      // Compute stats for the whole table
+      val newTotalSize = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
+      val newRowCount =
+        if (noScan) None else Some(BigInt(sparkSession.table(tableIdentWithDB).count()))
+
+      // Update the metastore if the above statistics of the table are different from those
+      // recorded in the metastore.
+      val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount)
+      if (newStats.isDefined) {
+        sessionState.catalog.alterTableStats(tableIdentWithDB, newStats)
+      }
+    }
+  }
+
   /**
    * Compute stats for the given columns.
    * @return (row count, map from column name to CatalogColumnStats)
```
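The net effect of this refactoring (a sketch, assuming an active SparkSession `spark` and an existing school_db.teachers table): the body that previously lived in `AnalyzeTableCommand.run` is now the shared helper `CommandUtils.analyzeTable`, called once by `AnalyzeTableCommand` and once per table by `AnalyzeTablesCommand`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.CommandUtils

val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()

// Size in bytes only; skips scanning the data:
CommandUtils.analyzeTable(spark, TableIdentifier("teachers", Some("school_db")), noScan = true)

// Size plus row count; triggers a count() over the table:
CommandUtils.analyzeTable(spark, TableIdentifier("teachers", Some("school_db")), noScan = false)
```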
