Skip to content

[SPARK-23203][SQL] make DataSourceV2Relation immutable #20448

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from

Conversation

cloud-fan
Copy link
Contributor

@cloud-fan cloud-fan commented Jan 31, 2018

What changes were proposed in this pull request?

This is inspired by #20387, but only focus on making the plan immutable.

The idea is simple, instead of keeping the mutable DataSourceReader in the plan, we should keep DataSourceV2, and create the reader when needed. The pushdown information will be stored in the plan, instead of relying on the mutable reader.

This can also help us removing 2 unnecessary APIs from SupportsPushDownCatalystFilters and SupportsPushDownFilters.

Since in this PR we add a lot of new parameters to DataSourceRelation, the explain result of this plan becomes a little messy, I cleaned it up a little, now the explain looks like

== Parsed Logical Plan ==
Relation SimpleDataSourceV2[i#0, j#1]

== Analyzed Logical Plan ==
i: int, j: int
Relation SimpleDataSourceV2[i#0, j#1]

== Optimized Logical Plan ==
Relation SimpleDataSourceV2[i#0, j#1]

== Physical Plan ==
*(1) Scan SimpleDataSourceV2[i#0, j#1]

== Parsed Logical Plan ==
'Filter ('i > 6)
+- AnalysisBarrier
      +- Project [j#78]
         +- Relation JavaAdvancedDataSourceV2[i#77, j#78] ()

== Analyzed Logical Plan ==
j: int
Project [j#78]
+- Filter (i#77 > 6)
   +- Project [j#78, i#77]
      +- Relation JavaAdvancedDataSourceV2[i#77, j#78] ()

== Optimized Logical Plan ==
Relation JavaAdvancedDataSourceV2[j#78] (PushedFilter: [isnotnull(i#77), (i#77 > 6)])

== Physical Plan ==
*(1) Scan JavaAdvancedDataSourceV2[j#78] (PushedFilter: [isnotnull(i#77), (i#77 > 6)])

How was this patch tested?

I improved the test in DataSourceV2Suite, to make sure this new change doesn't break the column pruning and filter push down.

@cloud-fan
Copy link
Contributor Author

cloud-fan commented Jan 31, 2018

cc @rdblue @tdas @gatorsmile @ericl

case _ => false
}

override def hashCode(): Int = {
metadata.map(Objects.hashCode).foldLeft(0)((a, b) => 31 * a + b)
}

lazy val output: Seq[Attribute] = reader.readSchema().map(_.name).map { name =>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to do this anymore. Now the plan is immutable, we have to create a new plan when applying push down optimizations, and we can also update output at that time.

@SparkQA
Copy link

SparkQA commented Jan 31, 2018

Test build #86861 has finished for PR 20448 at commit 7441334.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 31, 2018

Test build #86862 has finished for PR 20448 at commit 0665282.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.


/**
* A base class for data source reader holder with customized equals/hashCode methods.
* A base class for data source v2 related query plan. It defines the equals/hashCode methods
* according to some common information.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might need to emphasize this is for both physical and logical plans.

* @param options The options specified for this scan, used to create the `DataSourceReader`.
* @param userSpecifiedSchema The user specified schema, used to create the `DataSourceReader`.
* @param filters The predicates which are pushed and handled by this data source.
* @param existingReader An mutable reader carrying some temporary stats during optimization and
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An -> A

options: DataSourceOptions,
userSpecifiedSchema: Option[StructType],
filters: Set[Expression],
existingReader: Option[DataSourceReader]) extends LeafNode with DataSourceV2QueryPlan {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this plan does not extend MultiInstanceRelation?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a test for self join? Just to ensure it still works.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch! Yea this is a bug, but to respect the rule about solving different issues in different PR, I'd like to fix it in a new PR.

@SparkQA
Copy link

SparkQA commented Jan 31, 2018

Test build #86863 has finished for PR 20448 at commit d96a48f.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

case _ => throw new IllegalStateException()
}
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to override a def doCanonicalize?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the output of this node in Explain?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the behavior we expect when users call REFRESH TABLE?

Also another potential issue is about storing the statistics in the external catalog? Do we still have the previous issues discussed in #14712?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

data source v2 doesn't support tables yet, so we don't have this problem now.

@SparkQA
Copy link

SparkQA commented Jan 31, 2018

Test build #86865 has finished for PR 20448 at commit 11220db.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Jan 31, 2018

Test build #86867 has finished for PR 20448 at commit 11220db.

  • This patch fails from timeout after a configured wait of `300m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor Author

retest this please

class StreamingDataSourceV2Relation(
fullOutput: Seq[AttributeReference],
reader: DataSourceReader) extends DataSourceV2Relation(fullOutput, reader) {
case class StreamingDataSourceV2Relation(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to LogicalRelation, I think we can simply add a isStream parameter to DataSourceV2Relation. This can be addressed in a follow up PR.

@SparkQA
Copy link

SparkQA commented Jan 31, 2018

Test build #86876 has finished for PR 20448 at commit 11220db.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rdblue
Copy link
Contributor

rdblue commented Jan 31, 2018

@cloud-fan, please close this PR. There is already a pull request for these changes, #20387, and ongoing discussion there.

If you want the proposed implementation to change, please ask for changes in a review.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants