diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigReader.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigReader.scala
index c1ab22150d024..dba6b343d9f8e 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigReader.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigReader.scala
@@ -22,6 +22,8 @@ import java.util.{Map => JMap}
 import scala.collection.mutable.HashMap
 import scala.util.matching.Regex

+import org.apache.commons.lang3.SerializationUtils
+
 private object ConfigReader {

   private val REF_RE = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r
@@ -56,6 +58,22 @@ private[spark] class ConfigReader(conf: ConfigProvider) {
   bindEnv(new EnvProvider())
   bindSystem(new SystemProvider())

+  /**
+   * Per-thread properties that are consulted before `conf` when a key is read. Child threads
+   * inherit a snapshot of their parent's properties.
+   */
+  protected[spark] val localProperties =
+    new InheritableThreadLocal[java.util.HashMap[String, String]] {
+      override protected def childValue(parent: java.util.HashMap[String, String]):
+          java.util.HashMap[String, String] = {
+        // Note: make a clone such that changes in the parent properties aren't reflected in
+        // those of the child threads, which has confusing semantics (SPARK-10563).
+        SerializationUtils.clone(parent)
+      }
+      override protected def initialValue(): java.util.HashMap[String, String] =
+        new java.util.HashMap[String, String]()
+    }
+
   /**
    * Binds a prefix to a provider. This method is not thread-safe and should be called
    * before the instance is used to expand values.
@@ -76,7 +94,10 @@ private[spark] class ConfigReader(conf: ConfigProvider) {
   /**
    * Reads a configuration key from the default provider, and apply variable substitution.
    */
-  def get(key: String): Option[String] = conf.get(key).map(substitute)
+  def get(key: String): Option[String] = {
+    // Thread-local properties take precedence over the backing provider.
+    Option(localProperties.get().get(key)).orElse(conf.get(key)).map(substitute)
+  }

   /**
    * Perform variable substitution on the given input string.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 11f13c43437b7..66b8232944119 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4118,6 +4118,18 @@ class SQLConf extends Serializable with Logging {
       getOrElse(throw QueryExecutionErrors.noSuchElementExceptionError(key))
   }

+  /**
+   * Sets a thread-local configuration property that overrides the shared value of `key` for the
+   * current thread and any threads it spawns. A `null` value removes the override.
+   */
+  def setLocalProperty(key: String, value: String): Unit = {
+    if (value == null) {
+      reader.localProperties.get().remove(key)
+    } else {
+      reader.localProperties.get().put(key, value)
+    }
+  }
+
   /**
    * Return the value of Spark SQL configuration property for the given key. If the key is not set
    * yet, return `defaultValue`. This is useful when `defaultValue` in ConfigEntry is not the
@@ -4192,8 +4204,12 @@ class SQLConf extends Serializable with Logging {
    * Return all the configuration properties that have been set (i.e. not the default).
    * This creates a new copy of the config properties in the form of a Map.
    */
-  def getAllConfs: immutable.Map[String, String] =
-    settings.synchronized { settings.asScala.toMap }
+  def getAllConfs: immutable.Map[String, String] = {
+    settings.synchronized {
+      // Thread-local properties shadow shared settings with the same key.
+      settings.asScala.toMap ++ reader.localProperties.get().asScala
+    }
+  }

   /**
    * Return all the configuration definitions that have been defined in [[SQLConf]]. Each
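
The `InheritableThreadLocal` added to `ConfigReader` mirrors the pattern `SparkContext` uses for its own `localProperties`: `childValue` runs while a child thread is being constructed, so each child starts from a snapshot of its parent's map, and later writes on either side stay isolated — the SPARK-10563 semantics the inline comment cites. Below is a minimal, dependency-free sketch of that behavior (not part of the patch; it substitutes the `java.util.HashMap` copy constructor for `SerializationUtils.clone`, and the object name `LocalPropsDemo` is invented for the example):

```scala
// Standalone sketch: snapshot semantics of InheritableThreadLocal.childValue.
object LocalPropsDemo {
  private val localProperties =
    new InheritableThreadLocal[java.util.HashMap[String, String]] {
      override protected def childValue(parent: java.util.HashMap[String, String]):
          java.util.HashMap[String, String] = {
        // childValue runs on the parent thread while the child Thread object is
        // being constructed, so the child receives a copy, not the shared map.
        new java.util.HashMap[String, String](parent)
      }
      override protected def initialValue(): java.util.HashMap[String, String] =
        new java.util.HashMap[String, String]()
    }

  def main(args: Array[String]): Unit = {
    localProperties.get().put("k", "parent")
    val child = new Thread(() => {
      assert(localProperties.get().get("k") == "parent") // inherited snapshot
      localProperties.get().put("k", "child")            // mutate only the copy
    })
    child.start()
    child.join()
    // The child's write did not leak back into the parent's map.
    assert(localProperties.get().get("k") == "parent")
  }
}
```

`SerializationUtils.clone` deep-copies via serialization; for a `HashMap[String, String]` the shallow copy constructor used in the sketch gives the same isolation, because `String` values are immutable.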
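
End to end, a value set through the new `setLocalProperty` shadows the shared `settings` map for reads that go through the `ConfigReader` (such as `getConf`) and is merged on top of the shared values by `getAllConfs`, for the current thread and the threads it spawns. A hypothetical usage sketch, assuming the patch is applied (the wrapper object `LocalPropertyUsage` is invented; `SQLConf.SHUFFLE_PARTITIONS` is an existing registered entry whose default is 200):

```scala
import org.apache.spark.sql.internal.SQLConf

object LocalPropertyUsage {
  def main(args: Array[String]): Unit = {
    val conf = new SQLConf

    // Nothing set anywhere: the registered default wins.
    assert(conf.getConf(SQLConf.SHUFFLE_PARTITIONS) == 200)

    // A thread-local override is seen by reader-backed reads on this thread...
    conf.setLocalProperty(SQLConf.SHUFFLE_PARTITIONS.key, "10")
    assert(conf.getConf(SQLConf.SHUFFLE_PARTITIONS) == 10)
    // ...and is merged into getAllConfs on top of the shared settings.
    assert(conf.getAllConfs(SQLConf.SHUFFLE_PARTITIONS.key) == "10")

    // Passing null removes the override and restores the default lookup chain.
    conf.setLocalProperty(SQLConf.SHUFFLE_PARTITIONS.key, null)
    assert(conf.getConf(SQLConf.SHUFFLE_PARTITIONS) == 200)
  }
}
```

One caveat worth flagging: lookups that consult `settings` directly before falling back to the reader (e.g. the no-default `getConfString(key)`) are untouched by this patch, so on that path a value already present in the shared map still wins over a thread-local override.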