Skip to content

Commit

Permalink
introduce a config
Browse files Browse the repository at this point in the history
  • Loading branch information
sunchao committed Nov 8, 2023
1 parent b16f725 commit 9768527
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2671,6 +2671,16 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

// Boolean SQL config (key: spark.sql.statistics.update.partitionStats.enabled) that opts the
// ANALYZE TABLE command into also updating partition-level statistics. It defaults to false,
// in which case only table-level statistics are updated; enabling it makes the command more
// expensive, as the doc string below warns.
val ANALYZE_PARTITION_STATS_ENABLED =
buildConf("spark.sql.statistics.update.partitionStats.enabled")
.doc("When this config is enabled, Spark will also update partition statistics in analyze " +
"table command (i.e., ANALYZE TABLE .. COMPUTE STATISTICS [NOSCAN]). Note the command " +
"will also become more expensive. When this config is disabled, Spark will only " +
"update table level statistics.")
.version("4.0.0")
.booleanConf
.createWithDefault(false)

val CBO_ENABLED =
buildConf("spark.sql.cbo.enabled")
.doc("Enables CBO for estimation of plan statistics when set true.")
Expand Down Expand Up @@ -5099,6 +5109,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

/** Returns the current value of [[SQLConf.AUTO_SIZE_UPDATE_ENABLED]] for this conf. */
def autoSizeUpdateEnabled: Boolean = getConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED)

/**
 * Returns the current value of [[SQLConf.ANALYZE_PARTITION_STATS_ENABLED]], i.e. whether the
 * analyze table command should also update partition statistics.
 */
def analyzePartitionStatsEnabled: Boolean = getConf(SQLConf.ANALYZE_PARTITION_STATS_ENABLED)

/** Returns the current value of [[SQLConf.JOIN_REORDER_ENABLED]] for this conf. */
def joinReorderEnabled: Boolean = getConf(SQLConf.JOIN_REORDER_ENABLED)

/** Returns the current value of [[SQLConf.JOIN_REORDER_DP_THRESHOLD]] for this conf. */
def joinReorderDPThreshold: Int = getConf(SQLConf.JOIN_REORDER_DP_THRESHOLD)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ object CommandUtils extends Logging {
tableIdent: TableIdentifier,
noScan: Boolean): Unit = {
val sessionState = sparkSession.sessionState
val partitionStatsEnabled = sessionState.conf.analyzePartitionStatsEnabled
val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
Expand All @@ -249,7 +250,7 @@ object CommandUtils extends Logging {
} else {
// Compute stats for the whole table
val rowCounts: Map[TablePartitionSpec, BigInt] =
if (noScan) {
if (noScan || !partitionStatsEnabled) {
Map.empty
} else {
calculateRowCountsPerPartition(sparkSession, tableMeta, None)
Expand All @@ -266,8 +267,8 @@ object CommandUtils extends Logging {
if (newStats.isDefined) {
sessionState.catalog.alterTableStats(tableIdentWithDB, newStats)
}
// Also update partition stats
if (newPartitions.nonEmpty) {
// Also update partition stats when the config is enabled
if (newPartitions.nonEmpty && partitionStatsEnabled) {
sessionState.catalog.alterPartitions(tableIdentWithDB, newPartitions)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,54 +372,70 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
partition.stats
}

withTable(tableName) {
withTempPath { path =>
// Create a table with 3 partitions all located under a single top-level directory 'path'
sql(
s"""
|CREATE TABLE $tableName (key STRING, value STRING)
|USING hive
|PARTITIONED BY (ds STRING)
|LOCATION '${path.toURI}'
""".stripMargin)
Seq(true, false).foreach { partitionStatsEnabled =>
withSQLConf(SQLConf.ANALYZE_PARTITION_STATS_ENABLED.key -> partitionStatsEnabled.toString) {
withTable(tableName) {
withTempPath { path =>
// Create a table with 3 partitions all located under a directory 'path'
sql(
s"""
|CREATE TABLE $tableName (key STRING, value STRING)
|USING hive
|PARTITIONED BY (ds STRING)
|LOCATION '${path.toURI}'
""".stripMargin)

val partitionDates = List("2010-01-01", "2010-01-02", "2010-01-03")
val partitionDates = List("2010-01-01", "2010-01-02", "2010-01-03")

partitionDates.foreach { ds =>
sql(s"ALTER TABLE $tableName ADD PARTITION (ds='$ds') LOCATION '$path/ds=$ds'")
sql("SELECT * FROM src").write.mode(SaveMode.Overwrite)
.format("parquet").save(s"$path/ds=$ds")
}
partitionDates.foreach { ds =>
sql(s"ALTER TABLE $tableName ADD PARTITION (ds='$ds') LOCATION '$path/ds=$ds'")
sql("SELECT * from src").write.mode(SaveMode.Overwrite)
.format("parquet").save(s"$path/ds=$ds")
}

assert(getCatalogTable(tableName).stats.isEmpty)
partitionDates.foreach { ds =>
assert(queryStats(ds).isEmpty)
}
assert(getCatalogTable(tableName).stats.isEmpty)
partitionDates.foreach { ds =>
assert(queryStats(ds).isEmpty)
}

sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS NOSCAN")
sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS NOSCAN")

// Table and partition stats should also have been updated
assert(getTableStats(tableName).sizeInBytes == 3 * 4411)
assert(getTableStats(tableName).rowCount.isEmpty)
partitionDates.foreach { ds =>
val partStats = queryStats(ds)
assert(partStats.nonEmpty)
assert(partStats.get.sizeInBytes == 4411)
assert(partStats.get.rowCount.isEmpty)
}
val expectedRowCount = 25

sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
// Table size should also have been updated
assert(getTableStats(tableName).sizeInBytes > 0)
// Row count should NOT be updated with the `NOSCAN` option
assert(getTableStats(tableName).rowCount.isEmpty)

assert(getTableStats(tableName).sizeInBytes == 3 * 4411)
// Table row count should be updated
assert(getTableStats(tableName).rowCount.get == 75)
partitionDates.foreach { ds =>
val partStats = queryStats(ds)
if (partitionStatsEnabled) {
assert(partStats.nonEmpty)
assert(partStats.get.sizeInBytes > 0)
assert(partStats.get.rowCount.isEmpty)
} else {
assert(partStats.isEmpty)
}
}

partitionDates.foreach { ds =>
val partStats = queryStats(ds)
assert(partStats.nonEmpty)
// The scan option should update partition row count
assert(partStats.get.sizeInBytes == 4411)
assert(partStats.get.rowCount.get == 25)
sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")

assert(getTableStats(tableName).sizeInBytes > 0)
// Table row count should be updated
assert(getTableStats(tableName).rowCount.get == 3 * expectedRowCount)

partitionDates.foreach { ds =>
val partStats = queryStats(ds)
if (partitionStatsEnabled) {
assert(partStats.nonEmpty)
// The scan option should update partition row count
assert(partStats.get.sizeInBytes > 0)
assert(partStats.get.rowCount.get == expectedRowCount)
} else {
assert(partStats.isEmpty)
}
}
}
}
}
}
Expand Down

0 comments on commit 9768527

Please sign in to comment.