[SPARK-18661] [SQL] Creating a partitioned datasource table should not scan all files for table #16090
Conversation
@@ -58,13 +58,20 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
   // Create the relation to validate the arguments before writing the metadata to the metastore,
   // and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
   val pathOption = table.storage.locationUri.map("path" -> _)
+  val uncreatedTable = table.copy(
@cloud-fan is there a better way to do this?
Can we do this before we pass the `CatalogTable`? e.g. in the parser and `DataFrameWriter`?
Hm, that seems more brittle since you'd have to duplicate the logic. I added a comment describing why we need to do this here.
Test build #69435 has finished for PR 16090 at commit …
Test build #69437 has finished for PR 16090 at commit …
@@ -565,7 +571,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   val table = catalog.getTableMetadata(TableIdentifier("tbl"))
   assert(table.tableType == CatalogTableType.MANAGED)
   assert(table.provider == Some("parquet"))
-  assert(table.schema == new StructType().add("a", IntegerType).add("b", IntegerType))
+  // a is ordered last since it is a user-specified partitioning column
+  assert(table.schema == new StructType().add("b", IntegerType).add("a", IntegerType))
@yhuai this is the minor behavior change for CREATE TABLE in 2.1 that we discussed.
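As a sketch of that change (an illustrative snippet, not the test itself; assumes a `spark` session), declaring `a` first but partitioning by it now yields a schema with `a` last:

```scala
// Behavior change in 2.1: user-specified partition columns are reordered
// to the end of the table schema at CREATE TABLE time.
spark.sql("CREATE TABLE tbl (a INT, b INT) USING parquet PARTITIONED BY (a)")
spark.table("tbl").printSchema()
// root
//  |-- b: integer (nullable = true)
//  |-- a: integer (nullable = true)
```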
Test build #69514 has finished for PR 16090 at commit …
@@ -58,13 +58,21 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
   // Create the relation to validate the arguments before writing the metadata to the metastore,
   // and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
   val pathOption = table.storage.locationUri.map("path" -> _)
+  // Fill in some default table options from the session conf
+  val uncreatedTable = table.copy(
How about `tableWithDefaultOptions`?
Done
    identifier = table.identifier.copy(
      database = Some(
        table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase))),
    tracksPartitionsInCatalog = sparkSession.sessionState.conf.manageFilesourcePartitions)
Logically we don't know the value of `tracksPartitionsInCatalog` here, as the partition columns are not inferred yet.
I think this is true for all new tables. If the table is unpartitioned, the flag is harmless.
    className = table.provider.get,
    bucketSpec = table.bucketSpec,
-   options = table.storage.properties ++ pathOption).resolveRelation()
+   options = table.storage.properties ++ pathOption,
+   catalogTable = Some(uncreatedTable)).resolveRelation()
Why do we need to pass in the `catalogTable` here?
Otherwise, we will construct an `InMemoryFileIndex`, which scans the filesystem.
@@ -312,7 +312,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   pathToNonPartitionedTable,
   userSpecifiedSchema = Option("num int, str string"),
   userSpecifiedPartitionCols = partitionCols,
-  expectedSchema = new StructType().add("num", IntegerType).add("str", StringType),
+  expectedSchema = if (partitionCols.isDefined) {
Shall we just change the test to use `str` as the partition column?
I think that would be testing something slightly different.
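The diff above is cut off; given the reordering under discussion (the test partitions by `num`, hence the suggestion to switch to `str`), the conditional presumably continues roughly like this (a reconstruction, not the verbatim patch):

```scala
expectedSchema = if (partitionCols.isDefined) {
  // the user-specified partition column is reordered to the end of the schema
  new StructType().add("str", StringType).add("num", IntegerType)
} else {
  new StructType().add("num", IntegerType).add("str", StringType)
},
```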
val dataSource: BaseRelation =
  DataSource(
    sparkSession = sparkSession,
    userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
    partitionColumns = table.partitionColumnNames,
It looks to me that this line and https://github.com/apache/spark/pull/16090/files#diff-7a6cb188d2ae31eb3347b5629a679cecR135 are the key changes. Did I miss something?
You also need to pass `catalogTable` in, so that on line 390 of `DataSource` we create a `CatalogFileIndex` instead of an `InMemoryFileIndex`.
I think it's fine to create an `InMemoryFileIndex` for this case, as we call `DataSource.resolveRelation` here just to infer the schema and partition columns.
You don't want to do that, though. `resolveRelation` also does not always scan the filesystem if you pass in a user-defined schema.
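A minimal standalone sketch of that point (a hypothetical helper, not Spark's actual inference code): with a user-supplied schema and declared partition columns, the partition schema falls out of the declaration, so no file listing is required:

```scala
import org.apache.spark.sql.types._

// Derive the partition schema purely from the user-declared schema;
// no filesystem access is needed for this case.
def partitionSchemaFromUserSchema(
    userSchema: StructType,
    partitionColumnNames: Seq[String]): StructType = {
  StructType(partitionColumnNames.map { name =>
    userSchema.find(_.name == name).getOrElse(
      throw new IllegalArgumentException(s"Partition column $name not in schema"))
  })
}

// Example: (num INT, str STRING) partitioned by `str` yields (str STRING).
val schema = new StructType().add("num", IntegerType).add("str", StringType)
partitionSchemaFromUserSchema(schema, Seq("str"))
```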
I mean, if passing the `catalogTable` or not doesn't affect correctness (or performance), we can remove the `tableWithDefaultOptions` and make the code simpler, right?
You do need to pass it in, though:
val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
new CatalogFileIndex(
sparkSession,
catalogTable.get,
catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(0L))
} else {
new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema))
}
Otherwise, this code will perform a full filesystem scan, independent of the other change to prevent `getOrInferFileFormatSchema` from performing a scan as well.
Then can we just make the `InMemoryFileIndex` scan the files lazily? If we only need to infer the schema and partition columns, it should not do the scan.
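As an illustration of that suggestion (a toy sketch, not Spark's `InMemoryFileIndex`), the listing could be deferred behind a `lazy val` so that schema and partition inference alone never trigger a scan:

```scala
import java.io.File

// Toy lazy file index: the recursive listing runs on first access to
// `allFiles`, not at construction time.
class LazyListingIndex(rootPaths: Seq[File]) {
  lazy val allFiles: Seq[File] = {
    def walk(f: File): Seq[File] =
      if (f.isDirectory) Option(f.listFiles).toSeq.flatten.flatMap(walk)
      else Seq(f)
    rootPaths.flatMap(walk)
  }
}
```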
That's a pretty big change, considering how many classes depend on the eager behavior of `InMemoryFileIndex`.
Test build #69534 has finished for PR 16090 at commit …
Test build #69536 has finished for PR 16090 at commit …
My main concern is that, in …
It seems to me that it's more logical to tweak the …
I looked at avoiding the creation of a `CatalogFileIndex`, but the way table resolution works right now, the only way is to create some sort of dummy file index class that does not support scans. It's not clear to me that this is any better than just creating a `CatalogFileIndex`, even if the table is not yet ready. We can probably clean this up so it is not necessary to create a file index for table creation, but that would be a pretty big change to land for 2.1.
Seems like we also create `InMemoryFileIndex` twice for non-catalog tables. Let me try to fix that too.
Fixed by adding a private cache to …
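The identifier above is cut off; the idea (hypothetical names, and note the follow-up below says this caching was later reverted) would be to memoize the listing so the two call sites share one scan:

```scala
// Hypothetical sketch: one lazily-computed file listing shared between the
// two code paths that previously each built their own InMemoryFileIndex.
class DataSourceSketch(listFiles: () => Seq[String]) {
  private lazy val cachedFileList: Seq[String] = listFiles() // computed at most once
  def filesForSchemaInference: Seq[String] = cachedFileList
  def filesForRelation: Seq[String] = cachedFileList
}
```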
cc @rxin please merge unless Wenchen gets to it first
Test build #69600 has finished for PR 16090 at commit …
Seems like the caching broke a bunch of tests. I'll take a look at this again tomorrow.
If we are going to hack it, how about this? …
Then we don't need to create …
Not sure I follow - could you explain more about why that would resolve the issue? Btw, I reverted this PR to b405635, which passes all tests.
Test build #69630 has finished for PR 16090 at commit …
After looking more at the code, I now agree with your approach. One question: it seems we still scan the files when creating an unpartitioned external data source table?
Yeah, I was wondering if we should also try to fix that. It seems maybe not as bad, since unpartitioned tables usually aren't that big. We can create separate tickets for investigating that, and also the duplicate file scan issue (that might not be a regression).
… scan all files for table

## What changes were proposed in this pull request?

Even though in 2.1 creating a partitioned datasource table will not populate the partition data by default (until the user issues MSCK REPAIR TABLE), it seems we still scan the filesystem for no good reason. We should avoid doing this when the user specifies a schema.

## How was this patch tested?

Perf stat tests.

Author: Eric Liang <ekl@databricks.com>

Closes #16090 from ericl/spark-18661.

(cherry picked from commit d9eb4c7)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
LGTM, merging to master/2.1!
…the saved files

### What changes were proposed in this pull request?

`DataFrameWriter`'s [save() API](https://github.com/gatorsmile/spark/blob/5d38f09f47a767a342a0a8219c63efa2943b5d1f/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L207) is performing an unnecessary full filesystem scan for the saved files. The save() API is the most basic/core API in `DataFrameWriter`. We should avoid it.

The related PR: #16090

### How was this patch tested?

Updated the existing test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16481 from gatorsmile/saveFileScan.
What changes were proposed in this pull request?
Even though in 2.1 creating a partitioned datasource table will not populate the partition data by default (until the user issues MSCK REPAIR TABLE), it seems we still scan the filesystem for no good reason.
We should avoid doing this when the user specifies a schema.
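For illustration only (hypothetical table name and path, not from the PR), this is the pattern that no longer triggers a scan at creation time:

```scala
// With an explicit schema and partition columns, CREATE TABLE needs no file
// listing; partition discovery is deferred to MSCK REPAIR TABLE.
spark.sql("""
  CREATE TABLE events (ts BIGINT, day STRING)
  USING parquet
  OPTIONS (path '/data/events')
  PARTITIONED BY (day)
""")
spark.sql("MSCK REPAIR TABLE events") // discovers and registers the partitions
```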
How was this patch tested?
Perf stat tests.