
Commit ccb1154

ericl authored and yhuai committed
[SPARK-17970][SQL] store partition spec in metastore for data source table
## What changes were proposed in this pull request?

We should follow Hive tables and also store the partition spec in the metastore for data source tables. This brings two benefits:

1. Managing the table's data files becomes more flexible, since users can use `ADD PARTITION`, `DROP PARTITION`, and `RENAME PARTITION`.
2. We no longer need to cache all file statuses for data source tables.

## How was this patch tested?

Existing tests.

Author: Eric Liang <ekl@databricks.com>
Author: Michael Allman <michael@videoamp.com>
Author: Eric Liang <ekhliang@gmail.com>
Author: Wenchen Fan <wenchen@databricks.com>

Closes #15515 from cloud-fan/partition.
1 parent: 79fd0cc
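To make the new behavior concrete, here is a minimal usage sketch (table and column names are made up for illustration; it assumes the `spark.sql.hive.manageFilesourcePartitions` flag referenced in this patch is enabled):

```scala
// Illustrative only: a partitioned data source table whose partition metadata
// now lives in the Hive metastore.
spark.sql("CREATE TABLE events (id INT, day STRING) USING parquet PARTITIONED BY (day)")

// Newly created tables start with partition provider = Hive but no registered
// partitions, so partition directories already on disk must be imported explicitly:
spark.sql("MSCK REPAIR TABLE events")

// Partition-level DDL now also works on data source tables:
spark.sql("ALTER TABLE events ADD PARTITION (day='2016-10-20')")
spark.sql("ALTER TABLE events PARTITION (day='2016-10-20') RENAME TO PARTITION (day='2016-10-21')")
spark.sql("ALTER TABLE events DROP PARTITION (day='2016-10-21')")
```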

File tree: 26 files changed (+596, -329 lines)


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 8 additions & 4 deletions
@@ -89,9 +89,10 @@ case class CatalogTablePartition(
     parameters: Map[String, String] = Map.empty) {
 
   override def toString: String = {
+    val specString = spec.map { case (k, v) => s"$k=$v" }.mkString(", ")
     val output =
       Seq(
-        s"Partition Values: [${spec.values.mkString(", ")}]",
+        s"Partition Values: [$specString]",
         s"$storage",
         s"Partition Parameters:{${parameters.map(p => p._1 + "=" + p._2).mkString(", ")}}")
 
@@ -137,6 +138,8 @@ case class BucketSpec(
  *                 Can be None if this table is a View, should be "hive" for hive serde tables.
  * @param unsupportedFeatures is a list of string descriptions of features that are used by the
  *                            underlying table but not supported by Spark SQL yet.
+ * @param partitionProviderIsHive whether this table's partition metadata is stored in the Hive
+ *                                metastore.
  */
 case class CatalogTable(
     identifier: TableIdentifier,
@@ -154,7 +157,8 @@ case class CatalogTable(
     viewOriginalText: Option[String] = None,
     viewText: Option[String] = None,
     comment: Option[String] = None,
-    unsupportedFeatures: Seq[String] = Seq.empty) {
+    unsupportedFeatures: Seq[String] = Seq.empty,
+    partitionProviderIsHive: Boolean = false) {
 
   /** schema of this table's partition columns */
   def partitionSchema: StructType = StructType(schema.filter {
@@ -212,11 +216,11 @@ case class CatalogTable(
         comment.map("Comment: " + _).getOrElse(""),
         if (properties.nonEmpty) s"Properties: $tableProperties" else "",
         if (stats.isDefined) s"Statistics: ${stats.get.simpleString}" else "",
-        s"$storage")
+        s"$storage",
+        if (partitionProviderIsHive) "Partition Provider: Hive" else "")
 
     output.filter(_.nonEmpty).mkString("CatalogTable(\n\t", "\n\t", ")")
   }
-
 }
 
 

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala

Lines changed: 1 addition & 0 deletions
@@ -489,6 +489,7 @@ class TreeNodeSuite extends SparkFunSuite {
         "owner" -> "",
         "createTime" -> 0,
         "lastAccessTime" -> -1,
+        "partitionProviderIsHive" -> false,
         "properties" -> JNull,
         "unsupportedFeatures" -> List.empty[String]))

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 11 additions & 2 deletions
@@ -25,7 +25,8 @@ import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Union}
+import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
 import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, CreateTable, DataSource, HadoopFsRelation}
 import org.apache.spark.sql.types.StructType
 
@@ -387,7 +388,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       partitionColumnNames = partitioningColumns.getOrElse(Nil),
       bucketSpec = getBucketSpec
     )
-    val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
+    val createCmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
+    val cmd = if (tableDesc.partitionColumnNames.nonEmpty &&
+        df.sparkSession.sqlContext.conf.manageFilesourcePartitions) {
+      // Need to recover partitions into the metastore so our saved data is visible.
+      val recoverPartitionCmd = AlterTableRecoverPartitionsCommand(tableDesc.identifier)
+      Union(createCmd, recoverPartitionCmd)
+    } else {
+      createCmd
+    }
     df.sparkSession.sessionState.executePlan(cmd).toRdd
   }
 }
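For context (illustrative, not part of the diff): with filesource partition management enabled, a partitioned `saveAsTable` call now goes through the `Union(createCmd, recoverPartitionCmd)` plan above, so the written partitions are registered in the metastore without a manual `MSCK REPAIR TABLE`. A rough sketch, assuming `df` is an existing DataFrame with a `day` column and `events` is a made-up table name:

```scala
// Hypothetical example: writing a partitioned data source table via DataFrameWriter.
// The CreateTable command is unioned with AlterTableRecoverPartitionsCommand, so the
// partition directories under the table location become visible in the Hive metastore.
df.write
  .format("parquet")
  .partitionBy("day")
  .mode("overwrite")
  .saveAsTable("events")
```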

sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala

Lines changed: 2 additions & 1 deletion
@@ -50,7 +50,8 @@ case class AnalyzeColumnCommand(
           AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable))
 
       case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
-        updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)
+        updateStats(logicalRel.catalogTable.get,
+          AnalyzeTableCommand.calculateTotalSize(sessionState, logicalRel.catalogTable.get))
 
       case otherRelation =>
         throw new AnalysisException("ANALYZE TABLE is not supported for " +

sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala

Lines changed: 2 additions & 1 deletion
@@ -51,7 +51,8 @@ case class AnalyzeTableCommand(
 
       // data source tables have been converted into LogicalRelations
       case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
-        updateTableStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)
+        updateTableStats(logicalRel.catalogTable.get,
+          AnalyzeTableCommand.calculateTotalSize(sessionState, logicalRel.catalogTable.get))
 
       case otherRelation =>
         throw new AnalysisException("ANALYZE TABLE is not supported for " +

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 16 additions & 1 deletion
@@ -94,10 +94,16 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
     val newTable = table.copy(
       storage = table.storage.copy(properties = optionsWithPath),
       schema = dataSource.schema,
-      partitionColumnNames = partitionColumnNames)
+      partitionColumnNames = partitionColumnNames,
+      // If metastore partition management for file source tables is enabled, we start off with
+      // partition provider hive, but no partitions in the metastore. The user has to call
+      // `msck repair table` to populate the table partitions.
+      partitionProviderIsHive = partitionColumnNames.nonEmpty &&
+        sparkSession.sessionState.conf.manageFilesourcePartitions)
     // We will return Nil or throw exception at the beginning if the table already exists, so when
     // we reach here, the table should not exist and we should set `ignoreIfExists` to false.
     sessionState.catalog.createTable(newTable, ignoreIfExists = false)
+
     Seq.empty[Row]
   }
 }
@@ -232,6 +238,15 @@ case class CreateDataSourceTableAsSelectCommand(
       sessionState.catalog.createTable(newTable, ignoreIfExists = false)
     }
 
+    result match {
+      case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
+          sparkSession.sqlContext.conf.manageFilesourcePartitions =>
+        // Need to recover partitions into the metastore so our saved data is visible.
+        sparkSession.sessionState.executePlan(
+          AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
+      case _ =>
+    }
+
     // Refresh the cache of the table in the catalog.
     sessionState.catalog.refreshTable(tableIdentWithDB)
     Seq.empty[Row]

sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 54 additions & 36 deletions
@@ -28,10 +28,11 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTablePartition, CatalogTableType, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, PartitioningUtils}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
 
@@ -346,10 +347,7 @@ case class AlterTableAddPartitionCommand(
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
-    if (DDLUtils.isDatasourceTable(table)) {
-      throw new AnalysisException(
-        "ALTER TABLE ADD PARTITION is not allowed for tables defined using the datasource API")
-    }
+    DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE ADD PARTITION")
     val parts = partitionSpecsAndLocs.map { case (spec, location) =>
       val normalizedSpec = PartitioningUtils.normalizePartitionSpec(
         spec,
@@ -382,11 +380,8 @@ case class AlterTableRenamePartitionCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
-    if (DDLUtils.isDatasourceTable(table)) {
-      throw new AnalysisException(
-        "ALTER TABLE RENAME PARTITION is not allowed for tables defined using the datasource API")
-    }
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
+    DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE RENAME PARTITION")
 
     val normalizedOldPartition = PartitioningUtils.normalizePartitionSpec(
       oldPartition,
@@ -432,10 +427,7 @@ case class AlterTableDropPartitionCommand(
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
-    if (DDLUtils.isDatasourceTable(table)) {
-      throw new AnalysisException(
-        "ALTER TABLE DROP PARTITIONS is not allowed for tables defined using the datasource API")
-    }
+    DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
     val normalizedSpecs = specs.map { spec =>
       PartitioningUtils.normalizePartitionSpec(
@@ -493,33 +485,39 @@ case class AlterTableRecoverPartitionsCommand(
     }
   }
 
+  private def getBasePath(table: CatalogTable): Option[String] = {
+    if (table.provider == Some("hive")) {
+      table.storage.locationUri
+    } else {
+      new CaseInsensitiveMap(table.storage.properties).get("path")
+    }
+  }
+
   override def run(spark: SparkSession): Seq[Row] = {
     val catalog = spark.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
     val tableIdentWithDB = table.identifier.quotedString
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
-    if (DDLUtils.isDatasourceTable(table)) {
-      throw new AnalysisException(
-        s"Operation not allowed: $cmd on datasource tables: $tableIdentWithDB")
-    }
     if (table.partitionColumnNames.isEmpty) {
       throw new AnalysisException(
         s"Operation not allowed: $cmd only works on partitioned tables: $tableIdentWithDB")
     }
-    if (table.storage.locationUri.isEmpty) {
+
+    val tablePath = getBasePath(table)
+    if (tablePath.isEmpty) {
       throw new AnalysisException(s"Operation not allowed: $cmd only works on table with " +
         s"location provided: $tableIdentWithDB")
     }
 
-    val root = new Path(table.storage.locationUri.get)
+    val root = new Path(tablePath.get)
     logInfo(s"Recover all the partitions in $root")
     val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
 
     val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
     val hadoopConf = spark.sparkContext.hadoopConfiguration
     val pathFilter = getPathFilter(hadoopConf)
-    val partitionSpecsAndLocs = scanPartitions(
-      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase), threshold)
+    val partitionSpecsAndLocs = scanPartitions(spark, fs, pathFilter, root, Map(),
+      table.partitionColumnNames, threshold, spark.sessionState.conf.resolver)
     val total = partitionSpecsAndLocs.length
     logInfo(s"Found $total partitions in $root")
 
@@ -531,6 +529,11 @@ case class AlterTableRecoverPartitionsCommand(
       logInfo(s"Finished to gather the fast stats for all $total partitions.")
 
     addPartitions(spark, table, partitionSpecsAndLocs, partitionStats)
+    // Updates the table to indicate that its partition metadata is stored in the Hive metastore.
+    // This is always the case for Hive format tables, but is not true for Datasource tables created
+    // before Spark 2.1 unless they are converted via `msck repair table`.
+    spark.sessionState.catalog.alterTable(table.copy(partitionProviderIsHive = true))
+    catalog.refreshTable(tableName)
     logInfo(s"Recovered all partitions ($total).")
     Seq.empty[Row]
   }
@@ -544,7 +547,8 @@ case class AlterTableRecoverPartitionsCommand(
       path: Path,
       spec: TablePartitionSpec,
       partitionNames: Seq[String],
-      threshold: Int): GenSeq[(TablePartitionSpec, Path)] = {
+      threshold: Int,
+      resolver: Resolver): GenSeq[(TablePartitionSpec, Path)] = {
     if (partitionNames.isEmpty) {
       return Seq(spec -> path)
     }
@@ -563,15 +567,15 @@ case class AlterTableRecoverPartitionsCommand(
       val name = st.getPath.getName
       if (st.isDirectory && name.contains("=")) {
         val ps = name.split("=", 2)
-        val columnName = PartitioningUtils.unescapePathName(ps(0)).toLowerCase
+        val columnName = PartitioningUtils.unescapePathName(ps(0))
         // TODO: Validate the value
         val value = PartitioningUtils.unescapePathName(ps(1))
-        // comparing with case-insensitive, but preserve the case
-        if (columnName == partitionNames.head) {
-          scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(columnName -> value),
-            partitionNames.drop(1), threshold)
+        if (resolver(columnName, partitionNames.head)) {
+          scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value),
+            partitionNames.drop(1), threshold, resolver)
        } else {
-          logWarning(s"expect partition column ${partitionNames.head}, but got ${ps(0)}, ignore it")
+          logWarning(
+            s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it")
          Seq()
        }
      } else {
@@ -676,16 +680,11 @@ case class AlterTableSetLocationCommand(
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     partitionSpec match {
       case Some(spec) =>
+        DDLUtils.verifyPartitionProviderIsHive(
+          sparkSession, table, "ALTER TABLE ... SET LOCATION")
         // Partition spec is specified, so we set the location only for this partition
         val part = catalog.getPartition(table.identifier, spec)
-        val newPart =
-          if (DDLUtils.isDatasourceTable(table)) {
-            throw new AnalysisException(
-              "ALTER TABLE SET LOCATION for partition is not allowed for tables defined " +
-                "using the datasource API")
-          } else {
-            part.copy(storage = part.storage.copy(locationUri = Some(location)))
-          }
+        val newPart = part.copy(storage = part.storage.copy(locationUri = Some(location)))
         catalog.alterPartitions(table.identifier, Seq(newPart))
       case None =>
         // No partition spec is specified, so we set the location for the table itself
@@ -709,6 +708,25 @@ object DDLUtils {
     table.provider.isDefined && table.provider.get != "hive"
   }
 
+  /**
+   * Throws a standard error for actions that require partitionProvider = hive.
+   */
+  def verifyPartitionProviderIsHive(
+      spark: SparkSession, table: CatalogTable, action: String): Unit = {
+    val tableName = table.identifier.table
+    if (!spark.sqlContext.conf.manageFilesourcePartitions && isDatasourceTable(table)) {
+      throw new AnalysisException(
+        s"$action is not allowed on $tableName since filesource partition management is " +
+          "disabled (spark.sql.hive.manageFilesourcePartitions = false).")
+    }
+    if (!table.partitionProviderIsHive && isDatasourceTable(table)) {
+      throw new AnalysisException(
+        s"$action is not allowed on $tableName since its partition metadata is not stored in " +
          "the Hive metastore. To import this information into the metastore, run " +
+          s"`msck repair table $tableName`")
+    }
+  }
+
   /**
    * If the command ALTER VIEW is to alter a table or ALTER TABLE is to alter a view,
    * issue an exception [[AnalysisException]].
