[SPARK-22938] Assert that SQLConf.get is accessed only on the driver. #20136
Conversation
cc @cloud-fan
```diff
-  private val fallbackConf = new ThreadLocal[SQLConf] {
+  private lazy val fallbackConf = new ThreadLocal[SQLConf] {
+    // assert that we're only accessing it on the driver.
+    assert(SparkEnv.get.executorId == SparkContext.DRIVER_IDENTIFIER)
```
Need null checks for SparkEnv?
Also, can we check this only in test phases?
```diff
@@ -70,7 +72,12 @@ object SQLConf {
    * Default config. Only used when there is no active SparkSession for the thread.
    * See [[get]] for more information.
    */
-  private val fallbackConf = new ThreadLocal[SQLConf] {
+  private lazy val fallbackConf = new ThreadLocal[SQLConf] {
+    if (Utils.isTesting && SparkEnv.get != null) {
```
shall we just put this check in the constructor?
```diff
@@ -1087,6 +1089,11 @@ object SQLConf {
 class SQLConf extends Serializable with Logging {
   import SQLConf._

+  if (Utils.isTesting && SparkEnv.get != null) {
+    // assert that we're only accessing it on the driver.
+    assert(SparkEnv.get.executorId == SparkContext.DRIVER_IDENTIFIER)
```
Also with a reasonable error message?
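For reference, a minimal sketch of what the guarded assertion with a message could look like; the exact message string here is illustrative, not necessarily the merged wording:

```scala
if (Utils.isTesting && SparkEnv.get != null) {
  // Guarded so the check only runs in tests and only once SparkEnv exists;
  // the message text below is an assumption, not a quote of the merged code.
  assert(SparkEnv.get.executorId == SparkContext.DRIVER_IDENTIFIER,
    "SQLConf should only be created and accessed on the driver.")
}
```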
Test build #85600 has finished for PR 20136 at commit
Test build #85602 has finished for PR 20136 at commit
Test build #85606 has finished for PR 20136 at commit
Test build #85609 has finished for PR 20136 at commit
LGTM
```diff
@@ -70,7 +72,7 @@ object SQLConf {
    * Default config. Only used when there is no active SparkSession for the thread.
    * See [[get]] for more information.
    */
-  private val fallbackConf = new ThreadLocal[SQLConf] {
+  private lazy val fallbackConf = new ThreadLocal[SQLConf] {
```
does it have to be lazy val?
When I checked (before I moved the assertion from here to the SQLConf constructor, though it shouldn't matter), not making it lazy resulted in it being instantiated eagerly, as a static member of the SQLConf object, before SparkEnv was set, and hitting the null on SparkEnv.
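To illustrate the initialization-order point with a standalone sketch (plain Scala, no Spark; all names are made up for the demo): a plain `val` in an `object` is evaluated when the object is first initialized, i.e. when *any* of its members is touched, whereas a `lazy val` is deferred until that member itself is first accessed.

```scala
object Demo {
  var env: String = null  // stands in for SparkEnv.get, set later by the framework

  object Conf {
    val banner = "SQLConf constants"  // touching any member initializes Conf
    // A plain `val` here would run during Conf's initialization, possibly
    // before env is set:
    //   val fallback: String = { require(env != null); env }  // can hit null
    // A lazy val only runs when `fallback` itself is first accessed:
    lazy val fallback: String = { require(env != null, "env not set yet"); env }
  }

  def main(args: Array[String]): Unit = {
    println(Conf.banner)    // initializes Conf; safe because fallback is lazy
    env = "driver"          // env gets set afterwards
    println(Conf.fallback)  // evaluated only now, when env is already set
  }
}
```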
ah i see, makes sense
thanks, merging to master/2.3!
## What changes were proposed in this pull request?

Assert if code tries to access SQLConf.get on an executor. This can lead to hard-to-detect bugs, where the executor reads fallbackConf, falling back to default config values and ignoring potentially changed non-default configs. If a config is to be passed to executor code, it needs to be read on the driver and passed explicitly.

## How was this patch tested?

Check in existing tests.

Author: Juliusz Sompolski <julek@databricks.com>

Closes #20136 from juliuszsompolski/SPARK-22938.

(cherry picked from commit 247a089)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
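The "read on the driver, pass explicitly" pattern can be sketched as a self-contained example (the config key and data here are just for the demo):

```scala
import org.apache.spark.sql.SparkSession

object DriverSideConfRead {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("conf-demo").getOrCreate()

    // Read the config once, on the driver.
    val caseSensitive = spark.conf.get("spark.sql.caseSensitive", "false").toBoolean

    // Capture the plain Boolean in the closure: only the value is serialized
    // to executors, so no task ever needs to call SQLConf.get.
    val matches = spark.sparkContext.parallelize(Seq("A", "a"))
      .filter(s => if (caseSensitive) s == "A" else s.equalsIgnoreCase("A"))

    println(matches.collect().mkString(", "))
    spark.stop()
  }
}
```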
… on the driver

## What changes were proposed in this pull request?

This is a followup of apache#20136, which didn't really work because the test uses the local backend, which shares the driver-side `SparkEnv`, so `SparkEnv.get.executorId == SparkContext.DRIVER_IDENTIFIER` doesn't work.

This PR changes the check to `TaskContext.get != null`, moves the check to `SQLConf.get`, and fixes all the places that violate this check:
* `InMemoryTableScanExec#createAndDecompressColumn` is executed inside `rdd.map`, so we can't access `conf.offHeapColumnVectorEnabled` there. apache#21223 merged
* `DataType#sameType` may be executed on the executor side, for things like JSON schema inference, so we can't call `conf.caseSensitiveAnalysis` there. This contributes to most of the code changes, as we need to add a `caseSensitive` parameter to a lot of methods.
* `ParquetFilters` is used in the file scan function, which is executed on the executor side, so we can't call `conf.parquetFilterPushDownDate` there. apache#21224 merged
* `WindowExec#createBoundOrdering` is called on the executor side, so we can't use `conf.sessionLocalTimezone` there. apache#21225 merged
* `JsonToStructs` can be serialized to executors and evaluated, so we should not call `SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA)` in the body. apache#21226 merged

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#21190 from cloud-fan/minor.
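A hedged sketch of the revised check (shape and message are illustrative; the merged code lives in `SQLConf.get`): `TaskContext.get` returns non-null only inside a running task, so it identifies executor-side code even under the local backend, where driver and executor share one `SparkEnv`.

```scala
import org.apache.spark.TaskContext

object ConfAccessCheck {
  // Illustrative helper, not the merged code: throws if called from inside a
  // task, i.e. on the executor side of a job, regardless of the backend.
  def assertOnDriver(): Unit = {
    if (TaskContext.get != null) {
      throw new IllegalStateException(
        "SQLConf should only be created and accessed on the driver.")
    }
  }
}
```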
Re-submit of #21299, which broke the build. A few new commits are added to fix the SQLConf problem in `JsonSchemaInference.infer`, and to prevent accessing `SQLConf` in the DAGScheduler event loop thread.

## What changes were proposed in this pull request?

Previously in #20136 we decided to forbid tasks to access `SQLConf`, because it doesn't work and always gives you the default conf value. In #21190 we fixed the check and all the places that violate it.

Currently the pattern for accessing configs on the executor side is: read the configs on the driver side, then access the variables holding the config values in the RDD closure, so that they will be serialized to the executor side. Something like:
```
val someConf = conf.getXXX
child.execute().mapPartitions {
  if (someConf == ...) ...
  ...
}
```
However, this pattern is hard to apply if the config needs to be propagated via a long call stack. An example is `DataType.sameType`; see how many changes were made in #21190. When it comes to code generation, it's even worse: I tried it locally and we would need to change a ton of files to propagate configs to the code generators.

This PR proposes to allow tasks to access `SQLConf`. The idea is: we save all the SQL configs to job properties when a SQL execution is triggered, and on the executor side we rebuild the `SQLConf` from the job properties.

## How was this patch tested?

A new test suite.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21376 from cloud-fan/config.
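The propagation idea can be sketched generically; the helper names and key prefix below are hypothetical, and Spark's actual implementation (in the SQL execution machinery) differs in detail. On the driver, the configs are snapshotted into job-local properties; inside a task, they are read back via `TaskContext.getLocalProperty`.

```scala
import org.apache.spark.{SparkContext, TaskContext}

object ConfPropagationSketch {
  private val Prefix = "demo.sql.conf."  // assumed key prefix, not Spark's

  // Driver side: snapshot the configs into job-local properties; Spark
  // forwards local properties to every task of jobs submitted afterwards.
  def saveConfs(sc: SparkContext, confs: Map[String, String]): Unit =
    confs.foreach { case (k, v) => sc.setLocalProperty(Prefix + k, v) }

  // Executor side: rebuild a config value from the task's local properties.
  def getConf(key: String): Option[String] =
    Option(TaskContext.get).flatMap(tc => Option(tc.getLocalProperty(Prefix + key)))
}
```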