[SPARK-17970][SQL] store partition spec in metastore for data source table #15515

Closed · wants to merge 114 commits

Commits (114)

c2eacb7
[SPARK-16980][SQL] Load only catalog table partition metadata required
Aug 10, 2016
1f611c4
Add a new catalyst optimizer rule to SQL core for pruning unneeded
Sep 13, 2016
8b24ead
Include the type of file catalog in the FileSourceScanExec metadata
Oct 8, 2016
f82f0d2
TODO: Consider renaming FileCatalog to better differentiate it from
Oct 8, 2016
198dd94
Refactor the FileSourceScanExec.metadata val to make it prettier
Oct 11, 2016
1f0d5d8
try out parquet case insensitive fallback
ericl Oct 11, 2016
59de5ca
fix and add test for input files
ericl Oct 11, 2016
3b51624
rename test
ericl Oct 11, 2016
acc84f0
Refactor `TableFileCatalog.listFiles` to call `listDataLeafFiles` once
Oct 11, 2016
f94863d
fix it
ericl Oct 13, 2016
022d5b9
more test cases
ericl Oct 13, 2016
8bd27be
also fix a bug with zero partitions selected
ericl Oct 13, 2016
0958bcd
feature flag
ericl Oct 12, 2016
291cee7
add comments
ericl Oct 12, 2016
627572e
extend and fix flakiness in test
ericl Oct 13, 2016
6d8e7ea
Enhance `ParquetMetastoreSuite` with mixed-case partition columns
Oct 13, 2016
21caa93
Tidy up a little by removing some unused imports, an unused method and
Oct 13, 2016
d7795cd
Put partition count in `FileSourceScanExec.metadata` for partitioned
Oct 13, 2016
765f93c
Fix some errors in my revision of `ParquetSourceSuite`
Oct 13, 2016
e1635e4
Add metrics and cost tests for partition pruning effectiveness (#5)
ericl Oct 14, 2016
71049d1
Actually register the hive catalog metrics, also revert broken tests …
ericl Oct 14, 2016
6a63afd
Fri Oct 14 14:04:01 PDT 2016
ericl Oct 14, 2016
6b02b3c
[SPARK-16980][SQL] Load only catalog table partition metadata required
Oct 14, 2016
e816919
Add a new catalyst optimizer rule to SQL core for pruning unnecessary
Sep 13, 2016
8cca6dc
Include the type of file catalog in the FileSourceScanExec metadata
Oct 8, 2016
7acc3f1
try out parquet case insensitive fallback
ericl Oct 11, 2016
cf7d1f1
Refactor the FileSourceScanExec.metadata val to make it prettier
Oct 11, 2016
c75855c
fix and add test for input files
ericl Oct 11, 2016
821372f
rename test
ericl Oct 11, 2016
d0b893b
Refactor `TableFileCatalog.listFiles` to call `listDataLeafFiles` once
Oct 11, 2016
c47a2a3
feature flag
ericl Oct 12, 2016
ed7dd37
add comments
ericl Oct 12, 2016
bdff488
fix it
ericl Oct 13, 2016
5ad4b25
more test cases
ericl Oct 13, 2016
fa19224
also fix a bug with zero partitions selected
ericl Oct 13, 2016
00bf912
extend and fix flakiness in test
ericl Oct 13, 2016
b5f7691
Enhance `ParquetMetastoreSuite` with mixed-case partition columns
Oct 13, 2016
77932a1
Tidy up a little by removing some unused imports, an unused method and
Oct 13, 2016
97cd27d
Put partition count in `FileSourceScanExec.metadata` for partitioned
Oct 13, 2016
851d7f9
Fix some errors in my revision of `ParquetSourceSuite`
Oct 13, 2016
26e0d34
Add metrics and cost tests for partition pruning effectiveness (#5)
ericl Oct 14, 2016
83a168c
Actually register the hive catalog metrics, also revert broken tests …
ericl Oct 14, 2016
8871e3a
stray println
ericl Oct 14, 2016
151f7ac
Fri Oct 14 14:49:31 PDT 2016
ericl Oct 14, 2016
014c998
Also support mixed case field resolution for converted ORC tables (#7)
ericl Oct 14, 2016
f967ed8
Fri Oct 14 15:06:46 PDT 2016
ericl Oct 14, 2016
fb1951c
Merge branch 'base' into meta-cache
ericl Oct 14, 2016
dd55499
wip
ericl Oct 15, 2016
6d06414
Merge branch 'master' into meta-cache
ericl Oct 17, 2016
1602ecf
Mon Oct 17 15:42:29 PDT 2016
ericl Oct 17, 2016
a6e78f7
Mon Oct 17 15:59:01 PDT 2016
ericl Oct 17, 2016
1413744
Revert "Revert "[SPARK-17974] Refactor FileCatalog classes to simplif…
ericl Oct 18, 2016
699bce5
fix it
ericl Oct 18, 2016
07506e7
Merge branch 'fix-scalastyle-revert' into meta-cache
ericl Oct 18, 2016
2cef03b
update
ericl Oct 18, 2016
2c34fc7
Merge branch 'master' into meta-cache
ericl Oct 18, 2016
d1d9d0b
Tue Oct 18 16:00:27 PDT 2016
ericl Oct 18, 2016
947c32d
update
ericl Oct 18, 2016
b9272c2
Tue Oct 18 19:11:58 PDT 2016
ericl Oct 19, 2016
96b12e5
store partition spec in metastore for data source table
cloud-fan Oct 19, 2016
3fba74c
comments
ericl Oct 19, 2016
f63ce3c
Merge branch 'master' into meta-cache
ericl Oct 19, 2016
f318aa7
Wed Oct 19 14:25:08 PDT 2016
ericl Oct 19, 2016
0b54b4c
byte limit
ericl Oct 19, 2016
35b565b
update
ericl Oct 19, 2016
766a368
backwards compat support
ericl Oct 20, 2016
c188bf9
Merge branch 'master' into datasource-part-management
ericl Oct 20, 2016
4d93f48
fix flag use
ericl Oct 20, 2016
4647f1f
Thu Oct 20 12:00:02 PDT 2016
ericl Oct 20, 2016
aed909c
Merge branch 'meta-cache' into partition
ericl Oct 20, 2016
44f6c70
make tests cover ds tables too
ericl Oct 20, 2016
e940cb9
Thu Oct 20 13:10:12 PDT 2016
ericl Oct 20, 2016
6733ba6
global cache
ericl Oct 20, 2016
9d72825
Thu Oct 20 14:28:31 PDT 2016
ericl Oct 20, 2016
9ada9b5
Thu Oct 20 14:30:36 PDT 2016
ericl Oct 20, 2016
11f3654
Thu Oct 20 14:34:53 PDT 2016
ericl Oct 20, 2016
3b6398b
Thu Oct 20 14:39:28 PDT 2016
ericl Oct 20, 2016
57452a9
cleanup
ericl Oct 20, 2016
2ec970f
Thu Oct 20 14:42:24 PDT 2016
ericl Oct 20, 2016
17f23cd
Thu Oct 20 15:06:34 PDT 2016
ericl Oct 20, 2016
c0711ad
Update PartitioningAwareFileCatalog.scala
ericl Oct 21, 2016
262f6ee
Update ListingFileCatalog.scala
ericl Oct 21, 2016
2ee5665
fix tests
ericl Oct 21, 2016
2e69cab
cleanup
ericl Oct 21, 2016
2affb92
Merge branch 'master' into meta-cache
ericl Oct 21, 2016
a614ed4
Merge remote-tracking branch 'origin/meta-cache' into meta-cache
ericl Oct 21, 2016
2a96537
Fri Oct 21 13:06:03 PDT 2016
ericl Oct 21, 2016
f40e72f
Merge branch 'meta-cache' into partition
ericl Oct 21, 2016
0d3b074
Fri Oct 21 14:24:22 PDT 2016
ericl Oct 21, 2016
390c2db
conf name change
ericl Oct 21, 2016
4294510
fix compile
ericl Oct 21, 2016
de6c00d
Fri Oct 21 14:36:12 PDT 2016
ericl Oct 21, 2016
1b73b7b
Fri Oct 21 15:15:09 PDT 2016
ericl Oct 21, 2016
4da7724
Fri Oct 21 15:31:38 PDT 2016
ericl Oct 21, 2016
6687c7b
Fri Oct 21 15:38:42 PDT 2016
ericl Oct 21, 2016
34b1ae6
Fri Oct 21 15:40:42 PDT 2016
ericl Oct 21, 2016
aa5b24f
Fri Oct 21 17:28:05 PDT 2016
ericl Oct 22, 2016
f903243
fix tests
ericl Oct 22, 2016
f882988
Merge branch 'master' into partition
ericl Oct 24, 2016
e42bb5a
fix statistics collection
ericl Oct 24, 2016
bc659c5
better backwards compat
ericl Oct 24, 2016
ba4f32f
also compute stats for column analyze
ericl Oct 24, 2016
df5d439
Merge branch 'master' into partition
ericl Oct 25, 2016
8f9d219
Merge remote-tracking branch 'origin/master' into partition
cloud-fan Oct 25, 2016
ef954fb
minor updates
cloud-fan Oct 25, 2016
802372c
fix case preserving
cloud-fan Oct 25, 2016
e3f6610
comments and fix mixed case
ericl Oct 25, 2016
a442aa4
another small test
ericl Oct 25, 2016
87a6b40
fix test
ericl Oct 25, 2016
05fd862
make partitionProviderIsHive a field
cloud-fan Oct 26, 2016
012c124
fix debug mode test
ericl Oct 26, 2016
9a6fff6
fix in memory catalog test
ericl Oct 26, 2016
8c80555
fix tree-node suite
ericl Oct 27, 2016
b6776cc
minor cleanup
cloud-fan Oct 27, 2016
Changes from all commits

@@ -89,9 +89,10 @@ case class CatalogTablePartition(
parameters: Map[String, String] = Map.empty) {

override def toString: String = {
val specString = spec.map { case (k, v) => s"$k=$v" }.mkString(", ")
val output =
Seq(
s"Partition Values: [${spec.values.mkString(", ")}]",
s"Partition Values: [$specString]",
s"$storage",
s"Partition Parameters:{${parameters.map(p => p._1 + "=" + p._2).mkString(", ")}}")

@@ -137,6 +138,8 @@ case class BucketSpec(
* Can be None if this table is a View, should be "hive" for hive serde tables.
* @param unsupportedFeatures is a list of string descriptions of features that are used by the
* underlying table but not supported by Spark SQL yet.
* @param partitionProviderIsHive whether this table's partition metadata is stored in the Hive
* metastore.
*/
case class CatalogTable(
identifier: TableIdentifier,
@@ -154,7 +157,8 @@ case class CatalogTable(
viewOriginalText: Option[String] = None,
viewText: Option[String] = None,
comment: Option[String] = None,
unsupportedFeatures: Seq[String] = Seq.empty) {
unsupportedFeatures: Seq[String] = Seq.empty,
partitionProviderIsHive: Boolean = false) {

/** schema of this table's partition columns */
def partitionSchema: StructType = StructType(schema.filter {
@@ -212,11 +216,11 @@ case class CatalogTable(
comment.map("Comment: " + _).getOrElse(""),
if (properties.nonEmpty) s"Properties: $tableProperties" else "",
if (stats.isDefined) s"Statistics: ${stats.get.simpleString}" else "",
s"$storage")
s"$storage",
if (partitionProviderIsHive) "Partition Provider: Hive" else "")

output.filter(_.nonEmpty).mkString("CatalogTable(\n\t", "\n\t", ")")
}

}


@@ -489,6 +489,7 @@ class TreeNodeSuite extends SparkFunSuite {
"owner" -> "",
"createTime" -> 0,
"lastAccessTime" -> -1,
"partitionProviderIsHive" -> false,
"properties" -> JNull,
"unsupportedFeatures" -> List.empty[String]))

13 changes: 11 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

@@ -25,7 +25,8 @@ import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Union}
import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, CreateTable, DataSource, HadoopFsRelation}
import org.apache.spark.sql.types.StructType

@@ -387,7 +388,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
partitionColumnNames = partitioningColumns.getOrElse(Nil),
bucketSpec = getBucketSpec
)
val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
val createCmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
val cmd = if (tableDesc.partitionColumnNames.nonEmpty &&
df.sparkSession.sqlContext.conf.manageFilesourcePartitions) {
// Need to recover partitions into the metastore so our saved data is visible.
val recoverPartitionCmd = AlterTableRecoverPartitionsCommand(tableDesc.identifier)
Union(createCmd, recoverPartitionCmd)
Contributor:
Should we return createCmd (also check recoverPartitionCmd)? Putting them together looks weird.

ericl (Contributor), Oct 25, 2016:
I think this is ok, otherwise you'd need a lot of special logic just for this.

Contributor:
Let's add a special node for running a sequence of commands. We are relying on the implementation of Union here. Let's address this in a follow-up PR.

Contributor:
Makes sense. I'll file a ticket after this is merged

} else {
createCmd
}
df.sparkSession.sessionState.executePlan(cmd).toRdd
}
}
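A hypothetical sketch of the follow-up the reviewer suggests above: a dedicated command that runs its children in order instead of relying on Union. The name `CommandSequenceCommand` and its shape are assumptions, not part of this PR, which keeps the `Union(createCmd, recoverPartitionCmd)` plan.

```scala
// Hypothetical follow-up only: run several commands in declaration order without
// piggybacking on Union. The name and shape are assumptions, not code from this PR.
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.command.RunnableCommand

case class CommandSequenceCommand(commands: Seq[RunnableCommand]) extends RunnableCommand {
  override def run(sparkSession: SparkSession): Seq[Row] = {
    // Execute each child command in order; return the result of the last one.
    commands.map(_.run(sparkSession)).lastOption.getOrElse(Seq.empty[Row])
  }
}
```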
@@ -50,7 +50,8 @@ case class AnalyzeColumnCommand(
AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable))

case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)
updateStats(logicalRel.catalogTable.get,
AnalyzeTableCommand.calculateTotalSize(sessionState, logicalRel.catalogTable.get))
Contributor (PR author):
Why change this? AnalyzeTableCommand.calculateTotalSize reads a special Hive property, hive.exec.stagingdir. Do data source tables also need to take care of it?

Contributor:
This is needed since the logical relation sizeInBytes is no longer the latest size. Previously ListingFileCatalog would compute the size, but TableFileCatalog just reads whatever size was present in the table metadata last.

Contributor:
What is the cost of AnalyzeTableCommand.calculateTotalSize? Also, why is sizeInBytes not the latest size?

Contributor:
The reason is that the relation's size is no longer computed when it is resolved, so we have to force a table scan here to get an updated size.

Weird that GitHub reordered my comment above, actually ^
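For context on the cost question, a rough sketch of a file-listing based size computation along the lines described in this thread (sum leaf file sizes under the table location, skipping Hive staging directories). It is an illustration only, not the actual AnalyzeTableCommand.calculateTotalSize implementation; the staging-dir prefix is an assumption.

```scala
// Illustration only (not Spark's calculateTotalSize): recursively sum leaf file sizes
// under the table location, skipping staging directories. Assumes the table has a
// locationUri and that staging dirs use the default ".hive-staging" prefix.
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable

def approximateTotalSize(spark: SparkSession, table: CatalogTable): Long = {
  val stagingDirPrefix = ".hive-staging"  // assumed default of hive.exec.stagingdir
  val root = new Path(table.storage.locationUri.get)
  val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)

  def sizeOf(path: Path): Long = {
    val status = fs.getFileStatus(path)
    if (status.isDirectory) {
      fs.listStatus(path)
        .filterNot(_.getPath.getName.startsWith(stagingDirPrefix))
        .map(s => sizeOf(s.getPath))
        .sum
    } else {
      status.getLen
    }
  }

  sizeOf(root)
}
```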


case otherRelation =>
throw new AnalysisException("ANALYZE TABLE is not supported for " +
@@ -51,7 +51,8 @@ case class AnalyzeTableCommand(

// data source tables have been converted into LogicalRelations
case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
updateTableStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)
updateTableStats(logicalRel.catalogTable.get,
AnalyzeTableCommand.calculateTotalSize(sessionState, logicalRel.catalogTable.get))

case otherRelation =>
throw new AnalysisException("ANALYZE TABLE is not supported for " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,16 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
val newTable = table.copy(
storage = table.storage.copy(properties = optionsWithPath),
schema = dataSource.schema,
partitionColumnNames = partitionColumnNames)
partitionColumnNames = partitionColumnNames,
// If metastore partition management for file source tables is enabled, we start off with
// partition provider hive, but no partitions in the metastore. The user has to call
// `msck repair table` to populate the table partitions.
partitionProviderIsHive = partitionColumnNames.nonEmpty &&
sparkSession.sessionState.conf.manageFilesourcePartitions)
// We will return Nil or throw exception at the beginning if the table already exists, so when
// we reach here, the table should not exist and we should set `ignoreIfExists` to false.
sessionState.catalog.createTable(newTable, ignoreIfExists = false)

Seq.empty[Row]
}
}
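A minimal usage sketch of the behavior described in the comment above, assuming a SparkSession named `spark` with `spark.sql.hive.manageFilesourcePartitions` enabled; the table name and path are made up.

```scala
// Sketch only: a partitioned data source table starts with the Hive partition provider
// but no partitions registered, so its data is not visible until `msck repair table`
// imports the partition directories into the metastore. Names and paths are hypothetical.
spark.sql("""
  CREATE TABLE events (id BIGINT, day STRING)
  USING parquet
  OPTIONS (path '/tmp/events')
  PARTITIONED BY (day)
""")
spark.sql("MSCK REPAIR TABLE events")  // populates the partitions, as the comment notes
spark.sql("SELECT count(*) FROM events WHERE day = '2016-10-19'").show()
```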
@@ -232,6 +238,15 @@ case class CreateDataSourceTableAsSelectCommand(
sessionState.catalog.createTable(newTable, ignoreIfExists = false)
}

result match {
case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
sparkSession.sqlContext.conf.manageFilesourcePartitions =>
// Need to recover partitions into the metastore so our saved data is visible.
sparkSession.sessionState.executePlan(
AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
Contributor:
It seems we have similar logic at https://github.com/apache/spark/pull/15515/files#diff-94fbd986b04087223f53697d4b6cab24R396. Are we recovering partitions twice?

Contributor:
I just double-checked with `create table test_sel2 USING parquet PARTITIONED BY (fieldone, fieldtwo) AS SELECT id as fieldzero, id as fieldone, id as fieldtwo from range(100)`, and it goes through a different code path, so the recovery is not duplicated.

case _ =>
}

// Refresh the cache of the table in the catalog.
sessionState.catalog.refreshTable(tableIdentWithDB)
Seq.empty[Row]
@@ -28,10 +28,11 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTablePartition, CatalogTableType, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, PartitioningUtils}
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration

@@ -346,10 +347,7 @@ case class AlterTableAddPartitionCommand(
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
"ALTER TABLE ADD PARTITION is not allowed for tables defined using the datasource API")
}
DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE ADD PARTITION")
val parts = partitionSpecsAndLocs.map { case (spec, location) =>
val normalizedSpec = PartitioningUtils.normalizePartitionSpec(
spec,
@@ -382,11 +380,8 @@ case class AlterTableRenamePartitionCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
"ALTER TABLE RENAME PARTITION is not allowed for tables defined using the datasource API")
}
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE RENAME PARTITION")

val normalizedOldPartition = PartitioningUtils.normalizePartitionSpec(
oldPartition,
@@ -432,10 +427,7 @@ case class AlterTableDropPartitionCommand(
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
"ALTER TABLE DROP PARTITIONS is not allowed for tables defined using the datasource API")
}
DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")

val normalizedSpecs = specs.map { spec =>
PartitioningUtils.normalizePartitionSpec(
@@ -493,33 +485,39 @@ case class AlterTableRecoverPartitionsCommand(
}
}

private def getBasePath(table: CatalogTable): Option[String] = {
if (table.provider == Some("hive")) {
table.storage.locationUri
} else {
new CaseInsensitiveMap(table.storage.properties).get("path")
}
}

override def run(spark: SparkSession): Seq[Row] = {
val catalog = spark.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
val tableIdentWithDB = table.identifier.quotedString
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
s"Operation not allowed: $cmd on datasource tables: $tableIdentWithDB")
}
if (table.partitionColumnNames.isEmpty) {
throw new AnalysisException(
s"Operation not allowed: $cmd only works on partitioned tables: $tableIdentWithDB")
}
if (table.storage.locationUri.isEmpty) {

val tablePath = getBasePath(table)
if (tablePath.isEmpty) {
throw new AnalysisException(s"Operation not allowed: $cmd only works on table with " +
s"location provided: $tableIdentWithDB")
}

val root = new Path(table.storage.locationUri.get)
val root = new Path(tablePath.get)
logInfo(s"Recover all the partitions in $root")
val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)

val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
val hadoopConf = spark.sparkContext.hadoopConfiguration
val pathFilter = getPathFilter(hadoopConf)
val partitionSpecsAndLocs = scanPartitions(
spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase), threshold)
val partitionSpecsAndLocs = scanPartitions(spark, fs, pathFilter, root, Map(),
table.partitionColumnNames, threshold, spark.sessionState.conf.resolver)
val total = partitionSpecsAndLocs.length
logInfo(s"Found $total partitions in $root")

@@ -531,6 +529,11 @@ case class AlterTableRecoverPartitionsCommand(
logInfo(s"Finished to gather the fast stats for all $total partitions.")

addPartitions(spark, table, partitionSpecsAndLocs, partitionStats)
// Updates the table to indicate that its partition metadata is stored in the Hive metastore.
// This is always the case for Hive format tables, but is not true for Datasource tables created
// before Spark 2.1 unless they are converted via `msck repair table`.
spark.sessionState.catalog.alterTable(table.copy(partitionProviderIsHive = true))
catalog.refreshTable(tableName)
Contributor:
Let's also update the doc.

Contributor:
Yeah, this is a pretty major change for 2.1. Shall we do it in a follow-up once the patches for 2.1 are finalized?

logInfo(s"Recovered all partitions ($total).")
Seq.empty[Row]
}
@@ -544,7 +547,8 @@ case class AlterTableRecoverPartitionsCommand(
path: Path,
spec: TablePartitionSpec,
partitionNames: Seq[String],
threshold: Int): GenSeq[(TablePartitionSpec, Path)] = {
threshold: Int,
resolver: Resolver): GenSeq[(TablePartitionSpec, Path)] = {
if (partitionNames.isEmpty) {
return Seq(spec -> path)
}
@@ -563,15 +567,15 @@ case class AlterTableRecoverPartitionsCommand(
val name = st.getPath.getName
if (st.isDirectory && name.contains("=")) {
val ps = name.split("=", 2)
val columnName = PartitioningUtils.unescapePathName(ps(0)).toLowerCase
val columnName = PartitioningUtils.unescapePathName(ps(0))
// TODO: Validate the value
val value = PartitioningUtils.unescapePathName(ps(1))
// comparing with case-insensitive, but preserve the case
if (columnName == partitionNames.head) {
scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(columnName -> value),
partitionNames.drop(1), threshold)
if (resolver(columnName, partitionNames.head)) {
scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value),
partitionNames.drop(1), threshold, resolver)
} else {
logWarning(s"expect partition column ${partitionNames.head}, but got ${ps(0)}, ignore it")
logWarning(
s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it")
Seq()
}
} else {
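A small REPL-style sketch of the resolver-based matching used above: directory names are compared to partition columns case-insensitively, while the recovered spec keeps the declared column name. The inline resolver mirrors Spark's default case-insensitive resolution; the names are illustrative.

```scala
// Illustration of the resolver contract assumed above (Resolver = (String, String) => Boolean).
import org.apache.spark.sql.catalyst.analysis.Resolver

val resolver: Resolver = (a: String, b: String) => a.equalsIgnoreCase(b)

// A directory "FieldOne=1" matches the declared partition column "fieldone"...
resolver("FieldOne", "fieldone")  // true
// ...and the spec records the declared name, not the on-disk casing:
// spec ++ Map(partitionNames.head -> value)  ==>  Map("fieldone" -> "1")
```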
@@ -676,16 +680,11 @@ case class AlterTableSetLocationCommand(
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
partitionSpec match {
case Some(spec) =>
DDLUtils.verifyPartitionProviderIsHive(
sparkSession, table, "ALTER TABLE ... SET LOCATION")
// Partition spec is specified, so we set the location only for this partition
val part = catalog.getPartition(table.identifier, spec)
val newPart =
if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
"ALTER TABLE SET LOCATION for partition is not allowed for tables defined " +
"using the datasource API")
} else {
part.copy(storage = part.storage.copy(locationUri = Some(location)))
}
val newPart = part.copy(storage = part.storage.copy(locationUri = Some(location)))
catalog.alterPartitions(table.identifier, Seq(newPart))
case None =>
// No partition spec is specified, so we set the location for the table itself
@@ -709,6 +708,25 @@ object DDLUtils {
table.provider.isDefined && table.provider.get != "hive"
}

/**
* Throws a standard error for actions that require partitionProvider = hive.
*/
def verifyPartitionProviderIsHive(
spark: SparkSession, table: CatalogTable, action: String): Unit = {
val tableName = table.identifier.table
if (!spark.sqlContext.conf.manageFilesourcePartitions && isDatasourceTable(table)) {
throw new AnalysisException(
s"$action is not allowed on $tableName since filesource partition management is " +
"disabled (spark.sql.hive.manageFilesourcePartitions = false).")
}
if (!table.partitionProviderIsHive && isDatasourceTable(table)) {
throw new AnalysisException(
s"$action is not allowed on $tableName since its partition metadata is not stored in " +
"the Hive metastore. To import this information into the metastore, run " +
s"`msck repair table $tableName`")
}
}

/**
* If the command ALTER VIEW is to alter a table or ALTER TABLE is to alter a view,
* issue an exception [[AnalysisException]].
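To make the new verifyPartitionProviderIsHive check concrete, a hypothetical session against a data source table created before Spark 2.1 (the table name is made up; the error text paraphrases the message built above):

```scala
// Hypothetical session. With filesource partition management enabled, partition DDL on a
// data source table is rejected until its partition metadata is imported into the metastore.
spark.sql("ALTER TABLE old_ds_table ADD PARTITION (day='2016-10-19')")
// AnalysisException: ALTER TABLE ADD PARTITION is not allowed on old_ds_table since its
// partition metadata is not stored in the Hive metastore. To import this information into
// the metastore, run `msck repair table old_ds_table`

spark.sql("MSCK REPAIR TABLE old_ds_table")  // recovers partitions, sets partitionProviderIsHive
spark.sql("ALTER TABLE old_ds_table ADD PARTITION (day='2016-10-20')")  // now allowed
```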