[SPARK-18661] [SQL] Creating a partitioned datasource table should not scan all files for table #16090
Changes from all commits
51d2a41
275f6b9
893b130
89b0a64
2940d55
b405635
@@ -58,13 +58,21 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
     // Create the relation to validate the arguments before writing the metadata to the metastore,
     // and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
     val pathOption = table.storage.locationUri.map("path" -> _)
+    // Fill in some default table options from the session conf
+    val tableWithDefaultOptions = table.copy(
+      identifier = table.identifier.copy(
+        database = Some(
+          table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase))),
+      tracksPartitionsInCatalog = sparkSession.sessionState.conf.manageFilesourcePartitions)
     val dataSource: BaseRelation =
       DataSource(
         sparkSession = sparkSession,
         userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
+        partitionColumns = table.partitionColumnNames,

it looks to me that this line and https://github.com/apache/spark/pull/16090/files#diff-7a6cb188d2ae31eb3347b5629a679cecR135 are the key change. Did I miss something?

You also need to pass catalogTable in, so that on line 390 of DataSource we create a CatalogFileIndex instead of an InMemoryFileIndex.

I think it's fine to create an

You don't want to do that though. Resolve relation also does not always scan the filesystem if you pass in a user defined schema.

I mean, if passing the

You do need to pass it in though. Otherwise, this code will perform a full filesystem scan, independent of the other change to prevent getOrInferFileFormatSchema from performing a scan as well.

Then can we just make the

That's a pretty big change, considering how many classes depend on the eager behavior of

         className = table.provider.get,
         bucketSpec = table.bucketSpec,
-        options = table.storage.properties ++ pathOption).resolveRelation()
+        options = table.storage.properties ++ pathOption,
+        catalogTable = Some(tableWithDefaultOptions)).resolveRelation()
 
     dataSource match {
       case fs: HadoopFsRelation =>
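
The review thread above turns on the difference between an index that lists files eagerly at construction and one that defers listing. A minimal sketch of that distinction follows, in plain Scala with no Spark dependency. `FakeFileSystem`, `EagerIndex`, and `LazyIndex` are hypothetical names invented for illustration; they are not Spark's `InMemoryFileIndex` or `CatalogFileIndex`, and the sketch makes no claim about how those classes are implemented.

```scala
// Hypothetical stand-ins for illustration only; not Spark's FileIndex classes.
object FileIndexSketch {
  // Simulated filesystem that counts how many full listings were requested.
  final class FakeFileSystem(files: Seq[String]) {
    var scans: Int = 0
    def listAll(): Seq[String] = { scans += 1; files }
  }

  trait FileIndex { def allFiles: Seq[String] }

  // Eager: the full listing runs as soon as the index is constructed.
  final class EagerIndex(fs: FakeFileSystem) extends FileIndex {
    val allFiles: Seq[String] = fs.listAll()
  }

  // Lazy: the listing is deferred until the files are first requested.
  final class LazyIndex(fs: FakeFileSystem) extends FileIndex {
    lazy val allFiles: Seq[String] = fs.listAll()
  }

  def main(args: Array[String]): Unit = {
    val fs = new FakeFileSystem(Seq("a=1/part-0", "a=2/part-0"))

    val eager = new EagerIndex(fs)
    assert(fs.scans == 1) // construction alone triggered a scan

    val deferred = new LazyIndex(fs)
    assert(fs.scans == 1) // construction did not scan

    deferred.allFiles
    assert(fs.scans == 2) // the scan happened on first access
  }
}
```

Under this sketch, creating a table would only pay for a listing if something actually asks the index for its files, which is the behavior the thread argues for when the schema and partition columns are already user-specified.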
@@ -312,7 +312,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
           pathToNonPartitionedTable,
           userSpecifiedSchema = Option("num int, str string"),
           userSpecifiedPartitionCols = partitionCols,
-          expectedSchema = new StructType().add("num", IntegerType).add("str", StringType),
+          expectedSchema = if (partitionCols.isDefined) {

shall we just change the test to use

I think that would be testing something slightly different.

+            // we skipped inference, so the partition col is ordered at the end
+            new StructType().add("str", StringType).add("num", IntegerType)
+          } else {
+            // no inferred partitioning, so schema is in original order
+            new StructType().add("num", IntegerType).add("str", StringType)
+          },
           expectedPartitionCols = partitionCols.map(Seq(_)).getOrElse(Seq.empty[String]))
       }
     }

@@ -565,7 +571,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       val table = catalog.getTableMetadata(TableIdentifier("tbl"))
       assert(table.tableType == CatalogTableType.MANAGED)
       assert(table.provider == Some("parquet"))
-      assert(table.schema == new StructType().add("a", IntegerType).add("b", IntegerType))
+      // a is ordered last since it is a user-specified partitioning column
+      assert(table.schema == new StructType().add("b", IntegerType).add("a", IntegerType))

@yhuai this is the minor behavior change we discussed about create table in 2.1

       assert(table.partitionColumnNames == Seq("a"))
     }
   }

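
The updated test expectations above encode one rule: when the user specifies both a schema and partition columns, the partition columns end up last in the stored schema. The sketch below mirrors only that ordering rule in plain Scala. `Field` and `partitionColumnsLast` are hypothetical names invented here, not Spark APIs, and the real reordering inside Spark may differ in detail.

```scala
// Hypothetical illustration of the ordering the tests expect; not Spark code.
object SchemaOrderSketch {
  // Minimal stand-in for a schema field (Spark's real type is StructField).
  final case class Field(name: String, dataType: String)

  // Keep data columns in declared order, then append the partition columns
  // in the order given by partitionCols.
  def partitionColumnsLast(schema: Seq[Field], partitionCols: Seq[String]): Seq[Field] = {
    val (part, data) = schema.partition(f => partitionCols.contains(f.name))
    val orderedPart = partitionCols.flatMap(p => part.find(_.name == p))
    data ++ orderedPart
  }

  def main(args: Array[String]): Unit = {
    val declared = Seq(Field("a", "int"), Field("b", "int"))

    // Partitioned by "a": "a" moves to the end, matching the third hunk.
    assert(partitionColumnsLast(declared, Seq("a")).map(_.name) == Seq("b", "a"))

    // No partition columns: the declared order is unchanged.
    assert(partitionColumnsLast(declared, Seq.empty).map(_.name) == Seq("a", "b"))
  }
}
```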
Logically we don't know the value of tracksPartitionsInCatalog here, as the partition columns are not inferred yet.

I think this is true for all new tables. If the table is unpartitioned the flag is harmless.