[SPARK-16596] [SQL] Refactor DataSourceScanExec to do partition discovery at execution instead of planning time #14241


Closed · wants to merge 23 commits

Conversation

@ericl (Contributor) commented Jul 17, 2016

What changes were proposed in this pull request?

Partition discovery is rather expensive, so we should do it at execution time instead of during physical planning. Right now there is not much benefit, since ListingFileCatalog still scans all partitions at planning time anyway, but this can be optimized in the future. Also, information useful for partition pruning may become available at execution time that is not known during planning.

This PR moves much of the file scan logic from planning to execution time. All file scan operations are now handled by FileSourceScanExec, which covers both batched and non-batched file scans. This introduces some duplication with RowDataSourceScanExec, but it is probably worth it so that FileSourceScanExec does not need to depend on an input RDD.
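
To make the mechanism concrete, here is a minimal sketch, assuming the Spark 2.0-era internals (a HadoopFsRelation exposing a FileCatalog with listFiles); it illustrates the deferral idea only and is not the PR's exact code:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
    import org.apache.spark.sql.execution.LeafExecNode
    import org.apache.spark.sql.execution.datasources.HadoopFsRelation

    case class FileSourceScanExec(
        relation: HadoopFsRelation,
        output: Seq[Attribute],
        partitionFilters: Seq[Expression]) extends LeafExecNode {

      // Lazy and transient: listFiles() is not invoked while the planner
      // builds this node; it runs on first access, inside doExecute().
      @transient private lazy val selectedPartitions =
        relation.location.listFiles(partitionFilters)

      protected override def doExecute(): RDD[InternalRow] = {
        // Partition discovery happens here, at execution time. Building
        // the actual file-scan RDD over the discovered partitions is
        // elided in this sketch.
        val partitions = selectedPartitions
        ??? // construct and return the scan RDD over `partitions`
      }
    }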

TODO: In another PR, move DataSourceScanExec to its own file.

How was this patch tested?

Existing tests. (It might be worth adding a test that catalog.listFiles() is deferred until execution, but that can wait until there is an actual benefit to doing so.)

@rxin (Contributor) commented Jul 17, 2016

This doesn't actually give us a way to add additional filter constraints in the physical operator, does it?

@SparkQA commented Jul 17, 2016

Test build #62438 has finished for PR 14241 at commit 0d4642a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ericl (Contributor, author) commented Jul 18, 2016

You should be able to add additional filter constraints in buildScan(), e.g. in FileDataSourceStrategy. I don't think it matters much whether that code lives in buildScan() or in the operator itself.

@rxin (Contributor) commented Jul 19, 2016

@ericl I was talking with @marmbrus -- it'd be better to create an API in the physical scan operator that accepts a list of filters, and then do pruning there. That is to say, we also want to move all the pruning code from physical planning into the physical operators.
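
A sketch of what such an operator-side API could look like; the trait and method names here are hypothetical, not from this PR:

    import org.apache.spark.sql.catalyst.expressions.Expression

    // Hypothetical mixin for a physical scan node. The point is that
    // pruning state lives in the operator and can be tightened after
    // planning.
    trait AcceptsScanFilters {
      // Mutable on purpose (see the comment thread below): later passes
      // can push additional filters in before execution starts.
      private var pushedFilters: Seq[Expression] = Nil

      def pushFilters(filters: Seq[Expression]): Unit = {
        pushedFilters ++= filters
      }

      // Called from the scan's doExecute(): combine planning-time filters
      // with any that arrived later, then prune partitions against them.
      protected def effectiveFilters(planned: Seq[Expression]): Seq[Expression] =
        planned ++ pushedFilters
    }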

// Metadata keys
val INPUT_PATHS = "InputPaths"
val PUSHED_FILTERS = "PushedFilters"
private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String,
Review comment from @ericl (author):

All these functions below were moved verbatim.

@SparkQA commented Jul 20, 2016

Test build #62619 has finished for PR 14241 at commit b45e253.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 20, 2016

Test build #62623 has finished for PR 14241 at commit ebf2102.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 20, 2016

Test build #62624 has finished for PR 14241 at commit bbf89a1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 20, 2016

Test build #62625 has finished for PR 14241 at commit 358eb9f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 20, 2016

Test build #62627 has finished for PR 14241 at commit 2d78051.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 21, 2016

Test build #62691 has finished for PR 14241 at commit a3d2c69.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

override val outputPartitioning: Partitioning,
override val metadata: Map[String, String],
outputSchema: StructType,
partitionFilters: Seq[Expression],
Review comment (Contributor):

can you add classdoc documenting what partitionFilters and dataFilters do? It's a little bit confusing because they are both filters, but have different types.

Follow-up (Contributor):

BTW in order to make this more dynamic, we'd need to make these mutable.

@ericl (author) replied:

Done
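
For reference, the requested classdoc could look roughly like the sketch below. As the review notes, both parameters are "filters" but have different types and act at different stages; the exact wording here is illustrative, not the merged text:

    /**
     * Physical plan node for scanning data from HadoopFsRelations.
     *
     * @param partitionFilters Predicates over the partition columns,
     *        evaluated against the catalog's partition values to prune
     *        whole directories before any file is opened.
     * @param dataFilters Source-level filters pushed down into the file
     *        format reader and evaluated inside the files that survive
     *        pruning.
     */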

@SparkQA commented Jul 25, 2016

Test build #62845 has finished for PR 14241 at commit 780fec5.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 26, 2016

Test build #62847 has finished for PR 14241 at commit ddb202e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -358,11 +358,11 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
df1.write.parquet(tableDir.getAbsolutePath)

val agged = spark.table("bucketed_table").groupBy("i").count()
-    val error = intercept[RuntimeException] {
+    val error = intercept[Exception] {
Review comment (Contributor):

Nit: can't we catch the proper exception?

@ericl (author) replied:

It's a nested exception, which is quite hard to match. The following assert checks for the right error message, which is the important bit, I think.
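
The resulting pattern, sketched from the diff above (the assertion message shown is illustrative):

    // Catch the broad Exception: the real failure arrives wrapped in
    // framework exceptions, so matching the precise type is brittle.
    val error = intercept[Exception] {
      agged.count()
    }
    // Assert on the message instead; this is the part that actually
    // verifies the right failure occurred.
    assert(error.toString.contains("Invalid bucket file"))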

@hvanhovell (Contributor) commented:
This looks pretty good. I have left a few comments.

-/** Physical plan node for scanning data from a batched relation. */
-private[sql] case class BatchedDataSourceScanExec(
+/**
+ * Physical plan node for scanning data from files.
Review comment (Contributor):

...from HadoopFsRelations?

@ericl (author) replied:

Done

@SparkQA commented Jul 28, 2016

Test build #62979 has finished for PR 14241 at commit 18f5543.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -275,62 +272,161 @@ private[sql] case class RowDataSourceScanExec(
|}
""".stripMargin
}

// Ignore rdd when checking results
override def sameResult(plan: SparkPlan): Boolean = plan match {
Review comment (Contributor):

let's make sure we fix this one
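
The concern: the node captures an RDD, so two otherwise-identical scans would never compare equal unless the RDD is ignored. A plausible shape for the fix, sketched here rather than quoted from the PR:

    // Compare only the logical ingredients that determine the result,
    // never the materialized input RDD.
    override def sameResult(plan: SparkPlan): Boolean = plan match {
      case other: RowDataSourceScanExec =>
        relation == other.relation && metadata == other.metadata
      case _ => false
    }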

@davies (Contributor) commented Aug 2, 2016

LGTM

@SparkQA commented Aug 2, 2016

Test build #63139 has finished for PR 14241 at commit a76b432.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable
    • case class MonotonicallyIncreasingID() extends LeafExpression with Nondeterministic
    • case class SparkPartitionID() extends LeafExpression with Nondeterministic
    • case class AggregateExpression(
    • case class Least(children: Seq[Expression]) extends Expression
    • case class Greatest(children: Seq[Expression]) extends Expression
    • case class CurrentDatabase() extends LeafExpression with Unevaluable
    • class GenericInternalRow(val values: Array[Any]) extends BaseGenericInternalRow
    • class AbstractScalaRowIterator[T] extends Iterator[T]
    • implicit class SchemaAttribute(f: StructField)

@SparkQA commented Aug 3, 2016

Test build #63141 has finished for PR 14241 at commit 704511e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies (Contributor) commented Aug 3, 2016

@hvanhovell Have you finished your round of review?

@hvanhovell (Contributor) commented:
LGTM

@davies (Contributor) commented Aug 3, 2016

Merging this into master, thanks!

@asfgit closed this in e6f226c on Aug 3, 2016