[SPARK-24250][SQL] support accessing SQLConf inside tasks #21376
Conversation
// Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because
// `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have
// active SparkSession and `SQLConf.get` may point to the wrong configs.
val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger)
I re-ran the JsonBenchmark and no performance regression is observed.
LGTM
Can the same problem also happen in other places? This seems to be quite a tricky issue which may happen in general. Can we avoid it somehow?
  spark = null
}

test("ReadonlySQLConf is correctly created at the executor side") {
nit: ReadOnlySQLConf
Test build #90856 has finished for PR 21376 at commit
LGTM, thanks
retest this please
@@ -107,7 +106,13 @@ object SQLConf {
 * run tests in parallel. At the time this feature was implemented, this was a no-op since we
 * run unit tests (that does not involve SparkSession) in serial order.
Need to update the above comments.
val originalLocalProps = allConfigs.collect {
  case (key, value) if key.startsWith("spark") =>
    val originalValue = sc.getLocalProperty(key)
    sc.setLocalProperty(key, value)
Is it possible that the same key already exists?
If users happen to set the same key in the local properties and want to access them in tasks, we will break it. It's very unlikely to happen and I'd say SQL config keys should be reserved for internal usage only.
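For context, a condensed sketch of the save-and-restore pattern this thread is discussing, wrapped in a hypothetical helper (`withSQLConfsAsLocalProps` is an illustrative name, not the actual method in the patch): a pre-existing local property with the same key is remembered and put back afterwards.

```scala
import org.apache.spark.SparkContext

// Hypothetical helper; condensed sketch of the pattern in this PR, not the exact patch.
def withSQLConfsAsLocalProps[T](sc: SparkContext, allConfigs: Map[String, String])(body: => T): T = {
  // Remember any pre-existing local property with the same key, then shadow it.
  val originalLocalProps: Seq[(String, String)] = allConfigs.toSeq.collect {
    case (key, value) if key.startsWith("spark") =>
      val originalValue = sc.getLocalProperty(key) // may be null, or a value the user set
      sc.setLocalProperty(key, value)
      (key, originalValue)
  }
  try {
    body
  } finally {
    // Put back whatever was there before; a null original value removes the key again.
    for ((key, value) <- originalLocalProps) {
      sc.setLocalProperty(key, value)
    }
  }
}
```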
  body
} finally {
  sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, oldExecutionId)
  for ((key, value) <- originalLocalProps) {
Before we set the original ones back, should we reset the new keys with null values?
def setLocalProperty(key: String, value: String) {
  if (value == null) {
    localProperties.get.remove(key)
  } else {
    localProperties.get.setProperty(key, value)
  }
}
`originalLocalProps` already contains entries with null values.
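A tiny usage sketch of the null semantics being relied on here (illustrative only; `sc` is assumed to be the active SparkContext and the key is hypothetical):

```scala
// A local property set to null is removed, so restoring a saved null value
// cleans up keys that did not exist before the job.
sc.setLocalProperty("spark.sql.some.key", "true")   // hypothetical key, set for this job
assert(sc.getLocalProperty("spark.sql.some.key") == "true")
sc.setLocalProperty("spark.sql.some.key", null)     // restore the original (null) value
assert(sc.getLocalProperty("spark.sql.some.key") == null)
```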
if (TaskContext.get != null) {
  new ReadOnlySQLConf(TaskContext.get())
} else {
  if (Utils.isTesting && SparkContext.getActive.isDefined) {
good check!!
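Putting the snippet in context, the dispatch being reviewed looks roughly like the sketch below (condensed from the diff above; `driverSessionConf()` is a placeholder for the existing driver-side lookup, not a real API):

```scala
// Condensed sketch of the `SQLConf.get` dispatch, simplified from the diff.
def get: SQLConf = {
  if (TaskContext.get != null) {
    // Running inside a task: rebuild a read-only view of the SQL configs that were
    // propagated through the task's local properties.
    new ReadOnlySQLConf(TaskContext.get())
  } else {
    // Running on the driver (after the testing-only event-loop check): fall back to
    // the active session's conf. Placeholder name, not the actual code.
    driverSessionConf()
  }
}
```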
Test build #90862 has finished for PR 21376 at commit
Test build #90864 has finished for PR 21376 at commit
    SparkContext.getActive.get.dagScheduler.eventProcessLoop.eventThread
  if (schedulerEventLoopThread.getId == Thread.currentThread().getId) {
    throw new RuntimeException("Cannot get SQLConf inside scheduler event loop thread.")
  }
nit: Should we also add a note in the PR description that we prevent accessing SQLConf in the scheduler event loop?
val schedulerEventLoopThread =
  SparkContext.getActive.get.dagScheduler.eventProcessLoop.eventThread
if (schedulerEventLoopThread.getId == Thread.currentThread().getId) {
  throw new RuntimeException("Cannot get SQLConf inside scheduler event loop thread.")
Not sure if we need to add a short comment for the reason. WDYT?
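If such a comment were added, it could look roughly like this (wording is only a suggestion, based on the reasoning quoted earlier in this conversation):

```scala
// Suggested comment wording only; purely illustrative.
if (Utils.isTesting && SparkContext.getActive.isDefined) {
  // The DAGScheduler event loop thread may not have an active SparkSession, so
  // `SQLConf.get` there could silently point to the wrong configs. Fail fast in
  // tests to catch such accesses early.
  val schedulerEventLoopThread =
    SparkContext.getActive.get.dagScheduler.eventProcessLoop.eventThread
  if (schedulerEventLoopThread.getId == Thread.currentThread().getId) {
    throw new RuntimeException("Cannot get SQLConf inside scheduler event loop thread.")
  }
}
```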
LGTM
Test build #90886 has finished for PR 21376 at commit
LGTM
thanks, merging to master!
I'm still seeing a lot of build failures which seem to be related to this (accessing a conf in a task in turn accesses the LiveListenerBus). Is this something new? Or related to this change? e.g.
## What changes were proposed in this pull request?

This fixes a perf regression caused by #21376. We should not use `RDD#toLocalIterator`, which triggers one Spark job per RDD partition. This is very bad for RDDs with a lot of small partitions. To fix it, this PR introduces a way to access SQLConf in the scheduler event loop thread, so that we don't need to use `RDD#toLocalIterator` anymore in `JsonInferSchema`.

## How was this patch tested?

a new test

Closes #22152 from cloud-fan/conf.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
Re-submit #21299, which broke the build.
A few new commits are added to fix the `SQLConf` problem in `JsonSchemaInference.infer`, and to prevent us from accessing `SQLConf` in the DAGScheduler event loop thread.

## What changes were proposed in this pull request?
Previously in #20136 we decided to forbid tasks to access `SQLConf`, because it doesn't work and always gives you the default conf value. In #21190 we fixed the check and all the places that violate it.

Currently the pattern of accessing configs at the executor side is: read the configs at the driver side, then access the variables holding the config values in the RDD closure, so that they will be serialized to the executor side. Something like:
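(A minimal sketch of this pattern; `rdd` and `process` are illustrative names, not actual Spark code.)

```scala
// Read the config once on the driver, then capture the plain value in the closure.
val caseSensitive = SQLConf.get.caseSensitiveAnalysis
rdd.map { row =>
  // `caseSensitive` is a captured local variable, serialized with the closure,
  // so the task itself never needs to call SQLConf.get.
  process(row, caseSensitive)
}
```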
However, this pattern is hard to apply if the config needs to be propagated via a long call stack. An example is `DataType.sameType`; see how many changes were made in #21190.

When it comes to code generation, it's even worse. I tried it locally and we need to change a ton of files to propagate configs to code generators.
This PR proposes to allow tasks to access `SQLConf`. The idea is, we can save all the SQL configs to job properties when an SQL execution is triggered. At the executor side we rebuild the `SQLConf` from job properties.
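A simplified sketch of the two halves of this idea, reusing names from the diffs above where possible (`sparkSession` and `sc` are assumed to be the active session and its SparkContext; not the exact implementation):

```scala
// Driver side: when an SQL execution is triggered, copy the session's SQL configs
// into the SparkContext's local properties so they travel with the job.
val allConfigs: Map[String, String] = sparkSession.sessionState.conf.getAllConfs
allConfigs.filter { case (key, _) => key.startsWith("spark") }.foreach { case (key, value) =>
  sc.setLocalProperty(key, value)
}

// Executor side: a task rebuilds a read-only view of those configs from the local
// properties carried by its TaskContext.
val confInTask: SQLConf = new ReadOnlySQLConf(TaskContext.get())
```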
## How was this patch tested?
a new test suite