Skip to content

[SPARK-32421][SQL] Add code-gen for shuffled hash join #29277

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
Expand All @@ -50,6 +50,7 @@ trait CodegenSupport extends SparkPlan {
private def variablePrefix: String = this match {
case _: HashAggregateExec => "agg"
case _: BroadcastHashJoinExec => "bhj"
case _: ShuffledHashJoinExec => "shj"
case _: SortMergeJoinExec => "smj"
case _: RDDScanExec => "rdd"
case _: DataSourceScanExec => "scan"
Expand Down Expand Up @@ -903,6 +904,10 @@ case class CollapseCodegenStages(
// The children of SortMergeJoin should do codegen separately.
j.withNewChildren(j.children.map(
child => InputAdapter(insertWholeStageCodegen(child))))
case j: ShuffledHashJoinExec =>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

codegen children of ShuffledHashJoinExec separately same as SortMergeJoinExec.

// The children of ShuffledHashJoin should do codegen separately.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This and SortMergeJoin. I actually re-think about it and then figure out, because codegen related code rarely changes recently. It would be nice to add more comments here to explain it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya - sure, wondering what kind of wording you are expecting here? does it look better with:

// The children of ShuffledHashJoin should do codegen separately,
// because codegen for ShuffledHashJoin depends on more than one row
// from the build side input.
// The children of SortMergeJoin should do codegen separately,
// because codegen for SortMergeJoin depends on more than one row
// from the buffer side input.

j.withNewChildren(j.children.map(
Copy link
Member

@viirya viirya Jul 30, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove this now. ShuffledHashJoin now does codegen like BroadcastHashJoin.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya - I don't think we can remove this. We have to do shuffled hash join codegen separately, as we have a hardcoded dependency for build side input input[1] when building relation. This can go wrong if we have multiple shuffled hash join in one query.

E.g.

  test("ShuffledHashJoin should be included in WholeStageCodegen") {
    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "30",
        SQLConf.SHUFFLE_PARTITIONS.key -> "2",
        SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
      val df1 = spark.range(5).select($"id".as("k1"))
      val df2 = spark.range(15).select($"id".as("k2"))
      val df3 = spark.range(6).select($"id".as("k3"))
      val twoJoinsDF = df1.join(df2, $"k1" === $"k2").join(df3, $"k1" === $"k3")
    }
  }

If we don't codegen shuffled hash join children separately, we will get something like:

/* 018 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 019 */     partitionIndex = index;
/* 020 */     this.inputs = inputs;
/* 021 */     inputadapter_input_0 = inputs[0];
/* 022 */     shj_relation_0 = ((org.apache.spark.sql.execution.joins.ShuffledHashJoinExec) references[0] /* plan */).buildHashedRelation(inputs[1]);
/* 023 */     shj_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
/* 024 */     shj_relation_1 = ((org.apache.spark.sql.execution.joins.ShuffledHashJoinExec) references[2] /* plan */).buildHashedRelation(inputs[1]);
/* 025 */     shj_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(3, 0);
/* 026 */
/* 027 */   }

shj_relation_0 and shj_relation_1 will try to build hash relation on same input (but shouldn't), as the input[1] is hardcoded there. On the other hand, I couldn't think of an alternative way not to hardcode input[1] here in codegen. Let me know if you have any better options. Thanks. I also updated WholeStageCodegenSuite.scala to have a unit test for this kind of multiple joins query.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we only need to do it for the build side?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, that's true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan, @viirya - if we only codegen separately for build side, we would still have the same problem as above for multiple SHJs right? Essentially we would fuse multiple stream sides codegen together in one codegen method, so we will have multiple build side initialized in init(), and naming collision as above. Let me know if it doesn't make sense, or I can create a counter example here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan - sounds good, non-trivial for me now as well. Will try to resolve it in the future. Thanks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And there are more problems if we have many shuffle hash join stay together. We need to accumulate the CodegenSupport.inputRDDs, but WholeStageCodegenExec only supports up to 2 input RDDs for now.

Copy link
Contributor Author

@c21 c21 Jul 31, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to accumulate the CodegenSupport.inputRDDs, but WholeStageCodegenExec only supports up to 2 input RDDs for now.

Yes. Agreed. SortMergeJoinExec took the decision to do codegen for children separately, it's just simpler without getting into these limitations.

Copy link
Member

@viirya viirya Jul 31, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once we only codegen separately for build side, we should not build hash relation using inputs[1] anymore. It will work similarly like BroadcastHashJoin, we just build hash relation using buildPlan. And ShuffledHashJoinExec has only one input RDD now, like BroadcastHashJoin's inputRDDs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, it still has some problems. Will think about it if I have more time.

child => InputAdapter(insertWholeStageCodegen(child))))
case p => p.withNewChildren(p.children.map(insertInputAdapter))
}
}
Expand Down
Loading