Commit b54cc55

[SPARK-32592][SQL] Make DataFrameReader.table take the specified options
1 parent e3a88a9 commit b54cc55
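
Options passed through DataFrameReader.option(...) were previously ignored by the table(...) path; this commit forwards them through the session conf so that a DSv2 catalog such as the JDBC catalog can honor them. A minimal usage sketch, assuming an active SparkSession named spark; the catalog name, URL, driver, and table identifier below are illustrative, not taken from this commit:

// Register the JDBC v2 catalog under an illustrative name "h2".
spark.conf.set("spark.sql.catalog.h2",
  "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
spark.conf.set("spark.sql.catalog.h2.url", "jdbc:h2:mem:testdb")
spark.conf.set("spark.sql.catalog.h2.driver", "org.h2.Driver")

// With this patch the reader option reaches JDBCTableCatalog.loadTable:
// DataFrameReader.table copies it into the session conf, and the catalog
// merges matching keys back into its JDBC options.
val df = spark.read
  .option("fetchsize", "42")
  .table("h2.test.people")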

3 files changed, 11 insertions(+), 3 deletions(-)

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 2 additions & 1 deletion
@@ -40,7 +40,6 @@ import org.apache.spark.sql.execution.datasources.csv._
 import org.apache.spark.sql.execution.datasources.jdbc._
 import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils}
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.unsafe.types.UTF8String
@@ -822,6 +821,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    */
   def table(tableName: String): DataFrame = {
     assertNoSpecifiedSchema("table")
+    for ((k, v) <- this.extraOptions)
+      sparkSession.conf.set(k, v)
     sparkSession.table(tableName)
   }
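
The two added lines publish each reader option into the active session's conf before delegating to sparkSession.table, which otherwise has no access to the reader's extraOptions map. A rough sketch of the round trip, assuming an active SparkSession named spark and an illustrative key/value pair:

// What DataFrameReader.table now does for each extra option:
spark.conf.set("fetchsize", "42")

// What downstream table-resolution code can later observe:
val forwarded = spark.conf.getOption("fetchsize")   // Some("42")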

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala

Lines changed: 1 addition & 1 deletion
@@ -232,7 +232,7 @@ class JdbcOptionsInWrite(

 object JDBCOptions {
   private val curId = new java.util.concurrent.atomic.AtomicLong(0L)
-  private val jdbcOptionNames = collection.mutable.Set[String]()
+  val jdbcOptionNames = collection.mutable.Set[String]()

   private def newOption(name: String): String = {
     jdbcOptionNames += name.toLowerCase(Locale.ROOT)
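
Dropping the private modifier lets JDBCTableCatalog (below) enumerate the recognized JDBC option names. A standalone sketch of the same registry pattern, not the Spark source itself; names are stored lower-cased so later lookups can be matched case-insensitively:

import java.util.Locale
import scala.collection.mutable

object OptionNameRegistry {
  val names = mutable.Set[String]()

  // Record the name in lower case and return it unchanged, mirroring
  // what JDBCOptions.newOption does.
  def register(name: String): String = {
    names += name.toLowerCase(Locale.ROOT)
    name
  }

  // True if the key matches a registered option name, ignoring case.
  def isRecognized(key: String): Boolean =
    names.contains(key.toLowerCase(Locale.ROOT))
}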

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala

Lines changed: 8 additions & 1 deletion
@@ -21,6 +21,7 @@ import java.sql.{Connection, SQLException}
 import scala.collection.JavaConverters._

 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException}
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableChange}
 import org.apache.spark.sql.connector.expressions.Transform
@@ -94,8 +95,14 @@ class JDBCTableCatalog extends TableCatalog with Logging {

   override def loadTable(ident: Identifier): Table = {
     checkNamespace(ident.namespace())
+    var newOptions = options.parameters
+    SparkSession.active.sessionState.conf.settings.asScala.foreach {
+      case (key, value) =>
+        for (option <- JDBCOptions.jdbcOptionNames)
+          if (key.equalsIgnoreCase(option)) newOptions += (key -> value)
+    }
     val optionsWithTableName = new JDBCOptions(
-      options.parameters + (JDBCOptions.JDBC_TABLE_NAME -> getTableName(ident)))
+      newOptions + (JDBCOptions.JDBC_TABLE_NAME -> getTableName(ident)))
     try {
       val schema = JDBCRDD.resolveTable(optionsWithTableName)
       JDBCTable(ident, schema, optionsWithTableName)
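
The added block overlays any session conf entry whose key matches a registered JDBC option name onto the catalog's own options before the table schema is resolved, with the session value taking precedence. A standalone sketch of that merge as a plain function; the object, helper name, and signature are assumptions, not part of the commit:

object JdbcOptionMerging {
  // Keep only conf entries whose key is a known JDBC option (matched
  // case-insensitively) and let them override the catalog-level values,
  // preserving the precedence used in loadTable above.
  def mergeSessionOptions(
      catalogOptions: Map[String, String],
      sessionConf: Map[String, String],
      jdbcOptionNames: collection.Set[String]): Map[String, String] = {
    val forwarded = sessionConf.filter { case (key, _) =>
      jdbcOptionNames.exists(_.equalsIgnoreCase(key))
    }
    catalogOptions ++ forwarded
  }
}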
