[SPARK-1570] Fix classloading in JavaSQLContext.applySchema #484
Closed
kanzhang wants to merge 1 commit into apache:master from kanzhang:SPARK-1570
Conversation
Can one of the admins verify this patch?
Contributor
Author
Command used to expose the bug: SPARK_JAR=./assembly/target/scala-2.10/spark-assembly-1.0.0-SNAPSHOT-hadoop2.3.0-deps.jar ./bin/spark-submit ./examples/target/scala-2.10/spark-examples_2.10-1.0.0-SNAPSHOT.jar --master local --deploy-mode client --class org.apache.spark.examples.sql.JavaSparkSQL
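(For context, the underlying issue is how classes are resolved by name in `JavaSQLContext.applySchema` when the user's classes live in the application jar handed to spark-submit. The snippet below is only an illustrative sketch of the usual fix pattern for this kind of failure, resolving classes through the thread's context classloader instead of the JVM default loader; it is not the exact code changed by this PR.)

```scala
// Illustrative sketch only; not the exact change from this PR.
// spark-submit sets the thread's context classloader so that it can see the
// user's application jar; fall back to the loader of the current class otherwise.
object BeanClassLoading {
  def loadClass(className: String): Class[_] = {
    val loader = Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
    Class.forName(className, /* initialize = */ true, loader)
  }
}
```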
Contributor
Thanks for testing this! Looks good to me.
Contributor
Jenkins, test this please.
Merged build triggered.
Merged build started.
Contributor
LGTM too. Thanks for doing this!
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14335/
Contributor
This passed tests other than the MIMA issues... going to merge it... we'll have to figure those out.
asfgit pushed a commit that referenced this pull request on Apr 22, 2014

I think I hit a class loading issue when running the JavaSparkSQL example using spark-submit in local mode.

Author: Kan Zhang <kzhang@apache.org>

Closes #484 from kanzhang/SPARK-1570 and squashes the following commits:

feaaeba [Kan Zhang] [SPARK-1570] Fix classloading in JavaSQLContext.applySchema

(cherry picked from commit ea8cea8)
Signed-off-by: Patrick Wendell <pwendell@gmail.com>
pwendell added a commit to pwendell/spark that referenced this pull request on May 12, 2014

Made run-example respect SPARK_JAVA_OPTS and SPARK_MEM.

The bin/run-example script was not passing Java properties set through SPARK_JAVA_OPTS to the example. This is important for examples like Twitter** as the Twitter authentication information must be set through Java properties. Hence added the same JAVA_OPTS code in run-example as in the bin/spark-class script. Also added SPARK_MEM, in case someone wants to run the example with different amounts of memory. This can be removed if it is not in tune with the intended semantics of the run-example scripts.

@matei Please check this soon, I want this to go in 0.9-rc4
pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request on Jun 25, 2014

I think I hit a class loading issue when running the JavaSparkSQL example using spark-submit in local mode.

Author: Kan Zhang <kzhang@apache.org>

Closes apache#484 from kanzhang/SPARK-1570 and squashes the following commits:

feaaeba [Kan Zhang] [SPARK-1570] Fix classloading in JavaSQLContext.applySchema
andrewor14 pushed a commit to andrewor14/spark that referenced this pull request on Jan 8, 2015

Made run-example respect SPARK_JAVA_OPTS and SPARK_MEM.

The bin/run-example script was not passing Java properties set through SPARK_JAVA_OPTS to the example. This is important for examples like Twitter** as the Twitter authentication information must be set through Java properties. Hence added the same JAVA_OPTS code in run-example as in the bin/spark-class script. Also added SPARK_MEM, in case someone wants to run the example with different amounts of memory. This can be removed if it is not in tune with the intended semantics of the run-example scripts.

@matei Please check this soon, I want this to go in 0.9-rc4

(cherry picked from commit c67d3d8)
Signed-off-by: Patrick Wendell <pwendell@gmail.com>
yifeih pushed a commit to yifeih/spark that referenced this pull request on Feb 20, 2019
###### _excavator_ is a bot for automating changes across repositories.
Changes produced by the roomba/latest-baseline-oss check.
To enable or disable this check, please contact the maintainers of Excavator.
bzhaoopenstack pushed a commit to bzhaoopenstack/spark that referenced this pull request on Sep 11, 2019
Remove unneeded workaround for default sg of fusioncloud
cloud-fan pushed a commit that referenced this pull request on Jun 21, 2021
…subquery reuse
### What changes were proposed in this pull request?
This PR:
1. Fixes an issue in the `ReuseExchange` rule that can result in a `ReusedExchange` node pointing to an invalid exchange. This can happen because of the 2 separate traversals in `ReuseExchange`, when the 2nd traversal modifies an exchange that has already been referenced (reused) in the 1st traversal.
Consider the following query:
```
WITH t AS (
SELECT df1.id, df2.k
FROM df1 JOIN df2 ON df1.k = df2.k
WHERE df2.id < 2
)
SELECT * FROM t AS a JOIN t AS b ON a.id = b.id
```
Before this PR the plan of the query was (note the `<== this reuse node points to a non-existing node` marker):
```
== Physical Plan ==
*(7) SortMergeJoin [id#14L], [id#18L], Inner
:- *(3) Sort [id#14L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#14L, 5), true, [id=#298]
: +- *(2) Project [id#14L, k#17L]
: +- *(2) BroadcastHashJoin [k#15L], [k#17L], Inner, BuildRight
: :- *(2) Project [id#14L, k#15L]
: : +- *(2) Filter isnotnull(id#14L)
: : +- *(2) ColumnarToRow
: : +- FileScan parquet default.df1[id#14L,k#15L] Batched: true, DataFilters: [isnotnull(id#14L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#15L), dynamicpruningexpression(k#15L IN dynamicpruning#26)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
: : +- SubqueryBroadcast dynamicpruning#26, 0, [k#17L], [id=#289]
: : +- ReusedExchange [k#17L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#179]
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#179]
: +- *(1) Project [k#17L]
: +- *(1) Filter ((isnotnull(id#16L) AND (id#16L < 2)) AND isnotnull(k#17L))
: +- *(1) ColumnarToRow
: +- FileScan parquet default.df2[id#16L,k#17L] Batched: true, DataFilters: [isnotnull(id#16L), (id#16L < 2), isnotnull(k#17L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>
+- *(6) Sort [id#18L ASC NULLS FIRST], false, 0
+- ReusedExchange [id#18L, k#21L], Exchange hashpartitioning(id#14L, 5), true, [id=#184] <== this reuse node points to a non-existing node
```
After this PR:
```
== Physical Plan ==
*(7) SortMergeJoin [id#14L], [id#18L], Inner
:- *(3) Sort [id#14L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#14L, 5), true, [id=#231]
: +- *(2) Project [id#14L, k#17L]
: +- *(2) BroadcastHashJoin [k#15L], [k#17L], Inner, BuildRight
: :- *(2) Project [id#14L, k#15L]
: : +- *(2) Filter isnotnull(id#14L)
: : +- *(2) ColumnarToRow
: : +- FileScan parquet default.df1[id#14L,k#15L] Batched: true, DataFilters: [isnotnull(id#14L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#15L), dynamicpruningexpression(k#15L IN dynamicpruning#26)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
: : +- SubqueryBroadcast dynamicpruning#26, 0, [k#17L], [id=#103]
: : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#102]
: : +- *(1) Project [k#17L]
: : +- *(1) Filter ((isnotnull(id#16L) AND (id#16L < 2)) AND isnotnull(k#17L))
: : +- *(1) ColumnarToRow
: : +- FileScan parquet default.df2[id#16L,k#17L] Batched: true, DataFilters: [isnotnull(id#16L), (id#16L < 2), isnotnull(k#17L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>
: +- ReusedExchange [k#17L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#102]
+- *(6) Sort [id#18L ASC NULLS FIRST], false, 0
+- ReusedExchange [id#18L, k#21L], Exchange hashpartitioning(id#14L, 5), true, [id=#231]
```
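For reference, a minimal, assumed setup that creates tables matching the names and column types in the plans above (`default.df1` and `default.df2`, both with bigint `id`/`k` columns) might look like the sketch below; the row counts, partitioning, and exact data are guesses for illustration only and are not taken from the PR:

```scala
import org.apache.spark.sql.SparkSession

object ReuseRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("reuse-repro").getOrCreate()

    // Assumed toy data: df1 is partitioned by k so that dynamic partition pruning
    // (the SubqueryBroadcast nodes in the plans above) can kick in.
    spark.range(0, 100).selectExpr("id", "id % 10 AS k")
      .write.partitionBy("k").format("parquet").saveAsTable("df1")
    spark.range(0, 10).selectExpr("id", "id AS k")
      .write.format("parquet").saveAsTable("df2")

    spark.sql(
      """WITH t AS (
        |  SELECT df1.id, df2.k
        |  FROM df1 JOIN df2 ON df1.k = df2.k
        |  WHERE df2.id < 2
        |)
        |SELECT * FROM t AS a JOIN t AS b ON a.id = b.id""".stripMargin).explain()

    spark.stop()
  }
}
```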
2. Fixes an issue with the separate, consecutive `ReuseExchange` and `ReuseSubquery` rules that can result in a `ReusedExchange` node pointing to an invalid exchange. This can happen because the `ReuseSubquery` rule modifies an exchange that has already been referenced (reused) by the `ReuseExchange` rule.
Consider the following query:
```
WITH t AS (
SELECT df1.id, df2.k
FROM df1 JOIN df2 ON df1.k = df2.k
WHERE df2.id < 2
),
t2 AS (
SELECT * FROM t
UNION
SELECT * FROM t
)
SELECT * FROM t2 AS a JOIN t2 AS b ON a.id = b.id
```
Before this PR the plan of the query was (note the `<== this reuse node points to a non-existing node` marker):
```
== Physical Plan ==
*(15) SortMergeJoin [id#46L], [id#58L], Inner
:- *(7) Sort [id#46L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#46L, 5), true, [id=#979]
: +- *(6) HashAggregate(keys=[id#46L, k#49L], functions=[])
: +- Exchange hashpartitioning(id#46L, k#49L, 5), true, [id=#975]
: +- *(5) HashAggregate(keys=[id#46L, k#49L], functions=[])
: +- Union
: :- *(2) Project [id#46L, k#49L]
: : +- *(2) BroadcastHashJoin [k#47L], [k#49L], Inner, BuildRight
: : :- *(2) Project [id#46L, k#47L]
: : : +- *(2) Filter isnotnull(id#46L)
: : : +- *(2) ColumnarToRow
: : : +- FileScan parquet default.df1[id#46L,k#47L] Batched: true, DataFilters: [isnotnull(id#46L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#47L), dynamicpruningexpression(k#47L IN dynamicpruning#66)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
: : : +- SubqueryBroadcast dynamicpruning#66, 0, [k#49L], [id=#926]
: : : +- ReusedExchange [k#49L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#656]
: : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#656]
: : +- *(1) Project [k#49L]
: : +- *(1) Filter ((isnotnull(id#48L) AND (id#48L < 2)) AND isnotnull(k#49L))
: : +- *(1) ColumnarToRow
: : +- FileScan parquet default.df2[id#48L,k#49L] Batched: true, DataFilters: [isnotnull(id#48L), (id#48L < 2), isnotnull(k#49L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>
: +- *(4) Project [id#46L, k#49L]
: +- *(4) BroadcastHashJoin [k#47L], [k#49L], Inner, BuildRight
: :- *(4) Project [id#46L, k#47L]
: : +- *(4) Filter isnotnull(id#46L)
: : +- *(4) ColumnarToRow
: : +- FileScan parquet default.df1[id#46L,k#47L] Batched: true, DataFilters: [isnotnull(id#46L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#47L), dynamicpruningexpression(k#47L IN dynamicpruning#66)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
: : +- ReusedSubquery SubqueryBroadcast dynamicpruning#66, 0, [k#49L], [id=#926]
: +- ReusedExchange [k#49L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#656]
+- *(14) Sort [id#58L ASC NULLS FIRST], false, 0
+- ReusedExchange [id#58L, k#61L], Exchange hashpartitioning(id#46L, 5), true, [id=#761] <== this reuse node points to a non-existing node
```
After this PR:
```
== Physical Plan ==
*(15) SortMergeJoin [id#46L], [id#58L], Inner
:- *(7) Sort [id#46L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#46L, 5), true, [id=#793]
: +- *(6) HashAggregate(keys=[id#46L, k#49L], functions=[])
: +- Exchange hashpartitioning(id#46L, k#49L, 5), true, [id=#789]
: +- *(5) HashAggregate(keys=[id#46L, k#49L], functions=[])
: +- Union
: :- *(2) Project [id#46L, k#49L]
: : +- *(2) BroadcastHashJoin [k#47L], [k#49L], Inner, BuildRight
: : :- *(2) Project [id#46L, k#47L]
: : : +- *(2) Filter isnotnull(id#46L)
: : : +- *(2) ColumnarToRow
: : : +- FileScan parquet default.df1[id#46L,k#47L] Batched: true, DataFilters: [isnotnull(id#46L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#47L), dynamicpruningexpression(k#47L IN dynamicpruning#66)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
: : : +- SubqueryBroadcast dynamicpruning#66, 0, [k#49L], [id=#485]
: : : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#484]
: : : +- *(1) Project [k#49L]
: : : +- *(1) Filter ((isnotnull(id#48L) AND (id#48L < 2)) AND isnotnull(k#49L))
: : : +- *(1) ColumnarToRow
: : : +- FileScan parquet default.df2[id#48L,k#49L] Batched: true, DataFilters: [isnotnull(id#48L), (id#48L < 2), isnotnull(k#49L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>
: : +- ReusedExchange [k#49L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#484]
: +- *(4) Project [id#46L, k#49L]
: +- *(4) BroadcastHashJoin [k#47L], [k#49L], Inner, BuildRight
: :- *(4) Project [id#46L, k#47L]
: : +- *(4) Filter isnotnull(id#46L)
: : +- *(4) ColumnarToRow
: : +- FileScan parquet default.df1[id#46L,k#47L] Batched: true, DataFilters: [isnotnull(id#46L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#47L), dynamicpruningexpression(k#47L IN dynamicpruning#66)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
: : +- ReusedSubquery SubqueryBroadcast dynamicpruning#66, 0, [k#49L], [id=#485]
: +- ReusedExchange [k#49L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#484]
+- *(14) Sort [id#58L ASC NULLS FIRST], false, 0
+- ReusedExchange [id#58L, k#61L], Exchange hashpartitioning(id#46L, 5), true, [id=#793]
```
(This example contains issue 1 as well.)
3. Improves the reuse of exchanges and subqueries by enabling reuse across the whole plan. This means that the new combined rule utilizes reuse opportunities between the parent query and its subqueries by traversing the whole plan. The traversal is started on the top-level query only.
4. Due to the traversal order this PR uses while adding reuse nodes, the reuse nodes appear in parent queries if reuse is possible between different levels of queries (typical for DPP). This is not an issue from an execution perspective, but it does mean "forward references" in EXPLAIN FORMATTED output, where parent queries come first. The changes I made to `ExplainUtils` handle these references properly.

This PR fixes the above 3 issues by unifying the separate rules into a `ReuseExchangeAndSubquery` rule that does a one-pass, whole-plan, bottom-up traversal.
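To make the mechanics concrete, the one-pass idea can be illustrated with a small reuse map: a bottom-up traversal records the first occurrence of each exchange or subquery under a canonical key and answers later equivalent nodes with that first occurrence, so nothing can later modify an exchange that is already referenced. The sketch below is a simplified illustration only, not Spark's actual `ReuseMap`, which differs in details such as how candidates are grouped and compared:

```scala
import scala.collection.mutable

// Simplified sketch of the reuse-map idea behind a one-pass, bottom-up reuse rule.
// The first node recorded for a given canonical key is kept; later nodes with the
// same key get that first occurrence back, so the caller can wrap them in a
// ReusedExchange / ReusedSubquery reference.
class SimpleReuseMap[K, V] {
  private val seen = mutable.Map.empty[K, V]

  // Returns Some(firstOccurrence) if an equivalent node was already recorded,
  // otherwise records `node` and returns None.
  def lookupOrAdd(key: K, node: V): Option[V] =
    seen.get(key) match {
      case existing @ Some(_) => existing
      case None =>
        seen.put(key, node)
        None
    }
}
```

A traversal would call something like `lookupOrAdd(plan.canonicalized, plan)` on each exchange and subquery it visits and emit a reuse node whenever a previous occurrence comes back.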
### Why are the changes needed?
Performance improvement.
### How was this patch tested?
- New UTs in `ReuseExchangeAndSubquerySuite` to cover 1. and 2.
- New UTs in `DynamicPartitionPruningSuite`, `SubquerySuite` and `ExchangeSuite` to cover 3.
- New `ReuseMapSuite` to test `ReuseMap`.
- Checked new golden files of `PlanStabilitySuite`s for invalid reuse references.
- TPCDS benchmarks.
Closes #28885 from peter-toth/SPARK-29375-SPARK-28940-whole-plan-reuse.
Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>