-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-16406][SQL] Improve performance of LogicalPlan.resolve #14083
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Wow! And, thank you for pinging me. :) |
Test build #61882 has finished for PR 14083 at commit
|
Oh, it's strange. It still fails at compiling in my local laptop. |
Test build #61884 has finished for PR 14083 at commit
|
messed up the merge - lemme fix that |
Test build #61886 has finished for PR 14083 at commit
|
The codes in the description seems incomplete? |
@viirya you mean I forgot to add |
yea. :-) |
} else { | ||
None | ||
/** Map to use for direct case insensitive attribute lookups. */ | ||
private val direct: Map[String, Seq[Attribute]] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: do we need to use lazy val?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You have a point: it is the secondary code path, so it is less likely to be used. I'll take a look at it on my next pass.
I've got another idea (orthogonal to this). Thank you, @hvanhovell . |
|
||
/** | ||
* Helper class for (LogicalPlan) attribute resolution. This class indexes attributes by their | ||
* case-in-sensitive name, and checks potential candidates using the given Resolver. Both qualified |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
case-insensitive? When you say "the given Resolver", what do you mean by "Resolver"? Can we link to the type?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The resolve
methods takes a Resolver
as its parameter. This allows us to use either case sensitive or insensitive attribute resolution depending on the Resolver
passed. The names of both classes are confusing and I might rename the AttributeResolver
class to AttributeIndex
or something like that...
The AttributeResolver
creates two indexes based on the lower case (qualified) attribute name; we do an initial lookup based on the lower case name, and then use the Resolver
for the actual attribute selection. This allows us to do fast(er) and correct lookups.
Test build #61923 has finished for PR 14083 at commit
|
Test build #61952 has finished for PR 14083 at commit
|
Finally! Congrat! |
Just came across the old PR. Because this is similar in spirit to #13505, I wonder whether it might make sense to put some of this logic into |
@JoshRosen I have moved the implementation into |
Test build #65231 has finished for PR 14083 at commit
|
cc @hvanhovell |
Close it? @hvanhovell |
@cloud-fan can you take a look? |
Test build #89636 has finished for PR 14083 at commit
|
Test build #89637 has finished for PR 14083 at commit
|
// Then this will add ExtractValue("c", ExtractValue("b", a)), and alias the final | ||
// expression as "c". | ||
val fieldExprs = nestedFields.foldLeft(a: Expression) { (e, name) => | ||
ExtractValue(e, Literal(name), resolver) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ExtractValue
has the same perf problem, but this can be fixed in a follow up
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there an issue for the follow up?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@heuermh I have not filed the issue for this. Do you want to work on this?
|
||
case ambiguousReferences => | ||
// More than one match. | ||
val referenceNames = ambiguousReferences.mkString(", ") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to pass the test, we should follow the previous code: ambiguousReferences.map(_._1.qualifiedName).mkString(", ")
LGTM |
Test build #90250 has finished for PR 14083 at commit
|
Test build #90252 has finished for PR 14083 at commit
|
@hvanhovell . Could you update the PR description benchmark result, too? |
@dongjoon-hyun done. Still a 4x improvement. |
Merging to master. Thanks for all the reviews! |
Thank you for starting and completing this! :D |
`LogicalPlan.resolve(...)` uses linear searches to find an attribute matching a name. This is fine in normal cases, but gets problematic when you try to resolve a large number of columns on a plan with a large number of attributes. This PR adds an indexing structure to `resolve(...)` in order to find potential matches quicker. This PR improves the reference resolution time for the following code by 4x (11.8s -> 2.4s): ``` scala val n = 4000 val values = (1 to n).map(_.toString).mkString(", ") val columns = (1 to n).map("column" + _).mkString(", ") val query = s""" |SELECT $columns |FROM VALUES ($values) T($columns) |WHERE 1=2 AND 1 IN ($columns) |GROUP BY $columns |ORDER BY $columns |""".stripMargin spark.time(sql(query)) ``` Existing tests. Author: Herman van Hovell <hvanhovell@databricks.com> Closes apache#14083 from hvanhovell/SPARK-16406.
* [SPARK-16406][SQL] Improve performance of LogicalPlan.resolve `LogicalPlan.resolve(...)` uses linear searches to find an attribute matching a name. This is fine in normal cases, but gets problematic when you try to resolve a large number of columns on a plan with a large number of attributes. This PR adds an indexing structure to `resolve(...)` in order to find potential matches quicker. This PR improves the reference resolution time for the following code by 4x (11.8s -> 2.4s): ``` scala val n = 4000 val values = (1 to n).map(_.toString).mkString(", ") val columns = (1 to n).map("column" + _).mkString(", ") val query = s""" |SELECT $columns |FROM VALUES ($values) T($columns) |WHERE 1=2 AND 1 IN ($columns) |GROUP BY $columns |ORDER BY $columns |""".stripMargin spark.time(sql(query)) ``` Existing tests. Author: Herman van Hovell <hvanhovell@databricks.com> Closes apache#14083 from hvanhovell/SPARK-16406. * Keep old-style messages for AnalysisException with ambiguous references
* [SPARK-23816][CORE] Killed tasks should ignore FetchFailures. SPARK-19276 ensured that FetchFailures do not get swallowed by other layers of exception handling, but it also meant that a killed task could look like a fetch failure. This is particularly a problem with speculative execution, where we expect to kill tasks as they are reading shuffle data. The fix is to ensure that we always check for killed tasks first. Added a new unit test which fails before the fix, ran it 1k times to check for flakiness. Full suite of tests on jenkins. Author: Imran Rashid <irashid@cloudera.com> Closes apache#20987 from squito/SPARK-23816. (cherry picked from commit 10f45bb) Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com> * [SPARK-24007][SQL] EqualNullSafe for FloatType and DoubleType might generate a wrong result by codegen. `EqualNullSafe` for `FloatType` and `DoubleType` might generate a wrong result by codegen. ```scala scala> val df = Seq((Some(-1.0d), None), (None, Some(-1.0d))).toDF() df: org.apache.spark.sql.DataFrame = [_1: double, _2: double] scala> df.show() +----+----+ | _1| _2| +----+----+ |-1.0|null| |null|-1.0| +----+----+ scala> df.filter("_1 <=> _2").show() +----+----+ | _1| _2| +----+----+ |-1.0|null| |null|-1.0| +----+----+ ``` The result should be empty but the result remains two rows. Added a test. Author: Takuya UESHIN <ueshin@databricks.com> Closes apache#21094 from ueshin/issues/SPARK-24007/equalnullsafe. (cherry picked from commit f09a9e9) Signed-off-by: gatorsmile <gatorsmile@gmail.com> * [SPARK-23963][SQL] Properly handle large number of columns in query on text-based Hive table ## What changes were proposed in this pull request? TableReader would get disproportionately slower as the number of columns in the query increased. I fixed the way TableReader was looking up metadata for each column in the row. Previously, it had been looking up this data in linked lists, accessing each linked list by an index (column number). Now it looks up this data in arrays, where indexing by column number works better. ## How was this patch tested? Manual testing All sbt unit tests python sql tests Author: Bruce Robbins <bersprockets@gmail.com> Closes apache#21043 from bersprockets/tabreadfix. * [MINOR][DOCS] Fix comments of SQLExecution#withExecutionId ## What changes were proposed in this pull request? Fix comment. Change `BroadcastHashJoin.broadcastFuture` to `BroadcastExchangeExec.relationFuture`: https://github.com/apache/spark/blob/d28d5732ae205771f1f443b15b10e64dcffb5ff0/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala#L66 ## How was this patch tested? N/A Author: seancxmao <seancxmao@gmail.com> Closes apache#21113 from seancxmao/SPARK-13136. (cherry picked from commit c303b1b) Signed-off-by: hyukjinkwon <gurwls223@apache.org> * [SPARK-23941][MESOS] Mesos task failed on specific spark app name ## What changes were proposed in this pull request? Shell escaped the name passed to spark-submit and change how conf attributes are shell escaped. ## How was this patch tested? This test has been tested manually with Hive-on-spark with mesos or with the use case described in the issue with the sparkPi application with a custom name which contains illegal shell characters. With this PR, hive-on-spark on mesos works like a charm with hive 3.0.0-SNAPSHOT. I state that this contribution is my original work and that I license the work to the project under the project’s open source license Author: Bounkong Khamphousone <bounkong.khamphousone@ebiznext.com> Closes apache#21014 from tiboun/fix/SPARK-23941. (cherry picked from commit 6782359) Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com> * [SPARK-23433][CORE] Late zombie task completions update all tasksets Fetch failure lead to multiple tasksets which are active for a given stage. While there is only one "active" version of the taskset, the earlier attempts can still have running tasks, which can complete successfully. So a task completion needs to update every taskset so that it knows the partition is completed. That way the final active taskset does not try to submit another task for the same partition, and so that it knows when it is completed and when it should be marked as a "zombie". Added a regression test. Author: Imran Rashid <irashid@cloudera.com> Closes apache#21131 from squito/SPARK-23433. (cherry picked from commit 94641fe) Signed-off-by: Imran Rashid <irashid@cloudera.com> * [SPARK-23489][SQL][TEST][BRANCH-2.2] HiveExternalCatalogVersionsSuite should verify the downloaded file ## What changes were proposed in this pull request? This is a backport of apache#21210 because `branch-2.2` also faces the same failures. Although [SPARK-22654](https://issues.apache.org/jira/browse/SPARK-22654) made `HiveExternalCatalogVersionsSuite` download from Apache mirrors three times, it has been flaky because it didn't verify the downloaded file. Some Apache mirrors terminate the downloading abnormally, the *corrupted* file shows the following errors. ``` gzip: stdin: not in gzip format tar: Child returned status 1 tar: Error is not recoverable: exiting now 22:46:32.700 WARN org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite: ===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.hive.HiveExternalCatalogVersionsSuite, thread names: Keep-Alive-Timer ===== *** RUN ABORTED *** java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "/tmp/test-spark/spark-2.2.0"): error=2, No such file or directory ``` This has been reported weirdly in two ways. For example, the above case is reported as Case 2 `no failures`. - Case 1. [Test Result (1 failure / +1)](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.7/4389/) - Case 2. [Test Result (no failures)](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.6/4811/) This PR aims to make `HiveExternalCatalogVersionsSuite` more robust by verifying the downloaded `tgz` file by extracting and checking the existence of `bin/spark-submit`. If it turns out that the file is empty or corrupted, `HiveExternalCatalogVersionsSuite` will do retry logic like the download failure. ## How was this patch tested? Pass the Jenkins. Author: Dongjoon Hyun <dongjoon@apache.org> Closes apache#21232 from dongjoon-hyun/SPARK-23489-2. * [SPARK-23697][CORE] LegacyAccumulatorWrapper should define isZero correctly ## What changes were proposed in this pull request? It's possible that Accumulators of Spark 1.x may no longer work with Spark 2.x. This is because `LegacyAccumulatorWrapper.isZero` may return wrong answer if `AccumulableParam` doesn't define equals/hashCode. This PR fixes this by using reference equality check in `LegacyAccumulatorWrapper.isZero`. ## How was this patch tested? a new test Author: Wenchen Fan <wenchen@databricks.com> Closes apache#21229 from cloud-fan/accumulator. (cherry picked from commit 4d5de4d) Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-21278][PYSPARK] Upgrade to Py4J 0.10.6 This PR aims to bump Py4J in order to fix the following float/double bug. Py4J 0.10.5 fixes this (py4j/py4j#272) and the latest Py4J is 0.10.6. **BEFORE** ``` >>> df = spark.range(1) >>> df.select(df['id'] + 17.133574204226083).show() +--------------------+ |(id + 17.1335742042)| +--------------------+ | 17.1335742042| +--------------------+ ``` **AFTER** ``` >>> df = spark.range(1) >>> df.select(df['id'] + 17.133574204226083).show() +-------------------------+ |(id + 17.133574204226083)| +-------------------------+ | 17.133574204226083| +-------------------------+ ``` Manual. Author: Dongjoon Hyun <dongjoon@apache.org> Closes apache#18546 from dongjoon-hyun/SPARK-21278. (cherry picked from commit c8d0aba) Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com> * [SPARK-16406][SQL] Improve performance of LogicalPlan.resolve `LogicalPlan.resolve(...)` uses linear searches to find an attribute matching a name. This is fine in normal cases, but gets problematic when you try to resolve a large number of columns on a plan with a large number of attributes. This PR adds an indexing structure to `resolve(...)` in order to find potential matches quicker. This PR improves the reference resolution time for the following code by 4x (11.8s -> 2.4s): ``` scala val n = 4000 val values = (1 to n).map(_.toString).mkString(", ") val columns = (1 to n).map("column" + _).mkString(", ") val query = s""" |SELECT $columns |FROM VALUES ($values) T($columns) |WHERE 1=2 AND 1 IN ($columns) |GROUP BY $columns |ORDER BY $columns |""".stripMargin spark.time(sql(query)) ``` Existing tests. Author: Herman van Hovell <hvanhovell@databricks.com> Closes apache#14083 from hvanhovell/SPARK-16406. * [PYSPARK] Update py4j to version 0.10.7. (cherry picked from commit cc613b5) Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com> (cherry picked from commit 323dc3a) Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com> * [SPARKR] Match pyspark features in SparkR communication protocol. (cherry picked from commit 628c7b5) Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com> (cherry picked from commit 16cd9ac) Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com> * Keep old-style messages for AnalysisException with ambiguous references
## What changes were proposed in this pull request? `LogicalPlan.resolve(...)` uses linear searches to find an attribute matching a name. This is fine in normal cases, but gets problematic when you try to resolve a large number of columns on a plan with a large number of attributes. This PR adds an indexing structure to `resolve(...)` in order to find potential matches quicker. This PR improves the reference resolution time for the following code by 4x (11.8s -> 2.4s): ``` scala val n = 4000 val values = (1 to n).map(_.toString).mkString(", ") val columns = (1 to n).map("column" + _).mkString(", ") val query = s""" |SELECT $columns |FROM VALUES ($values) T($columns) |WHERE 1=2 AND 1 IN ($columns) |GROUP BY $columns |ORDER BY $columns |""".stripMargin spark.time(sql(query)) ``` ## How was this patch tested? Existing tests. Author: Herman van Hovell <hvanhovell@databricks.com> Closes apache#14083 from hvanhovell/SPARK-16406.
What changes were proposed in this pull request?
LogicalPlan.resolve(...)
uses linear searches to find an attribute matching a name. This is fine in normal cases, but gets problematic when you try to resolve a large number of columns on a plan with a large number of attributes.This PR adds an indexing structure to
resolve(...)
in order to find potential matches quicker. This PR improves the reference resolution time for the following code by 4x (11.8s -> 2.4s):How was this patch tested?
Existing tests.