
Commit b946df0

wangzhenhua committed: update based on comments
1 parent b6c655a

6 files changed (+47, -41 lines)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 1 addition & 0 deletions

```diff
@@ -191,6 +191,7 @@ case class CatalogTable(
       viewText.map("View: " + _).getOrElse(""),
       comment.map("Comment: " + _).getOrElse(""),
       if (properties.nonEmpty) s"Properties: $tableProperties" else "",
+      if (stats.isDefined) s"Statistics: ${stats.get}" else "",
       s"$storage")
 
     output.filter(_.nonEmpty).mkString("CatalogTable(\n\t", "\n\t", ")")
```
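The pattern this hunk extends renders each optional field as an empty string and filters empties out before joining, so a table without statistics simply omits the `Statistics:` line. A minimal self-contained sketch of that pattern (`TableInfo` and its fields are illustrative stand-ins, not Spark's actual `CatalogTable`):

```scala
// Illustrative stand-in for CatalogTable's toString: optional fields
// render as "" and are dropped by the nonEmpty filter.
case class TableInfo(name: String, comment: Option[String], stats: Option[String]) {
  override def toString: String = {
    val output = Seq(
      s"Table: $name",
      comment.map("Comment: " + _).getOrElse(""),
      if (stats.isDefined) s"Statistics: ${stats.get}" else "")
    output.filter(_.nonEmpty).mkString("CatalogTable(\n\t", "\n\t", ")")
  }
}

// TableInfo("t1", None, Some("sizeInBytes=100")).toString produces:
// CatalogTable(
//	Table: t1
//	Statistics: sizeInBytes=100)
```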

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala

Lines changed: 10 additions & 1 deletion

```diff
@@ -37,4 +37,13 @@ package org.apache.spark.sql.catalyst.plans.logical
 case class Statistics(
     sizeInBytes: BigInt,
     rowCount: Option[BigInt] = None,
-    isBroadcastable: Boolean = false)
+    isBroadcastable: Boolean = false) {
+  override def toString: String = {
+    val output =
+      Seq(s"sizeInBytes=$sizeInBytes",
+        if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "",
+        s"isBroadcastable=$isBroadcastable"
+      )
+    output.filter(_.nonEmpty).mkString("Statistics(", ", ", ")")
+  }
+}
```
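Since the new case class is self-contained apart from its package line, the toString is easy to sanity-check in a REPL; the rowCount field disappears from the output when it is undefined:

```scala
println(Statistics(sizeInBytes = 100))
// Statistics(sizeInBytes=100, isBroadcastable=false)

println(Statistics(sizeInBytes = 100, rowCount = Some(10), isBroadcastable = true))
// Statistics(sizeInBytes=100, rowCount=10, isBroadcastable=true)
```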

sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala

Lines changed: 8 additions & 16 deletions

```diff
@@ -85,35 +85,27 @@ case class AnalyzeTableCommand(tableName: String, noscan: Boolean = true) extend
           }
         }.getOrElse(0L)
 
-        updateTableStats(
-          catalogTable,
-          oldTotalSize = catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(0L),
-          oldRowCount = catalogTable.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L),
-          newTotalSize = newTotalSize)
+        updateTableStats(catalogTable, newTotalSize)
 
       // data source tables have been converted into LogicalRelations
       case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
-        val table = logicalRel.catalogTable.get
-        updateTableStats(
-          table,
-          oldTotalSize = table.stats.map(_.sizeInBytes.toLong).getOrElse(0L),
-          oldRowCount = table.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L),
-          newTotalSize = logicalRel.relation.sizeInBytes)
+        updateTableStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)
 
       case otherRelation =>
        throw new AnalysisException(s"ANALYZE TABLE is not supported for " +
          s"${otherRelation.nodeName}.")
     }
 
-    def updateTableStats(
-        catalogTable: CatalogTable,
-        oldTotalSize: Long,
-        oldRowCount: Long,
-        newTotalSize: Long): Unit = {
+    def updateTableStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = {
+      val oldTotalSize = catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
+      val oldRowCount = catalogTable.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
       var newStats: Option[Statistics] = None
       if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
         newStats = Some(Statistics(sizeInBytes = newTotalSize))
       }
+      // We only set rowCount when noscan is false, because otherwise we can't know whether the
+      // row count we get (`oldRowCount`) is valid or not.
+      // This is to make sure that we only record the right statistics.
       if (!noscan) {
         val newRowCount = Dataset.ofRows(sparkSession, relation).count()
         if (newRowCount >= 0 && newRowCount != oldRowCount) {
```
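The hunk cuts off before the end of updateTableStats, so for reference here is a hedged standalone sketch of the full decision flow. `Stats` and `countRows` are simplified stand-ins, and the rowCount-merging branch after the cutoff is an assumption about the unshown remainder, not the commit's verbatim code:

```scala
// Simplified sketch, not Spark's actual AnalyzeTableCommand.
case class Stats(sizeInBytes: BigInt, rowCount: Option[BigInt] = None)

def updatedStats(
    old: Option[Stats],
    newTotalSize: Long,
    noscan: Boolean,
    countRows: () => Long): Option[Stats] = {
  // Defaults mirror the diff: size 0 and row count -1 mean "unknown".
  val oldTotalSize = old.map(_.sizeInBytes.toLong).getOrElse(0L)
  val oldRowCount = old.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
  var newStats: Option[Stats] = None
  if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
    newStats = Some(Stats(sizeInBytes = newTotalSize))
  }
  // Row count is refreshed only when noscan is false, i.e. the table was
  // actually scanned and the fresh count is trustworthy.
  if (!noscan) {
    val newRowCount = countRows()
    if (newRowCount >= 0 && newRowCount != oldRowCount) {
      // Assumed continuation of the truncated hunk: attach the new row
      // count to whichever stats object survives.
      newStats = newStats
        .map(_.copy(rowCount = Some(BigInt(newRowCount))))
        .orElse(Some(Stats(sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount)))))
    }
  }
  newStats
}

// updatedStats(None, newTotalSize = 1024L, noscan = false, () => 42L)
// returns Some(Stats(1024, Some(42)))
```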

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 1 addition & 2 deletions

```diff
@@ -236,8 +236,7 @@ object DataSourceStrategy extends Strategy with Logging {
       (requestedColumns, allPredicates, _) =>
         toCatalystRDD(l, requestedColumns, t.buildScan(requestedColumns, allPredicates))) :: Nil
 
-    case PhysicalOperation(projects, filters,
-      l @ LogicalRelation(t: PrunedFilteredScan, _, _)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan, _, _)) =>
       pruneFilterProject(
         l,
         projects,
```

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala

Lines changed: 12 additions & 10 deletions

```diff
@@ -102,11 +102,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
    * metastore.
    */
   private def verifyTableProperties(table: CatalogTable): Unit = {
-    val datasourceKeys = table.properties.keys.filter(_.startsWith(DATASOURCE_PREFIX))
-    if (datasourceKeys.nonEmpty) {
+    val invalidKeys = table.properties.keys.filter { key =>
+      key.startsWith(DATASOURCE_PREFIX) || key.startsWith(STATISTICS_PREFIX)
+    }
+    if (invalidKeys.nonEmpty) {
       throw new AnalysisException(s"Cannot persistent ${table.qualifiedName} into hive metastore " +
-        s"as table property keys may not start with '$DATASOURCE_PREFIX': " +
-        datasourceKeys.mkString("[", ", ", "]"))
+        s"as table property keys may not start with '$DATASOURCE_PREFIX' or '$STATISTICS_PREFIX':" +
+        s" ${invalidKeys.mkString("[", ", ", "]")}")
     }
   }
 
@@ -385,7 +387,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     verifyTableProperties(tableDefinition)
 
     // convert table statistics to properties so that we can persist them through hive api
-    val catalogTable = if (tableDefinition.stats.isDefined) {
+    val withStatsProps = if (tableDefinition.stats.isDefined) {
       val stats = tableDefinition.stats.get
       var statsProperties: Map[String, String] =
         Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString())
@@ -397,21 +399,21 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       tableDefinition
     }
 
-    if (DDLUtils.isDatasourceTable(catalogTable)) {
-      val oldDef = client.getTable(db, catalogTable.identifier.table)
+    if (DDLUtils.isDatasourceTable(withStatsProps)) {
+      val oldDef = client.getTable(db, withStatsProps.identifier.table)
       // Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition,
       // to retain the spark specific format if it is. Also add old data source properties to table
       // properties, to retain the data source table format.
       val oldDataSourceProps = oldDef.properties.filter(_._1.startsWith(DATASOURCE_PREFIX))
-      val newDef = catalogTable.copy(
+      val newDef = withStatsProps.copy(
         schema = oldDef.schema,
         partitionColumnNames = oldDef.partitionColumnNames,
         bucketSpec = oldDef.bucketSpec,
-        properties = oldDataSourceProps ++ catalogTable.properties)
+        properties = oldDataSourceProps ++ withStatsProps.properties)
 
       client.alterTable(newDef)
     } else {
-      client.alterTable(catalogTable)
+      client.alterTable(withStatsProps)
     }
   }
```
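The validation itself is easy to lift out. A sketch under the assumption that the two prefixes are "spark.sql.sources." and "spark.sql.statistics." (the real constants live in HiveExternalCatalog's companion object):

```scala
// Assumed prefix values, for illustration only.
val DATASOURCE_PREFIX = "spark.sql.sources."
val STATISTICS_PREFIX = "spark.sql.statistics."

def verifyTableProperties(properties: Map[String, String]): Unit = {
  val invalidKeys = properties.keys.filter { key =>
    key.startsWith(DATASOURCE_PREFIX) || key.startsWith(STATISTICS_PREFIX)
  }
  if (invalidKeys.nonEmpty) {
    throw new IllegalArgumentException(
      s"table property keys may not start with '$DATASOURCE_PREFIX' or " +
        s"'$STATISTICS_PREFIX': ${invalidKeys.mkString("[", ", ", "]")}")
  }
}

// verifyTableProperties(Map("spark.sql.statistics.totalSize" -> "100")) throws;
// verifyTableProperties(Map("owner" -> "alice")) passes.
```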

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

Lines changed: 15 additions & 12 deletions

```diff
@@ -731,26 +731,29 @@ class HiveDDLSuite
     }
   }
 
-  test("datasource table property keys are not allowed") {
+  test("datasource and statistics table property keys are not allowed") {
     import org.apache.spark.sql.hive.HiveExternalCatalog.DATASOURCE_PREFIX
+    import org.apache.spark.sql.hive.HiveExternalCatalog.STATISTICS_PREFIX
 
     withTable("tbl") {
       sql("CREATE TABLE tbl(a INT) STORED AS parquet")
 
-      val e = intercept[AnalysisException] {
-        sql(s"ALTER TABLE tbl SET TBLPROPERTIES ('${DATASOURCE_PREFIX}foo' = 'loser')")
-      }
-      assert(e.getMessage.contains(DATASOURCE_PREFIX + "foo"))
+      Seq(DATASOURCE_PREFIX, STATISTICS_PREFIX).foreach { forbiddenPrefix =>
+        val e = intercept[AnalysisException] {
+          sql(s"ALTER TABLE tbl SET TBLPROPERTIES ('${forbiddenPrefix}foo' = 'loser')")
+        }
+        assert(e.getMessage.contains(forbiddenPrefix + "foo"))
 
-      val e2 = intercept[AnalysisException] {
-        sql(s"ALTER TABLE tbl UNSET TBLPROPERTIES ('${DATASOURCE_PREFIX}foo')")
-      }
-      assert(e2.getMessage.contains(DATASOURCE_PREFIX + "foo"))
+        val e2 = intercept[AnalysisException] {
+          sql(s"ALTER TABLE tbl UNSET TBLPROPERTIES ('${forbiddenPrefix}foo')")
+        }
+        assert(e2.getMessage.contains(forbiddenPrefix + "foo"))
 
-      val e3 = intercept[AnalysisException] {
-        sql(s"CREATE TABLE tbl TBLPROPERTIES ('${DATASOURCE_PREFIX}foo'='anything')")
+        val e3 = intercept[AnalysisException] {
+          sql(s"CREATE TABLE tbl TBLPROPERTIES ('${forbiddenPrefix}foo'='anything')")
+        }
+        assert(e3.getMessage.contains(forbiddenPrefix + "foo"))
       }
-      assert(e3.getMessage.contains(DATASOURCE_PREFIX + "foo"))
     }
   }
 }
```
