
Commit 41d698e

ericl authored and cloud-fan committed
[SPARK-18661][SQL] Creating a partitioned datasource table should not scan all files for table
## What changes were proposed in this pull request?

Even though in 2.1 creating a partitioned datasource table will not populate the partition data by default (until the user issues MSCK REPAIR TABLE), it seems we still scan the filesystem for no good reason. We should avoid doing this when the user specifies a schema.

## How was this patch tested?

Perf stat tests.

Author: Eric Liang <ekl@databricks.com>

Closes #16090 from ericl/spark-18661.

(cherry picked from commit d9eb4c7)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
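For context, this is the user-facing flow the change optimizes, sketched with an illustrative table name and path (same shape as the perf-test tables below):

    // With this patch, the CREATE TABLE itself no longer lists files under the
    // path when the schema and partition columns are given explicitly.
    spark.sql("""
      |create table logs (fieldOne long, partCol1 int, partCol2 int)
      |using parquet
      |options (path "/data/logs")
      |partitioned by (partCol1, partCol2)""".stripMargin)

    // Partition metadata is populated only when the user explicitly asks for it.
    spark.sql("msck repair table logs")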
1 parent 8145c82 commit 41d698e

File tree

4 files changed: +66 -8 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 9 additions & 1 deletion

@@ -58,13 +58,21 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
     // Create the relation to validate the arguments before writing the metadata to the metastore,
     // and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
     val pathOption = table.storage.locationUri.map("path" -> _)
+    // Fill in some default table options from the session conf
+    val tableWithDefaultOptions = table.copy(
+      identifier = table.identifier.copy(
+        database = Some(
+          table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase))),
+      tracksPartitionsInCatalog = sparkSession.sessionState.conf.manageFilesourcePartitions)
     val dataSource: BaseRelation =
       DataSource(
         sparkSession = sparkSession,
         userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
+        partitionColumns = table.partitionColumnNames,
         className = table.provider.get,
         bucketSpec = table.bucketSpec,
-        options = table.storage.properties ++ pathOption).resolveRelation()
+        options = table.storage.properties ++ pathOption,
+        catalogTable = Some(tableWithDefaultOptions)).resolveRelation()

     dataSource match {
       case fs: HadoopFsRelation =>
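The effect of this hunk: the command now hands DataSource the declared partition columns, plus a CatalogTable whose database and tracksPartitionsInCatalog fields are defaulted from session state. A minimal sketch of the resulting call for a concrete table; the vals tableSchema and tableWithDefaultOptions are assumed, and the path is illustrative:

    import org.apache.spark.sql.execution.datasources.DataSource

    // Illustrative only: resolving a relation for a table whose partitioning is
    // declared up front, so resolveRelation can skip partition inference.
    val relation = DataSource(
      sparkSession = spark,
      className = "parquet",
      userSpecifiedSchema = Some(tableSchema),         // assumed user-declared schema
      partitionColumns = Seq("partCol1", "partCol2"),  // declared, not inferred
      options = Map("path" -> "/data/logs"),           // illustrative path
      catalogTable = Some(tableWithDefaultOptions)).resolveRelation()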

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 1 addition & 1 deletion

@@ -132,7 +132,7 @@ case class DataSource(
       }.toArray
       new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
     }
-    val partitionSchema = if (partitionColumns.isEmpty && catalogTable.isEmpty) {
+    val partitionSchema = if (partitionColumns.isEmpty) {
       // Try to infer partitioning, because no DataSource in the read path provides the partitioning
       // columns properly unless it is a Hive DataSource
       val resolved = tempFileIndex.partitionSchema.map { partitionField =>
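The old guard only skipped inference when both partitionColumns and catalogTable were empty. Now that CreateDataSourceTableCommand (above) passes the declared partition columns, keying the decision off partitionColumns alone lets declared partitioning bypass the listing, while tables with no declared partitioning still fall through to inference. A self-contained sketch of that decision, with assumed names (not the literal Spark method):

    import org.apache.spark.sql.types.StructType

    // Hypothetical helper illustrating the branch: inference (which lists the
    // directory tree) runs only when no partition columns were declared.
    def choosePartitionSchema(
        partitionColumns: Seq[String],
        declaredSchema: StructType,
        inferFromFiles: () => StructType): StructType = {
      if (partitionColumns.isEmpty) {
        inferFromFiles()  // expensive: scans the filesystem
      } else {
        StructType(partitionColumns.map(declaredSchema(_)))  // cheap: trust the declaration
      }
    }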

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 9 additions & 2 deletions

@@ -312,7 +312,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       pathToNonPartitionedTable,
       userSpecifiedSchema = Option("num int, str string"),
       userSpecifiedPartitionCols = partitionCols,
-      expectedSchema = new StructType().add("num", IntegerType).add("str", StringType),
+      expectedSchema = if (partitionCols.isDefined) {
+        // we skipped inference, so the partition col is ordered at the end
+        new StructType().add("str", StringType).add("num", IntegerType)
+      } else {
+        // no inferred partitioning, so schema is in original order
+        new StructType().add("num", IntegerType).add("str", StringType)
+      },
       expectedPartitionCols = partitionCols.map(Seq(_)).getOrElse(Seq.empty[String]))
   }
 }
@@ -565,7 +571,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     val table = catalog.getTableMetadata(TableIdentifier("tbl"))
     assert(table.tableType == CatalogTableType.MANAGED)
     assert(table.provider == Some("parquet"))
-    assert(table.schema == new StructType().add("a", IntegerType).add("b", IntegerType))
+    // a is ordered last since it is a user-specified partitioning column
+    assert(table.schema == new StructType().add("b", IntegerType).add("a", IntegerType))
     assert(table.partitionColumnNames == Seq("a"))
   }
 }
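Both assertion changes follow from the same rule: when inference is skipped, Spark moves user-specified partition columns to the end of the table schema. Illustration (table name and DDL are illustrative, not the suite's exact statements):

    spark.sql("create table tbl(a int, b int) using parquet partitioned by (a)")
    // Partition column `a` is reordered after the data column `b`:
    spark.table("tbl").schema.fieldNames  // => Array("b", "a")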

sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala

Lines changed: 47 additions & 4 deletions

@@ -60,36 +60,52 @@ class PartitionedTablePerfStatsSuite
     setupPartitionedHiveTable(tableName, dir, 5)
   }

-  private def setupPartitionedHiveTable(tableName: String, dir: File, scale: Int): Unit = {
+  private def setupPartitionedHiveTable(
+      tableName: String, dir: File, scale: Int,
+      clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit = {
     spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
       .partitionBy("partCol1", "partCol2")
       .mode("overwrite")
       .parquet(dir.getAbsolutePath)

+    if (clearMetricsBeforeCreate) {
+      HiveCatalogMetrics.reset()
+    }
+
     spark.sql(s"""
       |create external table $tableName (fieldOne long)
       |partitioned by (partCol1 int, partCol2 int)
       |stored as parquet
       |location "${dir.getAbsolutePath}"""".stripMargin)
-    spark.sql(s"msck repair table $tableName")
+    if (repair) {
+      spark.sql(s"msck repair table $tableName")
+    }
   }

   private def setupPartitionedDatasourceTable(tableName: String, dir: File): Unit = {
     setupPartitionedDatasourceTable(tableName, dir, 5)
   }

-  private def setupPartitionedDatasourceTable(tableName: String, dir: File, scale: Int): Unit = {
+  private def setupPartitionedDatasourceTable(
+      tableName: String, dir: File, scale: Int,
+      clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit = {
     spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
       .partitionBy("partCol1", "partCol2")
       .mode("overwrite")
       .parquet(dir.getAbsolutePath)

+    if (clearMetricsBeforeCreate) {
+      HiveCatalogMetrics.reset()
+    }
+
     spark.sql(s"""
       |create table $tableName (fieldOne long, partCol1 int, partCol2 int)
       |using parquet
       |options (path "${dir.getAbsolutePath}")
       |partitioned by (partCol1, partCol2)""".stripMargin)
-    spark.sql(s"msck repair table $tableName")
+    if (repair) {
+      spark.sql(s"msck repair table $tableName")
+    }
   }

   genericTest("partitioned pruned table reports only selected files") { spec =>
@@ -250,6 +266,33 @@ class PartitionedTablePerfStatsSuite
     }
   }

+  test("datasource table: table setup does not scan filesystem") {
+    withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
+      withTable("test") {
+        withTempDir { dir =>
+          setupPartitionedDatasourceTable(
+            "test", dir, scale = 10, clearMetricsBeforeCreate = true, repair = false)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+        }
+      }
+    }
+  }
+
+  test("hive table: table setup does not scan filesystem") {
+    withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
+      withTable("test") {
+        withTempDir { dir =>
+          HiveCatalogMetrics.reset()
+          setupPartitionedHiveTable(
+            "test", dir, scale = 10, clearMetricsBeforeCreate = true, repair = false)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+        }
+      }
+    }
+  }
+
   test("hive table: num hive client calls does not scale with partition count") {
     withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
       withTable("test") {
