
[SPARK-15764][SQL] Replace N^2 loop in BindReferences #13505


Closed
JoshRosen wants to merge 14 commits into master from JoshRosen/bind-references-improvement

Conversation

@JoshRosen (Contributor)

BindReferences contains an n^2 loop which causes performance issues when operating over large schemas: to determine the ordinal of an attribute reference, we perform a linear scan over the `input` array. Because `input` can sometimes be a `List`, the call to `input(ordinal).nullable` can also be O(n).

Instead of performing a linear scan, we can convert the input into an array and build a hash map to map from expression ids to ordinals. The greater up-front cost of the map construction is offset by the fact that an expression can contain multiple attribute references, so the cost of the map construction is amortized across a number of lookups.

Perf. benchmarks to follow. /cc @ericl
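For illustration, a minimal sketch of the before/after (simplified: error handling is omitted, and `bindLinear`/`bindHashed` are illustrative names, not the PR's actual API):

```scala
import org.apache.spark.sql.catalyst.expressions._

// Before: each AttributeReference in the expression triggers a linear scan
// of `input`, so binding a projection over an n-column schema is O(n^2).
def bindLinear[A <: Expression](expression: A, input: Seq[Attribute]): A = {
  expression.transform {
    case a: AttributeReference =>
      val ordinal = input.indexWhere(_.exprId == a.exprId)          // O(n) scan
      BoundReference(ordinal, a.dataType, input(ordinal).nullable)  // O(n) again if input is a List
  }.asInstanceOf[A]
}

// After: one O(n) pass builds an ExprId -> ordinal map; each lookup is O(1),
// and the array makes input(ordinal).nullable constant-time as well.
def bindHashed[A <: Expression](expression: A, input: Seq[Attribute]): A = {
  val inputArr = input.toArray
  // Reversed so that, for a duplicated expression id, the first ordinal wins.
  val ordinalOf: Map[ExprId, Int] =
    inputArr.zipWithIndex.reverseIterator.map { case (a, i) => a.exprId -> i }.toMap
  expression.transform {
    case a: AttributeReference =>
      val ordinal = ordinalOf(a.exprId)
      BoundReference(ordinal, a.dataType, inputArr(ordinal).nullable)
  }.asInstanceOf[A]
}
```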

@@ -84,17 +84,27 @@ object BindReferences extends Logging {
expression: A,
input: Seq[Attribute],
JoshRosen (Contributor Author)

I wonder whether we can push the map construction up one level so that we can amortize its cost across multiple `bindReference` calls.

JoshRosen (Contributor Author)

Actually, yeah: in GenerateMutableProjection we use the same InputSchema for every expression.

JoshRosen (Contributor Author)

I think we should add an overload which takes a sequence of expressions and binds all of their references. We should then replace the call sites in the various projection operators.
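A sketch of what such an overload might look like (hypothetical signature, assuming `bindReference` accepts an `AttributeSeq` as in this PR's diff):

```scala
import org.apache.spark.sql.catalyst.expressions._

object BindReferencesExt {
  // Hypothetical overload: bind a whole sequence of expressions against one
  // schema, so the ExprId -> ordinal map is built once rather than per call.
  def bindReferences[A <: Expression](expressions: Seq[A], input: AttributeSeq): Seq[A] =
    expressions.map(e => BindReferences.bindReference(e, input))
}
```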

@SparkQA commented Jun 4, 2016

Test build #59975 has finished for PR 13505 at commit 6216e94.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -296,7 +296,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
/**
* All the attributes that are used for this plan.
*/
- lazy val allAttributes: Seq[Attribute] = children.flatMap(_.output)
+ lazy val allAttributes: AttributeSeq = children.flatMap(_.output)
JoshRosen (Contributor Author)

@ericl and I found another layer of polynomial looping: in `QueryPlan.cleanArgs` we take every expression in the query plan and bind its references against `allAttributes`, which can be huge. If we turn this into an `AttributeSeq` once and build the map inside of that wrapper, then we amortize that cost and remove this expensive loop.
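A sketch of that call-site pattern (`QueryPlanSketch` and `cleanArgsSketch` are illustrative stand-ins, not the PR's exact code):

```scala
import org.apache.spark.sql.catalyst.expressions._

abstract class QueryPlanSketch {
  def children: Seq[QueryPlanSketch]
  def output: Seq[Attribute]
  def expressions: Seq[Expression]

  // One wrapper, built once: the ExprId -> ordinal map inside AttributeSeq is
  // materialized lazily on first use and shared by every binding below.
  lazy val allAttributes: AttributeSeq = children.flatMap(_.output)

  // Each bindReference call now does O(1) lookups instead of rescanning a
  // potentially huge Seq[Attribute].
  def cleanArgsSketch: Seq[Expression] =
    expressions.map(e => BindReferences.bindReference(e, allAttributes))
}
```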

JoshRosen (Contributor Author)

We should probably construct the `AttributeSeq` outside of the loop in the various projection operators, too, although that doesn't appear to be as serious a bottleneck yet.

@JoshRosen (Contributor Author)

@rxin, @ericl has some new benchmarks which operate on even wider schemas and which uncovered this bottleneck. Adding the caching of the map here resulted in a huge scalability improvement. Maybe @ericl can chime in with some flame graph charts here.

@SparkQA commented Jun 4, 2016

Test build #59980 has finished for PR 13505 at commit 0b412b0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • implicit class AttributeSeq(val attrs: Seq[Attribute])

@ericl (Contributor) commented Jun 4, 2016

Here's a flame graph of bindReferences dominating the CPU for a 10k column query:
[flame graph image: `indexWhere` dominating the profile; original SVG attached to the PR]

@SparkQA commented Jun 4, 2016

Test build #59976 has finished for PR 13505 at commit 38e8a99.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin (Contributor) commented Jun 4, 2016

hm probably shouldn't happen in this pr but i'm wondering if it'd make sense to generalize `AttributeSeq` and use it everywhere, rather than `Seq[Attribute]`.

private lazy val inputArr = attrs.toArray

private lazy val inputToOrdinal = {
val map = new java.util.HashMap[ExprId, Int](inputArr.length * 2)
Member

Why is `* 2` necessary? I think the map holds at most `attrs.size` entries, since `map.put()` is called at most `attrs.size` times. Is `attrs.size` equal to `inputArr.length`?

JoshRosen (Contributor Author)

The goal was to avoid having to rehash the elements of the hash map once the number of inserted keys exceeded the default 0.75 load factor.
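(For context, the arithmetic: `java.util.HashMap` rehashes once its size exceeds capacity × load factor, so a capacity of `2 * n` with the default 0.75 factor gives a resize threshold of `1.5 * n`, which `n` insertions never reach.)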


Contributor

+1 on `withExpectedSize`
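Presumably this refers to Guava's `Maps.newHashMapWithExpectedSize`, which hides the same sizing arithmetic. A sketch of the map construction using it (illustrative, not necessarily the PR's final code; this would live inside the wrapper class next to `inputArr`):

```scala
import com.google.common.collect.Maps
import org.apache.spark.sql.catalyst.expressions.ExprId

private lazy val inputToOrdinal = {
  // Guava sizes the table so that `inputArr.length` insertions never rehash,
  // replacing the hand-rolled `* 2` over-allocation.
  val map = Maps.newHashMapWithExpectedSize[ExprId, Int](inputArr.length)
  var i = 0
  while (i < inputArr.length) {
    // Keep the first ordinal when an expression id appears more than once.
    if (!map.containsKey(inputArr(i).exprId)) map.put(inputArr(i).exprId, i)
    i += 1
  }
  map
}
```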

@JoshRosen (Contributor Author)

@rxin, I think that it might make sense to use `AttributeSeq` more widely. Right now there's an implicit conversion, so we can gradually and naively migrate APIs to accept `AttributeSeq`.
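A self-contained toy sketch of the migration mechanics (`Attr`, `AttrSeq`, and `ordinalIn` stand in for Catalyst's `Attribute`, the real `AttributeSeq`, and a migrated API):

```scala
object MigrationSketch {
  case class Attr(name: String, exprId: Long)

  // The implicit class doubles as an implicit conversion Seq[Attr] => AttrSeq,
  // so callers holding a plain Seq keep compiling while APIs migrate.
  implicit class AttrSeq(val attrs: Seq[Attr]) extends Serializable {
    private lazy val ordinalOf: Map[Long, Int] =
      attrs.zipWithIndex.reverseIterator.map { case (a, i) => a.exprId -> i }.toMap
    def indexOf(exprId: Long): Int = ordinalOf.getOrElse(exprId, -1)
  }

  // An API already migrated to accept the wrapper type...
  def ordinalIn(input: AttrSeq, exprId: Long): Int = input.indexOf(exprId)

  def main(args: Array[String]): Unit = {
    // ...called with a plain Seq: the compiler inserts the wrapper automatically.
    val schema = Seq(Attr("a", 1L), Attr("b", 2L))
    println(ordinalIn(schema, 2L)) // prints 1
  }
}
```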

@@ -26,13 +26,6 @@ object AttributeMap {
def apply[A](kvs: Seq[(Attribute, A)]): AttributeMap[A] = {
new AttributeMap(kvs.map(kv => (kv._1.exprId, kv)).toMap)
}

/** Given a schema, constructs an [[AttributeMap]] from [[Attribute]] to ordinal */
def byIndex(schema: Seq[Attribute]): AttributeMap[Int] = apply(schema.zipWithIndex)
JoshRosen (Contributor Author)

This was vaguely related yet unused code that I stumbled across while looking for similar occurrences of this pattern, so I decided to remove it.

@JoshRosen (Contributor Author)

Alright, updated to address comments.

@SparkQA commented Jun 5, 2016

Test build #3066 has started for PR 13505 at commit 4efd3ee.

@rxin (Contributor) commented Jun 5, 2016

lgtm - I didn't look too closely though. Would be great for @ericl to look at this in detail.

@SparkQA commented Jun 5, 2016

Test build #59994 has finished for PR 13505 at commit 4efd3ee.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

/**
* Returns the index of first attribute with a matching expression id, or -1 if no match exists.
*/
def getOrdinalWithExprId(exprId: ExprId): Int = {
Contributor

Would `indexOf` be more clear?

JoshRosen (Contributor Author)

I had this originally and then moved to this name in anticipation of a future change which would add more "get index with property" methods. But a lot of those methods aren't cacheable (e.g. `semanticEquals`), so I'll revert this back to my first name choice.

@ericl (Contributor) commented Jun 5, 2016

Lgtm with minor comments

@cloud-fan (Contributor)

LGTM

@SparkQA commented Jun 5, 2016

Test build #60011 has finished for PR 13505 at commit 5504b6c.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

retest this please

@JoshRosen (Contributor Author)

Jenkins, retest this please.

@SparkQA commented Jun 6, 2016

Test build #60015 has finished for PR 13505 at commit 5504b6c.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 6, 2016

Test build #60019 has finished for PR 13505 at commit 5504b6c.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 6, 2016

Test build #60029 has finished for PR 13505 at commit 5e9c258.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • implicit class AttributeSeq(val attrs: Seq[Attribute]) extends Serializable

@JoshRosen (Contributor Author)

Fixed the tests by making `AttributeSeq` serializable. I'm going to merge this into master and branch-2.0.
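SparkQA's report above confirms the `extends Serializable` part of the fix; one plausible shape for the caching (the `@transient` detail is my assumption, not confirmed in this thread):

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, ExprId}

// In Spark this lives in a package object, since implicit classes cannot be
// top-level. @transient keeps the caches out of the serialized task; they are
// rebuilt lazily on first use after deserialization on the executor.
implicit class AttributeSeq(val attrs: Seq[Attribute]) extends Serializable {
  @transient private lazy val inputArr: Array[Attribute] = attrs.toArray
  @transient private lazy val inputToOrdinal: Map[ExprId, Int] =
    attrs.zipWithIndex.reverseIterator.map { case (a, i) => a.exprId -> i }.toMap
}
```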

asfgit pushed a commit that referenced this pull request Jun 6, 2016
BindReferences contains an n^2 loop which causes performance issues when operating over large schemas: to determine the ordinal of an attribute reference, we perform a linear scan over the `input` array. Because `input` can sometimes be a `List`, the call to `input(ordinal).nullable` can also be O(n).

Instead of performing a linear scan, we can convert the input into an array and build a hash map to map from expression ids to ordinals. The greater up-front cost of the map construction is offset by the fact that an expression can contain multiple attribute references, so the cost of the map construction is amortized across a number of lookups.

Perf. benchmarks to follow. /cc ericl

Author: Josh Rosen <joshrosen@databricks.com>

Closes #13505 from JoshRosen/bind-references-improvement.

(cherry picked from commit 0b8d694)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
asfgit closed this in 0b8d694 on Jun 6, 2016
JoshRosen deleted the bind-references-improvement branch on June 6, 2016 at 18:56