Skip to content

Commit d74316b

Browse files
hn50927mming7
authored and committed
apache#11 SQLConf support thread local prop
1 parent 95e3414 commit d74316b

File tree

2 files changed

+30
-3
lines changed

2 files changed

+30
-3
lines changed

core/src/main/scala/org/apache/spark/internal/config/ConfigReader.scala

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import java.util.{Map => JMap}
2222
import scala.collection.mutable.HashMap
2323
import scala.util.matching.Regex
2424

25+
import org.apache.commons.lang3.SerializationUtils
26+
2527
private object ConfigReader {
2628

2729
private val REF_RE = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r
@@ -56,6 +58,17 @@ private[spark] class ConfigReader(conf: ConfigProvider) {
5658
bindEnv(new EnvProvider())
5759
bindSystem(new SystemProvider())
5860

61+
/**
 * Thread-local configuration overrides, inherited by child threads.
 *
 * Entries stored here take precedence over the backing provider when a key is
 * resolved via `get`.
 */
protected[spark] val localProperties =
  new InheritableThreadLocal[java.util.HashMap[String, String]] {
    override protected def childValue(parent: java.util.HashMap[String, String]):
        java.util.HashMap[String, String] = {
      // Note: make a copy such that changes in the parent properties aren't reflected in
      // those of the children threads, which has confusing semantics (SPARK-10563).
      // The HashMap copy constructor is sufficient here — keys and values are immutable
      // Strings — and avoids the cost of serialization-based deep cloning
      // (SerializationUtils.clone).
      new java.util.HashMap[String, String](parent)
    }

    override protected def initialValue(): java.util.HashMap[String, String] =
      new java.util.HashMap[String, String]()
  }
5972
/**
6073
* Binds a prefix to a provider. This method is not thread-safe and should be called
6174
* before the instance is used to expand values.
@@ -76,7 +89,9 @@ private[spark] class ConfigReader(conf: ConfigProvider) {
7689
/**
7790
* Reads a configuration key from the default provider, and apply variable substitution.
7891
*/
79-
def get(key: String): Option[String] = conf.get(key).map(substitute)
92+
/**
 * Reads a configuration key and applies variable substitution. A value set in
 * the current thread's local properties takes precedence over the default
 * provider.
 */
def get(key: String): Option[String] = {
  val threadLocalValue = Option(localProperties.get().get(key))
  threadLocalValue.orElse(conf.get(key)).map(substitute)
}
8095

8196
/**
8297
* Perform variable substitution on the given input string.

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3691,6 +3691,13 @@ class SQLConf extends Serializable with Logging {
36913691
getOrElse(throw new NoSuchElementException(key))
36923692
}
36933693

3694+
/**
 * Sets a thread-local SQL configuration override. Passing a null value removes
 * the key from the current thread's local properties.
 */
def setLocalProperty(key: String, value: String): Unit = {
  val props = reader.localProperties.get()
  Option(value) match {
    case Some(v) => props.put(key, v)
    case None => props.remove(key)
  }
}
36943701
/**
36953702
* Return the value of Spark SQL configuration property for the given key. If the key is not set
36963703
* yet, return `defaultValue`. This is useful when `defaultValue` in ConfigEntry is not the
@@ -3745,8 +3752,13 @@ class SQLConf extends Serializable with Logging {
37453752
* Return all the configuration properties that have been set (i.e. not the default).
37463753
* This creates a new copy of the config properties in the form of a Map.
37473754
*/
3748-
def getAllConfs: immutable.Map[String, String] =
3749-
settings.synchronized { settings.asScala.toMap }
3755+
/**
 * Return all the configuration properties that have been set (i.e. not the default).
 * This creates a new copy of the config properties in the form of a Map.
 * Thread-local overrides (see `setLocalProperty`) shadow globally set values.
 */
def getAllConfs: immutable.Map[String, String] = {
  // Only the shared `settings` map needs the lock; the thread-local property
  // map is confined to this thread and can be read without synchronization.
  val globalConfs = settings.synchronized { settings.asScala.toMap }
  // `++` makes thread-local entries override global ones, matching the previous
  // foreach-based accumulation without a mutable `var`.
  globalConfs ++ reader.localProperties.get().asScala
}
37503762

37513763
/**
37523764
* Return all the configuration definitions that have been defined in [[SQLConf]]. Each

0 commit comments

Comments
 (0)