[SPARK-18661] [SQL] Creating a partitioned datasource table should not scan all files for table #16090

Closed
wants to merge 6 commits
@@ -58,13 +58,21 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boolean)
// Create the relation to validate the arguments before writing the metadata to the metastore,
// and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
val pathOption = table.storage.locationUri.map("path" -> _)
// Fill in some default table options from the session conf
val tableWithDefaultOptions = table.copy(
identifier = table.identifier.copy(
database = Some(
table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase))),
tracksPartitionsInCatalog = sparkSession.sessionState.conf.manageFilesourcePartitions)
Contributor

Logically we don't know the value of tracksPartitionsInCatalog here, as the partition columns are not inferred yet.

Contributor Author

I think this is true for all new tables. If the table is unpartitioned the flag is harmless.

val dataSource: BaseRelation =
DataSource(
sparkSession = sparkSession,
userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
partitionColumns = table.partitionColumnNames,
Contributor

It looks to me that this line and https://github.com/apache/spark/pull/16090/files#diff-7a6cb188d2ae31eb3347b5629a679cecR135 are the key changes. Did I miss something?

Contributor Author

You also need to pass catalogTable in, so that on line 390 of DataSource we create a CatalogFileIndex instead of an InMemoryFileIndex.

Contributor (@cloud-fan, Dec 2, 2016)

I think it's fine to create an InMemoryFileIndex for this case, as we call DataSource.resolveRelation here just to infer the schema and partition columns.

Contributor Author

You don't want to do that, though. resolveRelation also does not always scan the filesystem if you pass in a user-defined schema.
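
For context, the changed lines in CreateDataSourceTableCommand assemble into roughly the following call (a sketch restating the surrounding diff, not new code):

    DataSource(
      sparkSession = sparkSession,
      userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
      partitionColumns = table.partitionColumnNames,
      className = table.provider.get,
      bucketSpec = table.bucketSpec,
      options = table.storage.properties ++ pathOption,
      catalogTable = Some(tableWithDefaultOptions)).resolveRelation()
    // When CREATE TABLE spells out both the schema and the partition columns,
    // resolveRelation() has nothing left to infer from the files on disk.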

Contributor

I mean, if passing catalogTable or not doesn't affect correctness (or performance), we can remove tableWithDefaultOptions and make the code simpler, right?

Contributor Author

You do need to pass it in, though.

        val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
            catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
          new CatalogFileIndex(
            sparkSession,
            catalogTable.get,
            catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(0L))
        } else {
          new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema))
        }

Otherwise, this code will perform a full filesystem scan, independent of the other change to prevent getOrInferFileFormatSchema from performing a scan as well.
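
A quick way to check that end to end, along the lines of the tests added later in this PR (the table name and path below are placeholders; the counters are the same HiveCatalogMetrics ones the tests use):

    HiveCatalogMetrics.reset()
    spark.sql("""
      |create table test (fieldOne long, partCol1 int, partCol2 int)
      |using parquet
      |options (path "/tmp/placeholder/dir")
      |partitioned by (partCol1, partCol2)""".stripMargin)
    // If CREATE TABLE did not list the data directory, no files were discovered.
    assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
    assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)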

Contributor

Then can we just make the InMemoryFileIndex scan the files lazily? If we only need to infer the schema and partition columns, it should not do the scan.

Contributor Author

That's a pretty big change, considering how many classes depend on the eager behavior of InMemoryFileIndex.
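
For background on the eagerness referred to here: an InMemoryFileIndex performs its file listing when it is constructed, not when it is first queried, so even a caller that only wants the partition schema pays for the full scan. A rough sketch, reusing the names from the snippet quoted above:

    // Assumed behavior under discussion: the constructor below already walks the
    // filesystem; the accessors afterwards only read the cached listing.
    val tempFileIndex = new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
    val inferredPartitionSchema = tempFileIndex.partitionSchema  // no additional I/O
    val leafFiles = tempFileIndex.allFiles()                     // nor here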

className = table.provider.get,
bucketSpec = table.bucketSpec,
options = table.storage.properties ++ pathOption).resolveRelation()
options = table.storage.properties ++ pathOption,
catalogTable = Some(tableWithDefaultOptions)).resolveRelation()

dataSource match {
case fs: HadoopFsRelation =>
@@ -132,7 +132,7 @@ case class DataSource(
}.toArray
new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
}
val partitionSchema = if (partitionColumns.isEmpty && catalogTable.isEmpty) {
val partitionSchema = if (partitionColumns.isEmpty) {
// Try to infer partitioning, because no DataSource in the read path provides the partitioning
// columns properly unless it is a Hive DataSource
val resolved = tempFileIndex.partitionSchema.map { partitionField =>
@@ -312,7 +312,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
pathToNonPartitionedTable,
userSpecifiedSchema = Option("num int, str string"),
userSpecifiedPartitionCols = partitionCols,
expectedSchema = new StructType().add("num", IntegerType).add("str", StringType),
expectedSchema = if (partitionCols.isDefined) {
Contributor

Shall we just change the test to use str as the partition column?

Contributor Author

I think that would be testing something slightly different.

// we skipped inference, so the partition col is ordered at the end
new StructType().add("str", StringType).add("num", IntegerType)
} else {
// no inferred partitioning, so schema is in original order
new StructType().add("num", IntegerType).add("str", StringType)
},
expectedPartitionCols = partitionCols.map(Seq(_)).getOrElse(Seq.empty[String]))
}
}
@@ -565,7 +571,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
val table = catalog.getTableMetadata(TableIdentifier("tbl"))
assert(table.tableType == CatalogTableType.MANAGED)
assert(table.provider == Some("parquet"))
assert(table.schema == new StructType().add("a", IntegerType).add("b", IntegerType))
// a is ordered last since it is a user-specified partitioning column
assert(table.schema == new StructType().add("b", IntegerType).add("a", IntegerType))
Contributor Author

@yhuai this is the minor behavior change to CREATE TABLE in 2.1 that we discussed.

assert(table.partitionColumnNames == Seq("a"))
}
}
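
To make the reordering concrete, a spark-shell style sketch of what the updated assertion expects (output abbreviated):

    spark.sql("create table tbl (a int, b int) using parquet partitioned by (a)")
    spark.table("tbl").printSchema()
    // root
    //  |-- b: integer (nullable = true)
    //  |-- a: integer (nullable = true)   <- the partition column is moved to the end
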
@@ -60,36 +60,52 @@ class PartitionedTablePerfStatsSuite
setupPartitionedHiveTable(tableName, dir, 5)
}

private def setupPartitionedHiveTable(tableName: String, dir: File, scale: Int): Unit = {
private def setupPartitionedHiveTable(
tableName: String, dir: File, scale: Int,
clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit = {
spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
.partitionBy("partCol1", "partCol2")
.mode("overwrite")
.parquet(dir.getAbsolutePath)

if (clearMetricsBeforeCreate) {
HiveCatalogMetrics.reset()
}

spark.sql(s"""
|create external table $tableName (fieldOne long)
|partitioned by (partCol1 int, partCol2 int)
|stored as parquet
|location "${dir.getAbsolutePath}"""".stripMargin)
spark.sql(s"msck repair table $tableName")
if (repair) {
spark.sql(s"msck repair table $tableName")
}
}

private def setupPartitionedDatasourceTable(tableName: String, dir: File): Unit = {
setupPartitionedDatasourceTable(tableName, dir, 5)
}

private def setupPartitionedDatasourceTable(tableName: String, dir: File, scale: Int): Unit = {
private def setupPartitionedDatasourceTable(
tableName: String, dir: File, scale: Int,
clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit = {
spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
.partitionBy("partCol1", "partCol2")
.mode("overwrite")
.parquet(dir.getAbsolutePath)

if (clearMetricsBeforeCreate) {
HiveCatalogMetrics.reset()
}

spark.sql(s"""
|create table $tableName (fieldOne long, partCol1 int, partCol2 int)
|using parquet
|options (path "${dir.getAbsolutePath}")
|partitioned by (partCol1, partCol2)""".stripMargin)
spark.sql(s"msck repair table $tableName")
if (repair) {
spark.sql(s"msck repair table $tableName")
}
}

genericTest("partitioned pruned table reports only selected files") { spec =>
@@ -250,6 +266,33 @@ class PartitionedTablePerfStatsSuite
}
}

test("datasource table: table setup does not scan filesystem") {
withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
withTable("test") {
withTempDir { dir =>
setupPartitionedDatasourceTable(
"test", dir, scale = 10, clearMetricsBeforeCreate = true, repair = false)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
}
}
}
}

test("hive table: table setup does not scan filesystem") {
withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
withTable("test") {
withTempDir { dir =>
HiveCatalogMetrics.reset()
setupPartitionedHiveTable(
"test", dir, scale = 10, clearMetricsBeforeCreate = true, repair = false)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
}
}
}
}

test("hive table: num hive client calls does not scale with partition count") {
withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
withTable("test") {