Commit 7ae76c1

initial commit
sunchao committed Nov 1, 2023
1 parent 8c11804 commit 7ae76c1
Showing 2 changed files with 67 additions and 1 deletion.

CommandUtils.scala
@@ -231,7 +231,7 @@ object CommandUtils extends Logging {
      }
    } else {
      // Compute stats for the whole table
-     val (newTotalSize, _) = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
+     val (newTotalSize, newPartitions) = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
      val newRowCount =
        if (noScan) None else Some(BigInt(sparkSession.table(tableIdentWithDB).count()))

@@ -241,6 +241,10 @@ object CommandUtils extends Logging {
      if (newStats.isDefined) {
        sessionState.catalog.alterTableStats(tableIdentWithDB, newStats)
      }
+     // Also update partition stats
+     if (newPartitions.nonEmpty) {
+       sessionState.catalog.alterPartitions(tableIdentWithDB, newPartitions)
+     }
    }
  }

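Taken together, the two hunks route the per-partition metadata that calculateTotalSize already computes into SessionCatalog.alterPartitions, so a plain ANALYZE TABLE ... COMPUTE STATISTICS [NOSCAN] now refreshes partition-level size statistics as well as the table-level ones. The following minimal sketch shows how the new behaviour can be observed from user code; the table name sales, its partition column ds, the partition value, and the demo object are illustrative assumptions, and the exact row labels printed by DESCRIBE may vary between Spark versions.

import org.apache.spark.sql.SparkSession

object PartitionStatsAfterAnalyze {
  def main(args: Array[String]): Unit = {
    // Hive support is needed so that the statistics are persisted in the metastore.
    val spark = SparkSession.builder()
      .appName("partition-stats-after-analyze")
      .enableHiveSupport()
      .getOrCreate()

    // `sales` is a hypothetical Hive table partitioned by `ds`.
    // Recompute size statistics without scanning the data.
    spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS NOSCAN")

    // Table-level statistics: already updated before this change.
    spark.sql("DESCRIBE TABLE EXTENDED sales")
      .where("col_name LIKE '%Statistics%'")
      .show(truncate = false)

    // Partition-level statistics: refreshed by the same command after this change.
    spark.sql("DESCRIBE TABLE EXTENDED sales PARTITION (ds = '2010-01-01')")
      .where("col_name LIKE '%Statistics%'")
      .show(truncate = false)

    spark.stop()
  }
}
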
StatisticsSuite.scala
@@ -363,6 +363,68 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton
    }
  }

test("SPARK-45731: update partition stats with ANALYZE TABLE") {
val tableName = "analyzeTable_part"

def queryStats(ds: String): Option[CatalogStatistics] = {
val partition =
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> ds))
partition.stats
}

withTable(tableName) {
withTempPath { path =>
// Create a table with 3 partitions all located under a single top-level directory 'path'
sql(
s"""
|CREATE TABLE $tableName (key STRING, value STRING)
|USING hive
|PARTITIONED BY (ds STRING)
|LOCATION '${path.toURI}'
""".stripMargin)

val partitionDates = List("2010-01-01", "2010-01-02", "2010-01-03")

partitionDates.foreach { ds =>
sql(s"ALTER TABLE $tableName ADD PARTITION (ds='$ds') LOCATION '$path/ds=$ds'")
sql("SELECT * FROM src").write.mode(SaveMode.Overwrite)
.format("parquet").save(s"$path/ds=$ds")
}

assert(getCatalogTable(tableName).stats.isEmpty)
partitionDates.foreach { ds =>
assert(queryStats(ds).isEmpty)
}

sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS NOSCAN")

// Table and partition stats should also have been updated
assert(getTableStats(tableName).sizeInBytes == 3 * 4411)
assert(getTableStats(tableName).rowCount.isEmpty)
partitionDates.foreach { ds =>
val partStats = queryStats(ds)
assert(partStats.nonEmpty)
assert(partStats.get.sizeInBytes == 4411)
assert(partStats.get.rowCount.isEmpty)
}

sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")

assert(getTableStats(tableName).sizeInBytes == 3 * 4411)
// Table row count should be updated
assert(getTableStats(tableName).rowCount.get == 75)

partitionDates.foreach { ds =>
val partStats = queryStats(ds)
assert(partStats.nonEmpty)
// The scan option doesn't update partition row count, only size in bytes.
assert(partStats.get.sizeInBytes == 4411)
assert(partStats.get.rowCount.isEmpty)
}
}
}
}
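
For comparison, partition statistics could already be refreshed by analyzing a partition explicitly, which the "analyze single partition" test below exercises; this commit makes the table-level command maintain the same partition-level size information. A short sketch of the two forms, reusing the spark session and the illustrative table name from the test above (the partition value is again an assumption):

    // Explicit partition-level analysis: this already refreshed partition stats before the change.
    spark.sql("ANALYZE TABLE analyzeTable_part PARTITION (ds = '2010-01-01') COMPUTE STATISTICS")

    // Table-level analysis: after this change it also refreshes each partition's size in bytes,
    // while per-partition row counts remain unset, matching the assertions above.
    spark.sql("ANALYZE TABLE analyzeTable_part COMPUTE STATISTICS")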

test("analyze single partition") {
val tableName = "analyzeTable_part"
