Skip to content

[SPARK-22103] Move HashAggregateExec parent consume to a separate function in codegen #19324

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed

Conversation

juliuszsompolski
Copy link
Contributor

What changes were proposed in this pull request?

HashAggregateExec codegen uses two paths for fast hash table and a generic one.
It generates code paths for iterating over both, and both code paths generate the consume code of the parent operator, resulting in that code being expanded twice.
This leads to a long generated function that might be an issue for the compiler (see e.g. SPARK-21603).
I propose to remove the double expansion by generating the consume code in a helper function that can just be called from both iterating loops.

An issue with separating the consume code to a helper function was that a number of places relied and assumed on being in the scope of an outside produce loop and e.g. use continue to jump out.
I replaced such code flows with nested scopes. It is code that should be handled the same by compiler, while getting rid of depending on assumptions that are outside of the consume's own scope.

How was this patch tested?

Existing test coverage.

@juliuszsompolski
Copy link
Contributor Author

@SparkQA
Copy link

SparkQA commented Sep 22, 2017

Test build #82088 has started for PR 19324 at commit ca64368.

val doAggFuncName = ctx.addNewFunction(doAgg,
s"""
${generateGenerateCode}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a tangent fix: this generated code for the hash map was piggy-backed here together with the doAggregateWithKeys function, and it could become inaccessible from the top function if the function gets generated in a nested class (after #18075)

@@ -329,6 +332,15 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
def doCodeGen(): (CodegenContext, CodeAndComment) = {
val ctx = new CodegenContext
val code = child.asInstanceOf[CodegenSupport].produce(ctx, this)

// main next function.
ctx.addNewFunction("processNext",
Copy link
Contributor Author

@juliuszsompolski juliuszsompolski Sep 22, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tangent fix: add processNext() with addNewFunction, so that it is also taken into account by #18810

} else if (modes.contains(Partial) || modes.contains(PartialMerge)) {
// This should be the last operator in a stage, we should output UnsafeRow directly
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tangent fix: The partial aggregation doesn't necessarily have to be the last operator in the stage. E.g. if the shuffle requirement between the partial/final aggregation was already satisfied, or between 2. and 3. in planAggregateWithOneDistinct. Outputting the UnsafeRow through UnsafeRowJoiner was unnecessary then.

@juliuszsompolski
Copy link
Contributor Author

@viirya This is related to #18931, as it also separates out the consume function. Maybe it would be enough to do similar splits into functions in the codegen of some operators that are materialization points (sort, joins) to keep the function length in check?
Splitting out on every consume takes away some of compiler's opportunities to optimize, like e.g. delaying evaluation of some projection (which you mentioned in your PR).
Removing the use of continue also simplifies not needing to handle it in your PR.

@viirya
Copy link
Member

viirya commented Sep 24, 2017

@juliuszsompolski Thanks for pinging me.

#18931 is an attempt to separate the consume function as it can as possible. With long chain of any operators, you can have a long consume function and fail JIT, this is the one reason it tries to split into functions at the root of codegen support, instead of in few operators individually. I'd avoid to duplicate the separate logic in all operators, IMO.

For the explicit delaying evaluation of projection, currently the strategy I take is not going to split it. I guess that you mean the evaluation that can be delayed by the compiler, I personally think it should not be an observable impact under the whole-stage codegen framework. The reason is those evaluation are actually needed and can't be avoided in most of (if not all) cases. From the benchmark we can see there is no negative impact even in the cases where no long consume function exists.

Yeah, I think the simplifies for the use of continue is a good thing. Personally I'd like to have this part merged first individually and so I can simplify #18931.

@@ -328,10 +325,11 @@ case class BroadcastHashJoinExec(
| UnsafeRow $matched = $matches != null && $matches.hasNext() ?
| (UnsafeRow) $matches.next() : null;
| ${checkCondition.trim}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm. This is for outer join. The same name but different value.

@@ -186,8 +186,7 @@ case class BroadcastHashJoinExec(
*/
private def getJoinCondition(
ctx: CodegenContext,
input: Seq[ExprCode],
anti: Boolean = false): (String, String, Seq[ExprCode]) = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so we never set it to true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm, I saw the refactor in codegenAnti, cool!

@gatorsmile
Copy link
Member

LGTM

@gatorsmile
Copy link
Member

retest this please

@SparkQA
Copy link

SparkQA commented Sep 25, 2017

Test build #82153 has finished for PR 19324 at commit ca64368.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Copy link
Member

Merged to master

* Add extra source code to the outermost generated class.
* @param code verbatim source code to be added.
*/
def addExtraCode(code: String): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd call it addInnerClass, as ideally you can't add arbitrary code to outer class.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 Although it doesn't prevent you going to add functions, but we have addNewFunction for it. So we'd better claim that this is just for inner class.

* # code to evaluate the predicate expression, result is isNull1 and value2
* if (isNull1 || !value2) continue;
* # call consume(), which will call parent.doConsume()
* if (!isNull1 && value2) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this may lead to deeply nested code, but I don't have a better idea for now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in reality the filter code generates a do { } while(false) with continue inside to jump out, just like it did before. There's appropriate comment to it there.
I didn't want to complicate this example here, so changing the "will generate" to "could generate" is intentional to kind of show that it could, but not necessarily will :-)

| ${consume(ctx, Seq.empty, {generateRow.value})}
| ${generateKeyRow.code}
| ${generateBufferRow.code}
| $outputFunc(${generateKeyRow.value}, ${generateBufferRow.value});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we didn't call outputCode before, are you fixing a potential bug?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generateRow.code was doing the job of outputCode before - i.e. putting all expected output into one UnsafeRow, from which the parent can consume it.

// resultExpressions are Attributes of groupingExpressions and aggregateBufferAttributes.
assert(resultExpressions.forall(_.isInstanceOf[Attribute]))
assert(resultExpressions.length ==
groupingExpressions.length + aggregateBufferAttributes.length)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we don't have these 2 requirements for the modes.contains(Final) || modes.contains(Complete) branch?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Final/Complete aggregations can have arbitrary projections in their resultExpressions, while partial aggregations are always constructed with only the grouping keys and aggregate expressions. The code that was here before with the UnsafeRowJoiner was using this assumption, so now I put it into assertion.

"""

} else {
// generate result based on grouping key
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we only go to this branch when aggregateExpressions is empty, is that possible?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, e.g. for aggregation coming from Distinct.

@@ -201,11 +201,14 @@ case class FilterExec(condition: Expression, child: SparkPlan)
ev
}

// Note: wrap in "do { } while(false);", so the generated checks can jump out with "continue;"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is tricky, how hard it is to fix all places that use continue?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah i see, you are trying to avoid generating deeply nested if-else branches.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

genPredicate and generated ~50 lines above would have to be rewritten to now use continue. As you pointed in a previous comment, that would potentially lead to very nested scopes. Shouldn't be a problem for the compiler; for code generation the genPredicate would have to maintain these scopes and where to end them - i.e. wherever it not places a continue, it would have to open a nested scope, and then it would have to be closed in a correct place.

|$numOutput.add(1);
|${consume(ctx, resultVars)}
|if ($matched != null) {
| $checkCondition {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good trick

@cloud-fan
Copy link
Contributor

a late LGTM :)

*/
def addExtraCode(code: String): Unit = {
extraCode.append(code)
classSize(outerClassName) += code.length
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The classSize is mainly used to deal with the limit of number of named constants. So I think we don't need to add extra code size into it, if we only add inner class?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants