address comments
sunchao committed Nov 9, 2023
1 parent 5c1ba71 commit 1b6b0a5
Showing 2 changed files with 9 additions and 12 deletions.
@@ -84,16 +84,14 @@ object CommandUtils extends Logging {
       (BigInt, Seq[CatalogTablePartition]) = {
     val sessionState = spark.sessionState
     val startTime = System.nanoTime()
-    val (totalSize, newPartitions) = if (catalogTable.partitionColumnNames.isEmpty) {
+    val (totalSize: BigInt, newPartitions) = if (catalogTable.partitionColumnNames.isEmpty) {
       (calculateSingleLocationSize(sessionState, catalogTable.identifier,
         catalogTable.storage.locationUri), Seq())
     } else {
       // Calculate table size as a sum of the visible partitions. See SPARK-21079
       val partitions = sessionState.catalog.listPartitions(catalogTable.identifier)
       logInfo(s"Starting to calculate sizes for ${partitions.length} partitions.")
-      val (sizes, newPartitions) = calculatePartitionStats(spark, catalogTable, partitions,
-        partitionRowCount)
-      (sizes.sum, newPartitions)
+      calculatePartitionStats(spark, catalogTable, partitions, partitionRowCount)
     }
     logInfo(s"It took ${(System.nanoTime() - startTime) / (1000 * 1000)} ms to calculate" +
       s" the total size for table ${catalogTable.identifier}.")
@@ -105,15 +103,15 @@ object CommandUtils extends Logging {
       catalogTable: CatalogTable,
       partitions: Seq[CatalogTablePartition],
       partitionRowCount: Option[Map[TablePartitionSpec, BigInt]] = None):
-      (Seq[Long], Seq[CatalogTablePartition]) = {
+      (BigInt, Seq[CatalogTablePartition]) = {
     val paths = partitions.map(_.storage.locationUri)
     val sizes = calculateMultipleLocationSizes(spark, catalogTable.identifier, paths)
     val newPartitions = partitions.zipWithIndex.flatMap { case (p, idx) =>
       val newRowCount = partitionRowCount.flatMap(_.get(p.spec))
       val newStats = CommandUtils.compareAndGetNewStats(p.stats, sizes(idx), newRowCount)
       newStats.map(_ => p.copy(stats = newStats))
     }
-    (sizes, newPartitions)
+    (sizes.sum, newPartitions)
   }

   def calculateSingleLocationSize(
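
The net effect of the CommandUtils change above is that calculatePartitionStats now returns the summed size as a BigInt alongside the refreshed partition metadata, so calculateTotalSize uses the result of the partitioned branch directly instead of summing a Seq[Long] itself. A minimal, self-contained sketch of that shape (stand-in types and hypothetical names, not the actual Spark implementation):

// Sketch only: PartitionInfo stands in for CatalogTablePartition, and the
// hard-coded sizes stand in for what Spark derives from the file system.
final case class PartitionInfo(spec: Map[String, String], sizeInBytes: Long)

object PartitionSizeSketch {
  // Mirrors the new return shape: the already-summed total as a BigInt plus
  // the (here unchanged) partition list, so callers need not sum a Seq[Long].
  def calculatePartitionStats(parts: Seq[PartitionInfo]): (BigInt, Seq[PartitionInfo]) = {
    val sizes = parts.map(_.sizeInBytes)
    (BigInt(sizes.sum), parts)
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(
      PartitionInfo(Map("ds" -> "2010-01-01"), 100L),
      PartitionInfo(Map("ds" -> "2010-01-02"), 200L))
    val (totalSize, newPartitions) = calculatePartitionStats(parts)
    println(s"totalSize=$totalSize, partitions=${newPartitions.size}") // totalSize=300, partitions=2
  }
}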
@@ -372,9 +372,12 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton {
       partition.stats
     }

+    val partitionDates = List("2010-01-01", "2010-01-02", "2010-01-03")
+    val expectedRowCount = 25
+
     Seq(true, false).foreach { partitionStatsEnabled =>
       withSQLConf(SQLConf.UPDATE_PART_STATS_IN_ANALYZE_TABLE_ENABLED.key ->
-        partitionStatsEnabled.toString) {
+        partitionStatsEnabled.toString) {
         withTable(tableName) {
           withTempPath { path =>
             // Create a table with 3 partitions all located under a directory 'path'
@@ -384,9 +387,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton {
              |USING hive
              |PARTITIONED BY (ds STRING)
              |LOCATION '${path.toURI}'
-            """.stripMargin)
-
-            val partitionDates = List("2010-01-01", "2010-01-02", "2010-01-03")
+            """.stripMargin)

             partitionDates.foreach { ds =>
               sql(s"ALTER TABLE $tableName ADD PARTITION (ds='$ds') LOCATION '$path/ds=$ds'")
@@ -401,8 +402,6 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton {

             sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS NOSCAN")

-            val expectedRowCount = 25
-
             // Table size should also have been updated
             assert(getTableStats(tableName).sizeInBytes > 0)
             // Row count should NOT be updated with the `NOSCAN` option
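
On the test side, the change above only hoists the partitionDates and expectedRowCount fixtures out of the Seq(true, false).foreach loop so that both settings of the partition-stats flag share them. A rough, self-contained sketch of that structure, with the Spark test helpers (withSQLConf, withTable, the ANALYZE TABLE calls) replaced by a hypothetical stand-in so it runs on its own:

object PartitionStatsTestSketch {
  // Shared fixtures hoisted above the flag loop, as in the diff above.
  val partitionDates = List("2010-01-01", "2010-01-02", "2010-01-03")
  val expectedRowCount = 25

  // Hypothetical stand-in for withSQLConf(flag -> value) { ... }.
  def withPartitionStatsFlag(enabled: Boolean)(body: => Unit): Unit = body

  def main(args: Array[String]): Unit = {
    Seq(true, false).foreach { partitionStatsEnabled =>
      withPartitionStatsFlag(partitionStatsEnabled) {
        // The real suite creates the partitioned table here, runs
        // ANALYZE TABLE ... COMPUTE STATISTICS [NOSCAN], and asserts on
        // sizeInBytes / rowCount for each partition date.
        partitionDates.foreach { ds =>
          println(s"flag=$partitionStatsEnabled ds=$ds expectedRowCount=$expectedRowCount")
        }
      }
    }
  }
}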
