[SPARK-32748][SQL] Support local property propagation in SubqueryBroadcastExec #29589
Test build #128072 has finished for PR 29589 at commit
Test build #128173 has finished for PR 29589 at commit
```scala
try {
  SQLConf.get.setConf(StaticSQLConf.BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD, 1)
```
Does this really work? It's a static conf. As long as an earlier test triggers DPP, this won't work.
Yes, you are right, but it's not because it's a static conf; it's because `executionContext` is in the `SubqueryBroadcastExec` object. This makes it hard to write a unit test. Do you have any suggestions?
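To illustrate (a minimal, hypothetical sketch, not Spark code): a `val` in a companion object is initialized exactly once, on first access, so a conf value read there is frozen at that moment, and changing the conf later has no effect.

```scala
object Config {
  @volatile var maxThreads: Int = 128 // stands in for the static conf
}

object Pool {
  // Evaluated once, the first time Pool is accessed; later Config changes are ignored.
  val size: Int = Config.maxThreads
}

object PitfallDemo extends App {
  println(Pool.size)    // 128: Pool is initialized here with the current value
  Config.maxThreads = 1 // too late, Pool.size is already fixed
  println(Pool.size)    // still 128
}
```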
Actually it was the same before: https://github.com/apache/spark/pull/27266/files#diff-6dabaf3c491be73cc0479b1f368d157bR164
Can we move this test there? Then it's easier to read, as these special tests are grouped together.
Thanks for the advice! Updated.
```scala
spark.sparkContext.setLocalProperty(propKey, propValue)
val df = sql(
  s"""
     |SELECT compare_property_value(f.date_id, '$propKey', '$propValue') as col
```
Is `compare_property_value` really evaluated in `SubqueryBroadcastExec` threads?
Yes, IIUC a UDF created by `spark.udf.register` is not foldable, so it will be evaluated in tasks. Besides, `TaskContext.get()` would return null if it were not run in a task, I think.
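As a sanity check of that claim, here is a minimal, self-contained sketch (the UDF name and property key are made up) of a registered UDF reading a driver-set local property from inside a task:

```scala
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession

object UdfLocalPropDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("udf-prop-demo").getOrCreate()

    // UDFs registered this way are not foldable, so the body runs inside tasks,
    // where TaskContext.get() is non-null and exposes the submitted local properties.
    spark.udf.register("read_prop", (key: String) => TaskContext.get().getLocalProperty(key))

    spark.sparkContext.setLocalProperty("demo.key", "demo-value")
    // spark.range produces a plan executed by tasks rather than folded on the driver.
    val seen = spark.range(1).selectExpr("read_prop('demo.key') AS v").collect()
    assert(seen.head.getString(0) == "demo-value")
    spark.stop()
  }
}
```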
Test build #128255 has finished for PR 29589 at commit
Test build #128303 has finished for PR 29589 at commit
Test build #128302 has finished for PR 29589 at commit
thanks, merging to master!
```diff
  // relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
  val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
- Future {
+ SQLExecution.withThreadLocalCaptured[Array[InternalRow]](
```
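For readers unfamiliar with `SQLExecution.withThreadLocalCaptured` (added in SPARK-22590), here is a simplified sketch of the capture-and-restore pattern it implements. The `LocalProps` trait is a hypothetical stand-in for the relevant `SparkContext` methods, and the real method also captures the active `SparkSession`:

```scala
import java.util.Properties
import java.util.concurrent.ExecutorService
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical stand-in for the SparkContext methods involved.
trait LocalProps {
  def cloneLocalProperties(): Properties
  def setLocalProperties(props: Properties): Unit
}

object CapturedFutures {
  def withCaptured[T](sc: LocalProps, exec: ExecutorService)(body: => T): Future[T] = {
    // Capture on the submitting thread, where the caller's local properties live.
    val captured = sc.cloneLocalProperties()
    Future {
      // Install them on the pool thread for the duration of body, then restore.
      val original = sc.cloneLocalProperties()
      sc.setLocalProperties(captured)
      try body finally sc.setLocalProperties(original)
    }(ExecutionContext.fromExecutorService(exec))
  }
}
```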
It looks like the added test can pass without this fix (I think the comment above relates to this issue). Could you check this again? @wzhfy
sure, I'll check
Thanks!
@maropu I think you are right. Sorry, I misunderstood your comment previously.
Because what `SubqueryBroadcastExec.relationFuture` really calls is `child.executeBroadcast[HashedRelation]().value`, it should use "broadcast-exchange" threads. As a result, it seems that propagation of thread-local properties in `SubqueryBroadcastExec` is not necessary; they will be propagated by the broadcast threads anyway.
BTW, the current test case is also not reasonable, since the UDF is not evaluated in the broadcasted part. I moved it to a broadcast filter (like below) and found the above problem. Thanks for pointing this out!
test("SPARK-32748: propagate local properties to dynamic pruning thread") {
val factTable = "fact_local_prop_dpp"
val dimTable = "dim_local_prop_dpp"
val filteringValue = 3
def checkPropertyValueByUdfResult(
propKey: String,
propValue: String,
expectedResultCount: Int): Unit = {
spark.sparkContext.setLocalProperty(propKey, propValue)
val df = sql(
s"""
|SELECT f.id
|FROM $factTable f
|INNER JOIN $dimTable s
|ON f.id = s.id
|AND compare_property_value(s.value, '$propKey', '$propValue') = $filteringValue
""".stripMargin)
val subqueryBroadcastSeq = df.queryExecution.executedPlan.flatMap {
case s: FileSourceScanExec => s.partitionFilters.collect {
case DynamicPruningExpression(InSubqueryExec(_, b: SubqueryBroadcastExec, _, _)) => b
}
case _ => Nil
}
assert(subqueryBroadcastSeq.nonEmpty,
s"Should trigger DPP with a reused broadcast exchange:\n${df.queryExecution}")
assert(df.collect().length == expectedResultCount)
}
withTable(factTable, dimTable) {
spark.range(10).select($"id", $"id".as("value"))
.write.partitionBy("id").mode("overwrite").saveAsTable(factTable)
spark.range(5).select($"id", $"id".as("value"))
.write.mode("overwrite").saveAsTable(dimTable)
withSQLConf(
StaticSQLConf.BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD.key -> "1",
SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
try {
spark.udf.register(
"compare_property_value",
(input: Int, propKey: String, propValue: String) => {
if (TaskContext.get().getLocalProperty(propKey) == propValue) {
filteringValue
} else {
input
}
}
)
val propKey = "spark.sql.subquery.broadcast.prop.key"
// set local property and assert
val propValue1 = UUID.randomUUID().toString()
checkPropertyValueByUdfResult(propKey, propValue1, expectedResultCount = 5)
// change local property and re-assert
val propValue2 = UUID.randomUUID().toString()
checkPropertyValueByUdfResult(propKey, propValue2, expectedResultCount = 5)
} finally {
spark.sessionState.catalog.dropTempFunction("compare_property_value", true)
}
}
}
}
@maropu @cloud-fan So should we revert this PR? And raise a separate JIRA for configuring the number of subquery broadcast threads (maybe using a different config name than `StaticSQLConf.BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD`)?
```scala
object SubqueryBroadcastExec {
  private[execution] val executionContext = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("dynamic-pruning",
      SQLConf.get.getConf(StaticSQLConf.BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD)))
}
```
oh, then please revert it.
I don't think we need a separate config. The thread pool here is used to wait for the broadcast to finish; it seems better to have the same number of slots as the broadcast exchange thread pool.
sgtm
> I don't think we need a separate config. The thread pool here is used to wait for the broadcast to finish; it seems better to have the same number of slots as the broadcast exchange thread pool.
@cloud-fan The previous number of subquery broadcast threads was 16, while the default number of broadcast exchange threads is 128.
Let's open a new PR to change the number of threads and discuss it there. 16 threads are probably fine, too.
It would be good if we could set this flag.
@wzhfy @maropu @cloud-fan Sorry if I am late to the party. We should consider keeping this. It is generally good practice to use a narrow waist for these things in the code. It will save someone in the future some time debugging why their change is not working :).
…ueryBroadcastExec"

### What changes were proposed in this pull request?

This reverts commit 04f7f6d due to the discussion in [comment](#29589 (comment)).

### Why are the changes needed?

Based on the discussion in [comment](#29589 (comment)), propagation of thread local properties in `SubqueryBroadcastExec` is not necessary, since they will be propagated by broadcast exchange threads anyway.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Also revert the added test.

Closes #29674 from wzhfy/revert_dpp_thread_local.

Authored-by: Zhenhua Wang <wzh_zju@163.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
### What changes were proposed in this pull request?

Since SPARK-22590, local property propagation is supported through `SQLExecution.withThreadLocalCaptured` in both `BroadcastExchangeExec` and `SubqueryExec` when computing `relationFuture`. This PR adds the same support in `SubqueryBroadcastExec`.

### Why are the changes needed?

Local property propagation is missed in `SubqueryBroadcastExec`.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added a new test.
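For completeness, a minimal, self-contained demo (names are illustrative) of what local property propagation means at the RDD level: properties set on the submitting driver thread become visible to tasks through `TaskContext`:

```scala
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession

object LocalPropertyDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("local-prop-demo").getOrCreate()
    // Local properties set on this thread are attached to the jobs it submits.
    spark.sparkContext.setLocalProperty("demo.key", "demo-value")
    val seen = spark.sparkContext.parallelize(1 to 2, numSlices = 2)
      .map(_ => TaskContext.get().getLocalProperty("demo.key"))
      .collect()
    assert(seen.forall(_ == "demo-value"))
    spark.stop()
  }
}
```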