[SPARK-22103] Move HashAggregateExec parent consume to a separate function in codegen #19324
Test build #82088 has started for PR 19324 at commit
val doAggFuncName = ctx.addNewFunction(doAgg,
s"""
${generateGenerateCode}
This is a tangent fix: the generated code for the hash map was piggy-backed here together with the doAggregateWithKeys function, and it could become inaccessible from the top function if that function gets generated in a nested class (after #18075).
@@ -329,6 +332,15 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
def doCodeGen(): (CodegenContext, CodeAndComment) = {
val ctx = new CodegenContext
val code = child.asInstanceOf[CodegenSupport].produce(ctx, this)

// main next function.
ctx.addNewFunction("processNext",
tangent fix: add processNext() with addNewFunction, so that it is also taken into account by #18810
} else if (modes.contains(Partial) || modes.contains(PartialMerge)) {
// This should be the last operator in a stage, we should output UnsafeRow directly
tangent fix: The partial aggregation doesn't necessarily have to be the last operator in the stage, e.g. if the shuffle requirement between the partial and final aggregation was already satisfied, or between 2. and 3. in planAggregateWithOneDistinct. Outputting the UnsafeRow through UnsafeRowJoiner was unnecessary then.
@viirya This is related to #18931, as it also separates out the consume function. Maybe it would be enough to do similar splits into functions in the codegen of some operators that are materialization points (sort, joins) to keep the function length in check?
@juliuszsompolski Thanks for pinging me. #18931 is an attempt to separate out the consume function wherever possible. With a long chain of operators, you can end up with a long consume function that fails JIT compilation; this is one reason it splits into functions at the root of the codegen support, instead of in a few operators individually. IMO, I'd avoid duplicating the separation logic in every operator. For explicitly delayed evaluation of projections, my current strategy is not to split them. I guess you mean evaluation that can be delayed by the compiler; I personally think that should not have an observable impact under the whole-stage codegen framework. The reason is that those evaluations are actually needed and can't be avoided in most (if not all) cases. From the benchmark we can see there is no negative impact even in cases where no long consume function exists. Yeah, I think the simplifies for the use of
@@ -328,10 +325,11 @@ case class BroadcastHashJoinExec(
| UnsafeRow $matched = $matches != null && $matches.hasNext() ?
| (UnsafeRow) $matches.next() : null;
| ${checkCondition.trim}
?
nvm. This is for outer join. The same name but different value.
@@ -186,8 +186,7 @@ case class BroadcastHashJoinExec(
*/
private def getJoinCondition(
ctx: CodegenContext,
input: Seq[ExprCode],
anti: Boolean = false): (String, String, Seq[ExprCode]) = {
I like this change.
so we never set it to true?
We used to in https://github.com/apache/spark/pull/19324/files#diff-4455c05ddcdb096c36d9e0bd326dfe12L389, we don't anymore.
nvm, I saw the refactor in codegenAnti, cool!
LGTM
retest this please
Test build #82153 has finished for PR 19324 at commit
Merged to master
* Add extra source code to the outermost generated class.
* @param code verbatim source code to be added.
*/
def addExtraCode(code: String): Unit = {
I'd call it addInnerClass, as ideally you can't add arbitrary code to the outer class.
+1. Although it doesn't prevent you from adding functions, we have addNewFunction for that. So we'd better state that this is just for inner classes.
* # code to evaluate the predicate expression, result is isNull1 and value2
* if (isNull1 || !value2) continue;
* # call consume(), which will call parent.doConsume()
* if (!isNull1 && value2) {
this may lead to deeply nested code, but I don't have a better idea for now.
In reality the filter code generates a do { } while(false) with continue inside to jump out, just like it did before. There's an appropriate comment about it there.
I didn't want to complicate this example here, so changing "will generate" to "could generate" is intentional, to show that it could, but not necessarily will :-)
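For readers unfamiliar with the pattern, here is a minimal hand-written Java sketch (not actual Spark output) of how `continue` inside a `do { } while(false)` block skips the rest of the body once the enclosing produce loop is no longer in scope. The class, method, and variable names are hypothetical, and the two checks merely stand in for generated predicate code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; this is not code emitted by Spark.
class DoWhileFalseSketch {
    // Stands in for the produce loop: it iterates over the input rows and
    // hands each one to the separated consume function.
    static List<Integer> consumeAll(int[] rows) {
        List<Integer> out = new ArrayList<>();
        for (int row : rows) {
            consumeRow(row, out);
        }
        return out;
    }

    // Stands in for a separated consume function. The produce loop is not in
    // scope here, so "continue" needs a loop of its own to refer to.
    static void consumeRow(int value, List<Integer> out) {
        do {
            if (value <= 0) continue;      // jumps to the while(false) check, leaving the block
            if (value % 2 != 0) continue;  // same for the second check
            out.add(value);                // reached only when both checks pass
        } while (false);
    }

    public static void main(String[] args) {
        System.out.println(consumeAll(new int[] {-2, 1, 4, 6, 7})); // prints [4, 6]
    }
}
```

The wrapper lets existing check code keep its flat `continue`-based shape instead of opening one nested scope per check.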
| ${consume(ctx, Seq.empty, {generateRow.value})}
| ${generateKeyRow.code}
| ${generateBufferRow.code}
| $outputFunc(${generateKeyRow.value}, ${generateBufferRow.value});
we didn't call outputCode before, are you fixing a potential bug?
generateRow.code was doing the job of outputCode before, i.e. putting all expected output into one UnsafeRow, from which the parent can consume it.
// resultExpressions are Attributes of groupingExpressions and aggregateBufferAttributes.
assert(resultExpressions.forall(_.isInstanceOf[Attribute]))
assert(resultExpressions.length ==
groupingExpressions.length + aggregateBufferAttributes.length)
Why don't we have these 2 requirements for the modes.contains(Final) || modes.contains(Complete) branch?
Final/Complete aggregations can have arbitrary projections in their resultExpressions, while partial aggregations are always constructed with only the grouping keys and aggregate expressions. The code that was here before with the UnsafeRowJoiner relied on this assumption, so now I put it into an assertion.
"""

} else {
// generate result based on grouping key
we only go to this branch when aggregateExpressions is empty, is that possible?
Yes, e.g. for aggregation coming from Distinct.
@@ -201,11 +201,14 @@ case class FilterExec(condition: Expression, child: SparkPlan)
ev
}

// Note: wrap in "do { } while(false);", so the generated checks can jump out with "continue;"
this is tricky, how hard is it to fix all places that use continue?
ah i see, you are trying to avoid generating deeply nested if-else branches.
genPredicate and generated ~50 lines above would have to be rewritten to not use continue. As you pointed out in a previous comment, that would potentially lead to very nested scopes. That shouldn't be a problem for the compiler; for code generation, genPredicate would have to maintain these scopes and where to end them, i.e. wherever it now places a continue, it would have to open a nested scope, which would then have to be closed in the correct place.
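The bookkeeping cost described above can be sketched with a toy generator. This is an illustration only: `PredicateCodegenSketch`, `withContinue`, and `withNestedScopes` are hypothetical names, and the code is not Spark's `genPredicate`. The flat variant emits each check independently, while the nested variant must track how many scopes it opened and close them after the body.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical toy generator; not Spark's genPredicate.
class PredicateCodegenSketch {
    // Flat style: a failed check jumps out with "continue", so each check
    // can be emitted without knowing what comes after it.
    static String withContinue(List<String> predicates, String body) {
        StringBuilder sb = new StringBuilder("do {\n");
        for (String p : predicates) {
            sb.append("  if (!(").append(p).append(")) continue;\n");
        }
        sb.append("  ").append(body).append("\n");
        sb.append("} while (false);\n");
        return sb.toString();
    }

    // Nested style: every check opens a scope, and the generator has to
    // remember the depth so it can close all scopes after the body.
    static String withNestedScopes(List<String> predicates, String body) {
        StringBuilder sb = new StringBuilder();
        int depth = 0;
        for (String p : predicates) {
            sb.append(indent(depth)).append("if (").append(p).append(") {\n");
            depth++;
        }
        sb.append(indent(depth)).append(body).append("\n");
        while (depth > 0) {
            depth--;
            sb.append(indent(depth)).append("}\n");
        }
        return sb.toString();
    }

    private static String indent(int depth) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < depth; i++) {
            sb.append("  ");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<String> checks = Arrays.asList("!isNull1", "value2");
        System.out.print(withContinue(checks, "consume();"));
        System.out.print(withNestedScopes(checks, "consume();"));
    }
}
```

In this toy all checks are known up front, so closing the scopes is easy; the difficulty raised in the comment is that real checks are emitted at different points of the generated code.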
|$numOutput.add(1);
|${consume(ctx, resultVars)}
|if ($matched != null) {
| $checkCondition {
good trick
a late LGTM :)
*/
def addExtraCode(code: String): Unit = {
extraCode.append(code)
classSize(outerClassName) += code.length
The classSize is mainly used to deal with the limit on the number of named constants. So I think we don't need to add the extra code size to it, if we only add inner classes?
What changes were proposed in this pull request?
HashAggregateExec codegen uses two paths: one for the fast hash table and a generic one.
It generates code paths for iterating over both, and both code paths generate the consume code of the parent operator, resulting in that code being expanded twice.
This leads to a long generated function that might be an issue for the compiler (see e.g. SPARK-21603).
I propose to remove the double expansion by generating the consume code in a helper function that can just be called from both iterating loops.
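As a rough picture of the proposal, the following hand-written Java sketch (not generated Spark code) shows two iteration loops that both call one shared helper instead of each carrying an inlined copy of the parent's consume logic. All names are hypothetical, and the arithmetic in `consume` is a placeholder for whatever the parent operator would do with a row.

```java
// Hypothetical illustration of the shape of the generated code after the change.
class SharedConsumeSketch {
    // Placeholder for the parent operator's consume code. It is emitted once
    // and called from both loops, rather than being expanded twice.
    private static long consume(long acc, int key, int value) {
        return acc + (long) key * value;
    }

    static long outputBoth(int[][] fastMapEntries, int[][] genericMapEntries) {
        long acc = 0;
        // Loop over the fast hash map entries.
        for (int[] e : fastMapEntries) {
            acc = consume(acc, e[0], e[1]);
        }
        // Loop over the generic hash map entries.
        for (int[] e : genericMapEntries) {
            acc = consume(acc, e[0], e[1]);
        }
        return acc;
    }

    public static void main(String[] args) {
        int[][] fast = {{1, 2}};
        int[][] generic = {{3, 4}, {5, 6}};
        System.out.println(outputBoth(fast, generic)); // prints 44
    }
}
```

With the helper in place, the enclosing function grows by two call sites rather than two full copies of the consume body.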
An issue with separating the consume code into a helper function was that a number of places relied on being in the scope of an outside produce loop and, e.g., used continue to jump out.
I replaced such code flows with nested scopes. This is code that the compiler should handle the same way, while removing the dependence on assumptions that lie outside of consume's own scope.
How was this patch tested?
Existing test coverage.