[SPARK-16596] [SQL] Refactor DataSourceScanExec to do partition discovery at execution instead of planning time #14241
Conversation
This doesn't actually give us a way to add additional filter constraints in the physical operator, does it?
Test build #62438 has finished for PR 14241 at commit
You should be able to add additional filter constraints in buildScan(), e.g. in FileDataSourceStrategy. I don't think it matters too much whether that code is located within buildScan(), or in the operator itself. A sketch of the idea is below.
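For context, here is a rough sketch of how a planning strategy could split pushed-down filter constraints by the columns they reference. The helper name is hypothetical and only approximates what the file source strategy does; it is not code from this PR.

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression}

// Hypothetical helper: partition the pushed-down predicates into those that
// reference only partition columns (usable for partition pruning) and those
// that must be evaluated against the data or pushed to the file format.
def splitFilters(
    filters: Seq[Expression],
    partitionColumns: AttributeSet): (Seq[Expression], Seq[Expression]) = {
  filters.partition { f =>
    f.references.nonEmpty && f.references.subsetOf(partitionColumns)
  }
}
```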
// Metadata keys
val INPUT_PATHS = "InputPaths"
val PUSHED_FILTERS = "PushedFilters"
private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String,
All these functions below were moved verbatim.
Test build #62619 has finished for PR 14241 at commit
Test build #62623 has finished for PR 14241 at commit
Test build #62624 has finished for PR 14241 at commit
Test build #62625 has finished for PR 14241 at commit
Test build #62627 has finished for PR 14241 at commit
Test build #62691 has finished for PR 14241 at commit
override val outputPartitioning: Partitioning,
override val metadata: Map[String, String],
outputSchema: StructType,
partitionFilters: Seq[Expression],
Can you add a classdoc documenting what partitionFilters and dataFilters do? It's a little bit confusing because they are both filters but have different types.
BTW in order to make this more dynamic, we'd need to make these mutable.
Done
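A sketch of the kind of classdoc the reviewer asked for. The wording here is illustrative rather than a verbatim copy of what was merged; the parameter types follow the diff above.

```scala
/**
 * @param partitionFilters Catalyst [[Expression]] predicates over the
 *   partition columns, evaluated against discovered partition values so
 *   whole directories can be pruned before any file is read.
 * @param dataFilters data-source [[org.apache.spark.sql.sources.Filter]]
 *   predicates over non-partition columns, which are candidates for
 *   push-down into the underlying file format reader.
 */
```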
Test build #62845 has finished for PR 14241 at commit
Test build #62847 has finished for PR 14241 at commit
@@ -358,11 +358,11 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
   df1.write.parquet(tableDir.getAbsolutePath)

   val agged = spark.table("bucketed_table").groupBy("i").count()
-  val error = intercept[RuntimeException] {
+  val error = intercept[Exception] {
NIT: can't we catch the proper exception here?
It's a nested exception, which is quite hard to match. The following assert checks for the right error message, which is the important bit I think.
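For illustration, the pattern under discussion looks roughly like this; the exact error-message text is an assumption based on the test's intent, not a quote from the suite.

```scala
// The real failure is wrapped in layers of execution exceptions, so the test
// catches a broad Exception and asserts on the message text instead of
// matching a specific nested exception type.
val error = intercept[Exception] {
  agged.count()
}
assert(error.toString contains "Invalid bucket file") // assumed message
```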
This looks pretty good. I have left a few comments.
/** Physical plan node for scanning data from a batched relation. */
private[sql] case class BatchedDataSourceScanExec(
/**
 * Physical plan node for scanning data from files.
...from HadoopFsRelations?
Done
Test build #62979 has finished for PR 14241 at commit
@@ -275,62 +272,161 @@ private[sql] case class RowDataSourceScanExec(
     |}
   """.stripMargin
}

// Ignore rdd when checking results
override def sameResult(plan: SparkPlan): Boolean = plan match {
let's make sure we fix this one
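A hedged sketch of what "ignore the RDD when checking results" can look like: compare the relation and the filter expressions semantically, and never force the lazily built RDD. Names and details here are approximations, not the final code from this PR.

```scala
override def sameResult(plan: SparkPlan): Boolean = plan match {
  case other: FileSourceScanExec =>
    // Compare the underlying relation, schema, and filters; the RDD is
    // built lazily at execution time and must not be touched here.
    relation == other.relation &&
      schema == other.schema &&
      partitionFilters.length == other.partitionFilters.length &&
      partitionFilters.zip(other.partitionFilters).forall {
        case (a, b) => a.semanticEquals(b)
      }
  case _ => false
}
```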
LGTM
Test build #63139 has finished for PR 14241 at commit
Test build #63141 has finished for PR 14241 at commit
@hvanhovell Have you finished your round of review?
LGTM
Merging this into master, thanks!
What changes were proposed in this pull request?
Partition discovery is rather expensive, so we should do it at execution time instead of during physical planning. Right now there is not much benefit, since ListingFileCatalog will scan all partitions at planning time anyway, but this can be optimized in the future. Also, information useful for partition pruning may only become available after planning time.
This PR moves a lot of the file scan logic from planning to execution time. All file scan operations are handled by FileSourceScanExec, which covers both batched and non-batched file scans. This requires some duplication with RowDataSourceScanExec, but is probably worth it so that FileSourceScanExec does not need to depend on an input RDD.

TODO: In another PR, move DataSourceScanExec to its own file.
How was this patch tested?
Existing tests (it might be worth adding a test that catalog.listFiles() is deferred until execution, but that can wait until there is an actual benefit to doing so).