
[SPARK-24250][SQL] support accessing SQLConf inside tasks #21376

Closed
cloud-fan wants to merge 4 commits

Conversation

cloud-fan
Contributor

@cloud-fan cloud-fan commented May 20, 2018

Re-submit of #21299, which broke the build.

A few new commits are added to fix the SQLConf problem in JsonSchemaInference.infer, and to prevent accessing SQLConf in the DAGScheduler event loop thread.

What changes were proposed in this pull request?

Previously, in #20136, we decided to forbid tasks from accessing SQLConf, because it doesn't work and always gives you the default conf value. In #21190 we fixed the check and all the places that violated it.

Currently the pattern for accessing configs on the executor side is: read the configs on the driver side, then reference the variables holding the config values in the RDD closure, so that they get serialized to the executor side. Something like:

val someConf = conf.getXXX
child.execute().mapPartitions { iter =>
  if (someConf == ...) ...
  ...
}

However, this pattern is hard to apply if the config needs to be propagated through a long call stack. An example is DataType.sameType; see how many changes were made in #21190.

When it comes to code generation, it's even worse: I tried it locally, and we would need to change a ton of files to propagate configs to the code generators.

This PR proposes to allow tasks to access SQLConf. The idea is: save all the SQL configs into the job's local properties when an SQL execution is triggered, and at the executor side rebuild the SQLConf from those job properties.
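
Roughly, the mechanism looks like the sketch below. The names (`propagateSQLConfs`, `sqlConfInTask`) are hypothetical and only illustrate the idea; the actual change wires this into `SQLExecution` on the driver side and a new `ReadOnlySQLConf` on the executor side.

```scala
import org.apache.spark.{SparkContext, TaskContext}

// Driver side: copy the SQL configs into the SparkContext's local properties
// so they travel with the job, and restore the old values afterwards.
def propagateSQLConfs[T](sc: SparkContext, sqlConfs: Map[String, String])(body: => T): T = {
  // Remember the previous values (null if a key was absent) so they can be restored.
  val previous = sqlConfs.keys.map(k => k -> sc.getLocalProperty(k)).toMap
  try {
    sqlConfs.foreach { case (k, v) => sc.setLocalProperty(k, v) }
    body
  } finally {
    // setLocalProperty(key, null) removes the entry, so keys that did not
    // exist before the execution are cleaned up as well.
    previous.foreach { case (k, v) => sc.setLocalProperty(k, v) }
  }
}

// Executor side: a task reads the propagated values back from its TaskContext;
// a read-only SQLConf view can be built on top of this lookup.
def sqlConfInTask(key: String): Option[String] =
  Option(TaskContext.get()).flatMap(tc => Option(tc.getLocalProperty(key)))
```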

How was this patch tested?

a new test suite
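
Roughly, the suite needs to check that a config set on the driver is visible through `SQLConf.get` inside tasks. A minimal sketch of such a check (not the actual test code added by this PR; the conf key is just a convenient existing one):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

def checkConfVisibleInTasks(spark: SparkSession): Unit = {
  import spark.implicits._
  spark.conf.set("spark.sql.shuffle.partitions", "7")
  // Dataset.mapPartitions runs inside an SQL execution, so the configs are
  // propagated via job properties and readable through SQLConf.get in tasks.
  val values = spark.range(10).mapPartitions { _ =>
    Iterator.single(SQLConf.get.getConfString("spark.sql.shuffle.partitions"))
  }.collect()
  assert(values.forall(_ == "7"))
}
```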

// Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because
// `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have
// active SparkSession and `SQLConf.get` may point to the wrong configs.
val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger)
Contributor Author


I re-ran the JsonBenchmark and no performance regression is observed.

Member


LGTM

Contributor


Can the same problem also happen in other places? This seems to be quite a tricky issue which may come up in general. Can we avoid it somehow?

@cloud-fan
Contributor Author

    spark = null
  }

  test("ReadonlySQLConf is correctly created at the executor side") {
Member


nit: ReadOnlySQLConf

@SparkQA

SparkQA commented May 20, 2018

Test build #90856 has finished for PR 21376 at commit a1519d4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mgaido91
Contributor

LGTM, thanks

@gatorsmile
Member

retest this please

@@ -107,7 +106,13 @@ object SQLConf {
* run tests in parallel. At the time this feature was implemented, this was a no-op since we
* run unit tests (that does not involve SparkSession) in serial order.
Member


Need to update the above comments.

val originalLocalProps = allConfigs.collect {
  case (key, value) if key.startsWith("spark") =>
    val originalValue = sc.getLocalProperty(key)
    sc.setLocalProperty(key, value)
Member


Is it possible that the same key already exists?

Contributor Author


If users happen to set the same key in the local properties and want to access them in tasks, we will break it. It's very unlikely to happen and I'd say SQL config keys should be reserved for internal usage only.

  body
} finally {
  sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, oldExecutionId)
  for ((key, value) <- originalLocalProps) {
Member


Before we set the original ones back, should we reset the new keys with null values?

  def setLocalProperty(key: String, value: String) {
    if (value == null) {
      localProperties.get.remove(key)
    } else {
      localProperties.get.setProperty(key, value)
    }
  }

Contributor Author


originalLocalProps already contains entries with null values for keys that did not exist before.
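
For illustration, under the assumption that the key below was not set before the execution (the key itself is hypothetical): its recorded "original" value is null, and restoring null removes the entry again.

```scala
import org.apache.spark.SparkContext

// Hypothetical key, for illustration only.
def restoreExample(sc: SparkContext): Unit = {
  val original = sc.getLocalProperty("spark.sql.some.key") // null if the key was absent
  sc.setLocalProperty("spark.sql.some.key", "v")           // set for the execution
  sc.setLocalProperty("spark.sql.some.key", original)      // restoring null removes the entry
}
```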

if (TaskContext.get != null) {
  new ReadOnlySQLConf(TaskContext.get())
} else {
  if (Utils.isTesting && SparkContext.getActive.isDefined) {
Member


good check!!

@SparkQA

SparkQA commented May 20, 2018

Test build #90862 has finished for PR 21376 at commit d25e846.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 20, 2018

Test build #90864 has finished for PR 21376 at commit d25e846.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

  SparkContext.getActive.get.dagScheduler.eventProcessLoop.eventThread
if (schedulerEventLoopThread.getId == Thread.currentThread().getId) {
  throw new RuntimeException("Cannot get SQLConf inside scheduler event loop thread.")
}
Member

@viirya viirya May 20, 2018


nit: Should we also add a note to the PR description that we prevent accessing SQLConf in the scheduler event loop?

val schedulerEventLoopThread =
  SparkContext.getActive.get.dagScheduler.eventProcessLoop.eventThread
if (schedulerEventLoopThread.getId == Thread.currentThread().getId) {
  throw new RuntimeException("Cannot get SQLConf inside scheduler event loop thread.")
Member


Not sure if we need to add a short comment for the reason. WDYT?

@viirya
Member

viirya commented May 21, 2018

LGTM

@SparkQA

SparkQA commented May 21, 2018

Test build #90886 has finished for PR 21376 at commit f7a6299.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@gengliangwang gengliangwang left a comment


LGTM

@cloud-fan
Contributor Author

thanks, merging to master!

@asfgit asfgit closed this in 03e90f6 May 21, 2018
@squito
Contributor

squito commented Jun 1, 2018

I'm still seeing a lot of build failures which seem to be related to this (accessing a conf in a task in turn accesses the LiveListenerBus). Is this something new, or related to this change? e.g.
https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4193/testReport/org.apache.spark.sql.execution/UnsafeRowSerializerSuite/toUnsafeRow___test_helper_method/

sbt.ForkMain$ForkError: java.lang.IllegalStateException: LiveListenerBus is stopped.
	at org.apache.spark.scheduler.LiveListenerBus.addToQueue(LiveListenerBus.scala:97)
	at org.apache.spark.scheduler.LiveListenerBus.addToStatusQueue(LiveListenerBus.scala:80)
	at org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:93)
	at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(SparkSession.scala:120)
	at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(SparkSession.scala:120)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.SparkSession.sharedState$lzycompute(SparkSession.scala:120)
	at org.apache.spark.sql.SparkSession.sharedState(SparkSession.scala:119)
	at org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStateBuilder.scala:286)
	at org.apache.spark.sql.test.TestSparkSession.sessionState$lzycompute(TestSQLContext.scala:42)
	at org.apache.spark.sql.test.TestSparkSession.sessionState(TestSQLContext.scala:41)
	at org.apache.spark.sql.SparkSession$$anonfun$1$$anonfun$apply$1.apply(SparkSession.scala:95)
	at org.apache.spark.sql.SparkSession$$anonfun$1$$anonfun$apply$1.apply(SparkSession.scala:95)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.sql.SparkSession$$anonfun$1.apply(SparkSession.scala:95)
	at org.apache.spark.sql.SparkSession$$anonfun$1.apply(SparkSession.scala:94)
	at org.apache.spark.sql.internal.SQLConf$.get(SQLConf.scala:126)
	at org.apache.spark.sql.catalyst.expressions.CodeGeneratorWithInterpretedFallback.createObject(CodeGeneratorWithInterpretedFallback.scala:54)
	at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:157)
	at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:150)
	at org.apache.spark.sql.execution.UnsafeRowSerializerSuite.org$apache$spark$sql$execution$UnsafeRowSerializerSuite$$unsafeRowConverter(UnsafeRowSerializerSuite.scala:54)
	at org.apache.spark.sql.execution.UnsafeRowSerializerSuite.org$apache$spark$sql$execution$UnsafeRowSerializerSuite$$toUnsafeRow(UnsafeRowSerializerSuite.scala:49)
	at org.apache.spark.sql.execution.UnsafeRowSerializerSuite$$anonfun$2.apply(UnsafeRowSerializerSuite.scala:63)
	at org.apache.spark.sql.execution.UnsafeRowSerializerSuite$$anonfun$2.apply(UnsafeRowSerializerSuite.scala:60)
...

asfgit pushed a commit that referenced this pull request Aug 22, 2018
## What changes were proposed in this pull request?

This fixes a perf regression caused by #21376.

We should not use `RDD#toLocalIterator`, which triggers one Spark job per RDD partition. This is very bad for RDDs with a lot of small partitions.

To fix it, this PR introduces a way to access SQLConf in the scheduler event loop thread, so that we don't need to use `RDD#toLocalIterator` anymore in `JsonInferSchema`.
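
For context, an illustrative comparison of the two approaches (hypothetical helper signature; not the actual `JsonInferSchema` code):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DataType, StructType}

def mergeSchemas(
    mergedTypesFromPartitions: RDD[DataType],
    typeMerger: (DataType, DataType) => DataType): (DataType, DataType) = {
  // Single Spark job, but the partition results are merged in the driver's
  // DAGScheduler event loop thread, where SQLConf.get may point to the wrong configs.
  val viaFold = mergedTypesFromPartitions.fold(StructType(Nil))(typeMerger)

  // The merge runs on the calling thread instead, but toLocalIterator fetches
  // partitions lazily, launching one Spark job per partition.
  val viaLocalIterator =
    mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger)

  (viaFold, viaLocalIterator)
}
```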

## How was this patch tested?

a new test

Closes #22152 from cloud-fan/conf.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>