[SPARK-17970][SQL] store partition spec in metastore for data source table #15515
Changes from all commits
@@ -50,7 +50,8 @@ case class AnalyzeColumnCommand(
           AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable))

       case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
-        updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)
+        updateStats(logicalRel.catalogTable.get,
+          AnalyzeTableCommand.calculateTotalSize(sessionState, logicalRel.catalogTable.get))
Review comments on this change:
- Why change this?
- This is needed since the logical relation
- How's the cost of
- The reason is that the relation's size is no longer computed when it is resolved, so we have to force a table scan here to get an updated size. Weird that GitHub reordered my comment above actually ^
       case otherRelation =>
         throw new AnalysisException("ANALYZE TABLE is not supported for " +
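For context on the comment above: forcing a table scan here means the command walks the table's files to compute an up-to-date size instead of reusing a value cached at resolution time. A minimal sketch of that idea (a hypothetical helper, not the actual `AnalyzeTableCommand.calculateTotalSize` implementation):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical helper: recursively sum the size of all data files under a table's
// root directory, skipping hidden and staging files (names starting with "." or "_").
def totalSizeInBytes(rootDir: String, hadoopConf: Configuration): Long = {
  val root = new Path(rootDir)
  val fs = root.getFileSystem(hadoopConf)

  def sizeOf(path: Path): Long = {
    val status = fs.getFileStatus(path)
    if (status.isDirectory) {
      fs.listStatus(path)
        .filterNot { s =>
          val name = s.getPath.getName
          name.startsWith(".") || name.startsWith("_")
        }
        .map(s => sizeOf(s.getPath))
        .sum
    } else {
      status.getLen
    }
  }

  sizeOf(root)
}
```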
@@ -94,10 +94,16 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
     val newTable = table.copy(
       storage = table.storage.copy(properties = optionsWithPath),
       schema = dataSource.schema,
-      partitionColumnNames = partitionColumnNames)
+      partitionColumnNames = partitionColumnNames,
+      // If metastore partition management for file source tables is enabled, we start off with
+      // partition provider hive, but no partitions in the metastore. The user has to call
+      // `msck repair table` to populate the table partitions.
+      partitionProviderIsHive = partitionColumnNames.nonEmpty &&
+        sparkSession.sessionState.conf.manageFilesourcePartitions)
     // We will return Nil or throw exception at the beginning if the table already exists, so when
     // we reach here, the table should not exist and we should set `ignoreIfExists` to false.
     sessionState.catalog.createTable(newTable, ignoreIfExists = false)

     Seq.empty[Row]
   }
 }
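As the added comment says, a newly created partitioned data source table starts with the Hive partition provider but no partition entries; the user populates them explicitly. A usage sketch (table name and path are hypothetical):

```scala
// Create a partitioned data source table over existing files, then populate
// its partition metadata in the metastore.
spark.sql(
  """CREATE TABLE events (id BIGINT, ds STRING)
    |USING parquet
    |PARTITIONED BY (ds)
    |LOCATION '/data/events'""".stripMargin)

// Until this runs, the metastore knows the table but none of its partitions.
spark.sql("MSCK REPAIR TABLE events")
```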
@@ -232,6 +238,15 @@ case class CreateDataSourceTableAsSelectCommand(
       sessionState.catalog.createTable(newTable, ignoreIfExists = false)
     }

+    result match {
+      case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
+          sparkSession.sqlContext.conf.manageFilesourcePartitions =>
+        // Need to recover partitions into the metastore so our saved data is visible.
+        sparkSession.sessionState.executePlan(
+          AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
Review comments on this change:
- Seems we have a similar logic at https://github.com/apache/spark/pull/15515/files#diff-94fbd986b04087223f53697d4b6cab24R396. Are we recovering partitions twice?
- I just double checked with
+      case _ =>
+    }

     // Refresh the cache of the table in the catalog.
     sessionState.catalog.refreshTable(tableIdentWithDB)
     Seq.empty[Row]
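The case above covers create-table-as-select style writes: once the command recovers partitions, the data written by the query is immediately visible to readers. A usage sketch (DataFrame and table names are hypothetical):

```scala
// With metastore partition management enabled, the recovery step above makes
// the new partitions visible without a manual MSCK REPAIR TABLE.
val df = spark.range(0, 100).selectExpr("id", "cast(id % 10 as string) as ds")

df.write
  .format("parquet")
  .partitionBy("ds")
  .saveAsTable("events_ctas")

spark.table("events_ctas").count()  // sees all 100 rows
```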
@@ -28,10 +28,11 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTablePartition, CatalogTableType, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, PartitioningUtils}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
@@ -346,10 +347,7 @@ case class AlterTableAddPartitionCommand(
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
-    if (DDLUtils.isDatasourceTable(table)) {
-      throw new AnalysisException(
-        "ALTER TABLE ADD PARTITION is not allowed for tables defined using the datasource API")
-    }
+    DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE ADD PARTITION")
     val parts = partitionSpecsAndLocs.map { case (spec, location) =>
       val normalizedSpec = PartitioningUtils.normalizePartitionSpec(
         spec,
@@ -382,11 +380,8 @@ case class AlterTableRenamePartitionCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
-    if (DDLUtils.isDatasourceTable(table)) {
-      throw new AnalysisException(
-        "ALTER TABLE RENAME PARTITION is not allowed for tables defined using the datasource API")
-    }
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
+    DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE RENAME PARTITION")

     val normalizedOldPartition = PartitioningUtils.normalizePartitionSpec(
       oldPartition,
@@ -432,10 +427,7 @@ case class AlterTableDropPartitionCommand(
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
-    if (DDLUtils.isDatasourceTable(table)) {
-      throw new AnalysisException(
-        "ALTER TABLE DROP PARTITIONS is not allowed for tables defined using the datasource API")
-    }
+    DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")

     val normalizedSpecs = specs.map { spec =>
       PartitioningUtils.normalizePartitionSpec(
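With the blanket rejection replaced by `verifyPartitionProviderIsHive`, partition DDL now works on data source tables once their partition metadata lives in the metastore. A usage sketch (table and partition values are hypothetical):

```scala
// Partition DDL on a data source table, allowed once partitionProviderIsHive = true.
spark.sql("ALTER TABLE events ADD PARTITION (ds = '2016-10-01')")
spark.sql("ALTER TABLE events PARTITION (ds = '2016-10-01') RENAME TO PARTITION (ds = '2016-10-02')")
spark.sql("ALTER TABLE events DROP PARTITION (ds = '2016-10-02')")
```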
@@ -493,33 +485,39 @@ case class AlterTableRecoverPartitionsCommand(
     }
   }

+  private def getBasePath(table: CatalogTable): Option[String] = {
+    if (table.provider == Some("hive")) {
+      table.storage.locationUri
+    } else {
+      new CaseInsensitiveMap(table.storage.properties).get("path")
+    }
+  }
+
   override def run(spark: SparkSession): Seq[Row] = {
     val catalog = spark.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
     val tableIdentWithDB = table.identifier.quotedString
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
-    if (DDLUtils.isDatasourceTable(table)) {
-      throw new AnalysisException(
-        s"Operation not allowed: $cmd on datasource tables: $tableIdentWithDB")
-    }
     if (table.partitionColumnNames.isEmpty) {
       throw new AnalysisException(
         s"Operation not allowed: $cmd only works on partitioned tables: $tableIdentWithDB")
     }
-    if (table.storage.locationUri.isEmpty) {
+
+    val tablePath = getBasePath(table)
+    if (tablePath.isEmpty) {
       throw new AnalysisException(s"Operation not allowed: $cmd only works on table with " +
         s"location provided: $tableIdentWithDB")
     }

-    val root = new Path(table.storage.locationUri.get)
+    val root = new Path(tablePath.get)
     logInfo(s"Recover all the partitions in $root")
     val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)

     val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
     val hadoopConf = spark.sparkContext.hadoopConfiguration
     val pathFilter = getPathFilter(hadoopConf)
-    val partitionSpecsAndLocs = scanPartitions(
-      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase), threshold)
+    val partitionSpecsAndLocs = scanPartitions(spark, fs, pathFilter, root, Map(),
+      table.partitionColumnNames, threshold, spark.sessionState.conf.resolver)
     val total = partitionSpecsAndLocs.length
     logInfo(s"Found $total partitions in $root")
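`getBasePath` reads the root location from `locationUri` for Hive tables, but from the `path` entry of the storage properties for data source tables, and that lookup has to ignore case because data source options are case-insensitive. A minimal stand-in to illustrate the point (this is not Spark's internal `CaseInsensitiveMap`):

```scala
// Minimal illustration: options may have been recorded as "path", "Path", or "PATH".
class CaseInsensitiveOptions(options: Map[String, String]) {
  private val normalized = options.map { case (k, v) => k.toLowerCase -> v }
  def get(key: String): Option[String] = normalized.get(key.toLowerCase)
}

val storageProps = Map("PATH" -> "/data/events", "serialization.format" -> "1")
new CaseInsensitiveOptions(storageProps).get("path")  // Some("/data/events")
```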
@@ -531,6 +529,11 @@ case class AlterTableRecoverPartitionsCommand(
     logInfo(s"Finished to gather the fast stats for all $total partitions.")

     addPartitions(spark, table, partitionSpecsAndLocs, partitionStats)
+    // Updates the table to indicate that its partition metadata is stored in the Hive metastore.
+    // This is always the case for Hive format tables, but is not true for Datasource tables created
+    // before Spark 2.1 unless they are converted via `msck repair table`.
+    spark.sessionState.catalog.alterTable(table.copy(partitionProviderIsHive = true))
+    catalog.refreshTable(tableName)
Review comments on this change:
- Let's also update the doc.
- Yeah, this is a pretty major change for 2.1. Shall we do it in a followup once the patches for 2.1 are finalized?
     logInfo(s"Recovered all partitions ($total).")
     Seq.empty[Row]
   }
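The added comment describes the upgrade path for partitioned data source tables created before Spark 2.1: running the recovery command flips them to the Hive partition provider. A usage sketch (table name hypothetical):

```scala
// Either statement converts a legacy table so its partition metadata is
// tracked by the metastore from then on.
spark.sql("MSCK REPAIR TABLE legacy_events")
// or equivalently
spark.sql("ALTER TABLE legacy_events RECOVER PARTITIONS")
```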
@@ -544,7 +547,8 @@ case class AlterTableRecoverPartitionsCommand(
       path: Path,
       spec: TablePartitionSpec,
       partitionNames: Seq[String],
-      threshold: Int): GenSeq[(TablePartitionSpec, Path)] = {
+      threshold: Int,
+      resolver: Resolver): GenSeq[(TablePartitionSpec, Path)] = {
     if (partitionNames.isEmpty) {
       return Seq(spec -> path)
     }
@@ -563,15 +567,15 @@ case class AlterTableRecoverPartitionsCommand(
         val name = st.getPath.getName
         if (st.isDirectory && name.contains("=")) {
           val ps = name.split("=", 2)
-          val columnName = PartitioningUtils.unescapePathName(ps(0)).toLowerCase
+          val columnName = PartitioningUtils.unescapePathName(ps(0))
           // TODO: Validate the value
           val value = PartitioningUtils.unescapePathName(ps(1))
-          if (columnName == partitionNames.head) {
-            scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(columnName -> value),
-              partitionNames.drop(1), threshold)
+          // comparing with case-insensitive, but preserve the case
+          if (resolver(columnName, partitionNames.head)) {
+            scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value),
+              partitionNames.drop(1), threshold, resolver)
           } else {
-            logWarning(s"expect partition column ${partitionNames.head}, but got ${ps(0)}, ignore it")
+            logWarning(
+              s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it")
             Seq()
           }
         } else {
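The `resolver` used here is the session's name-equality function (case-sensitive or not, depending on the session's case-sensitivity setting), and the partition spec keeps the column name as declared in the table rather than as spelled in the directory. A self-contained sketch of the idea, using a hand-rolled resolver rather than the session one:

```scala
object ResolverSketch {
  // A resolver is just a name-equality predicate.
  type Resolver = (String, String) => Boolean
  val caseInsensitiveResolution: Resolver = (a, b) => a.equalsIgnoreCase(b)

  def main(args: Array[String]): Unit = {
    // The directory is ".../DS=2016-10-01" but the table declares the column as "ds".
    val columnFromPath = "DS"
    val declaredColumn = "ds"
    if (caseInsensitiveResolution(columnFromPath, declaredColumn)) {
      // Keep the declared name in the partition spec, not the on-disk spelling.
      println(Map(declaredColumn -> "2016-10-01")) // Map(ds -> 2016-10-01)
    }
  }
}
```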
@@ -676,16 +680,11 @@ case class AlterTableSetLocationCommand(
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     partitionSpec match {
       case Some(spec) =>
+        DDLUtils.verifyPartitionProviderIsHive(
+          sparkSession, table, "ALTER TABLE ... SET LOCATION")
         // Partition spec is specified, so we set the location only for this partition
         val part = catalog.getPartition(table.identifier, spec)
-        val newPart =
-          if (DDLUtils.isDatasourceTable(table)) {
-            throw new AnalysisException(
-              "ALTER TABLE SET LOCATION for partition is not allowed for tables defined " +
-              "using the datasource API")
-          } else {
-            part.copy(storage = part.storage.copy(locationUri = Some(location)))
-          }
+        val newPart = part.copy(storage = part.storage.copy(locationUri = Some(location)))
         catalog.alterPartitions(table.identifier, Seq(newPart))
       case None =>
         // No partition spec is specified, so we set the location for the table itself
@@ -709,6 +708,25 @@ object DDLUtils {
     table.provider.isDefined && table.provider.get != "hive"
   }

+  /**
+   * Throws a standard error for actions that require partitionProvider = hive.
+   */
+  def verifyPartitionProviderIsHive(
+      spark: SparkSession, table: CatalogTable, action: String): Unit = {
+    val tableName = table.identifier.table
+    if (!spark.sqlContext.conf.manageFilesourcePartitions && isDatasourceTable(table)) {
+      throw new AnalysisException(
+        s"$action is not allowed on $tableName since filesource partition management is " +
+          "disabled (spark.sql.hive.manageFilesourcePartitions = false).")
+    }
+    if (!table.partitionProviderIsHive && isDatasourceTable(table)) {
+      throw new AnalysisException(
+        s"$action is not allowed on $tableName since its partition metadata is not stored in " +
+          "the Hive metastore. To import this information into the metastore, run " +
+          s"`msck repair table $tableName`")
+    }
+  }
+
   /**
    * If the command ALTER VIEW is to alter a table or ALTER TABLE is to alter a view,
    * issue an exception [[AnalysisException]].
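The checks above are gated by the `spark.sql.hive.manageFilesourcePartitions` configuration introduced by this patch. A usage sketch of the first error path (table name hypothetical):

```scala
import org.apache.spark.sql.SparkSession

// Disabling metastore partition management falls back to pre-2.1 behaviour, and
// verifyPartitionProviderIsHive rejects partition DDL on data source tables outright.
val session = SparkSession.builder()
  .enableHiveSupport()
  .config("spark.sql.hive.manageFilesourcePartitions", "false")
  .getOrCreate()

// session.sql("ALTER TABLE events ADD PARTITION (ds = '2016-10-01')")
// => AnalysisException: "... filesource partition management is disabled
//    (spark.sql.hive.manageFilesourcePartitions = false)."
```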
Review comments:
- Should we return createCmd (also check recoverPartitionCmd)? Putting them together looks weird.
- I think this is ok, otherwise you'd need a lot of special logic just for this.
- Let's add a special node for running a sequence of commands. We are relying on the implementation of Union here. Let's address this in a follow-up PR.
- Makes sense. I'll file a ticket after this is merged.