
Commit 806140d

huaxingao authored and cloud-fan committed
[SPARK-32592][SQL] Make DataFrameReader.table take the specified options
### What changes were proposed in this pull request?

Pass the options specified in `DataFrameReader.table` to `JDBCTableCatalog.loadTable`.

### Why are the changes needed?

Currently, `DataFrameReader.table` ignores the specified options. Options specified like the following are lost:

```
val df = spark.read
  .option("partitionColumn", "id")
  .option("lowerBound", "0")
  .option("upperBound", "3")
  .option("numPartitions", "2")
  .table("h2.test.people")
```

We need to make `DataFrameReader.table` take the specified options.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually tested for now. A test will be added once V2 JDBC read is implemented.

Closes #29535 from huaxingao/table_options.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent eaaf783 commit 806140d
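
A minimal end-to-end sketch of the fix, assuming a local session (the table and option names are illustrative and mirror the test added to `ExplainSuite` below): after this commit, options given on the reader survive into the parsed plan as part of the `UnresolvedRelation` node.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: check that reader options survive into the parsed plan.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
  .write.saveAsTable("test")

val df = spark.read
  .option("key1", "value1")
  .option("KEY2", "VALUE2")
  .table("test")

// The parsed plan is expected to contain something like:
//   'UnresolvedRelation [test], [key1=value1, KEY2=VALUE2]
df.explain(extended = true)
```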

File tree: 12 files changed, +68 −20 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 14 additions & 10 deletions
```diff
@@ -846,9 +846,9 @@ class Analyzer(
    */
   object ResolveTempViews extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
-      case u @ UnresolvedRelation(ident) =>
+      case u @ UnresolvedRelation(ident, _) =>
         lookupTempView(ident).getOrElse(u)
-      case i @ InsertIntoStatement(UnresolvedRelation(ident), _, _, _, _) =>
+      case i @ InsertIntoStatement(UnresolvedRelation(ident, _), _, _, _, _) =>
         lookupTempView(ident)
           .map(view => i.copy(table = view))
           .getOrElse(i)
@@ -895,7 +895,7 @@ class Analyzer(
   object ResolveTables extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp {
       case u: UnresolvedRelation =>
-        lookupV2Relation(u.multipartIdentifier)
+        lookupV2Relation(u.multipartIdentifier, u.options)
           .map { rel =>
             val ident = rel.identifier.get
             SubqueryAlias(rel.catalog.get.name +: ident.namespace :+ ident.name, rel)
@@ -912,7 +912,7 @@ class Analyzer(
         .getOrElse(u)

       case i @ InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) if i.query.resolved =>
-        lookupV2Relation(u.multipartIdentifier)
+        lookupV2Relation(u.multipartIdentifier, u.options)
           .map(v2Relation => i.copy(table = v2Relation))
           .getOrElse(i)

@@ -928,12 +928,14 @@ class Analyzer(
     /**
      * Performs the lookup of DataSourceV2 Tables from v2 catalog.
      */
-    private def lookupV2Relation(identifier: Seq[String]): Option[DataSourceV2Relation] =
+    private def lookupV2Relation(
+        identifier: Seq[String],
+        options: CaseInsensitiveStringMap): Option[DataSourceV2Relation] =
       expandRelationName(identifier) match {
         case NonSessionCatalogAndIdentifier(catalog, ident) =>
           CatalogV2Util.loadTable(catalog, ident) match {
             case Some(table) =>
-              Some(DataSourceV2Relation.create(table, Some(catalog), Some(ident)))
+              Some(DataSourceV2Relation.create(table, Some(catalog), Some(ident), options))
             case None => None
           }
         case _ => None
@@ -975,7 +977,7 @@ class Analyzer(
       case i @ InsertIntoStatement(table, _, _, _, _) if i.query.resolved =>
         val relation = table match {
           case u: UnresolvedRelation =>
-            lookupRelation(u.multipartIdentifier).getOrElse(u)
+            lookupRelation(u.multipartIdentifier, u.options).getOrElse(u)
           case other => other
         }

@@ -986,7 +988,7 @@ class Analyzer(
       }

       case u: UnresolvedRelation =>
-        lookupRelation(u.multipartIdentifier).map(resolveViews).getOrElse(u)
+        lookupRelation(u.multipartIdentifier, u.options).map(resolveViews).getOrElse(u)

       case u @ UnresolvedTable(identifier) =>
         lookupTableOrView(identifier).map {
@@ -1016,7 +1018,9 @@ class Analyzer(
     // 1) If the resolved catalog is not session catalog, return None.
     // 2) If a relation is not found in the catalog, return None.
     // 3) If a v1 table is found, create a v1 relation. Otherwise, create a v2 relation.
-    private def lookupRelation(identifier: Seq[String]): Option[LogicalPlan] = {
+    private def lookupRelation(
+        identifier: Seq[String],
+        options: CaseInsensitiveStringMap): Option[LogicalPlan] = {
       expandRelationName(identifier) match {
         case SessionCatalogAndIdentifier(catalog, ident) =>
           lazy val loaded = CatalogV2Util.loadTable(catalog, ident).map {
@@ -1025,7 +1029,7 @@ class Analyzer(
           case table =>
             SubqueryAlias(
               catalog.name +: ident.asMultipartIdentifier,
-              DataSourceV2Relation.create(table, Some(catalog), Some(ident)))
+              DataSourceV2Relation.create(table, Some(catalog), Some(ident), options))
         }
         val key = catalog.name +: ident.namespace :+ ident.name
         AnalysisContext.get.relationCache.get(key).map(_.transform {
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -171,7 +171,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
       plan: LogicalPlan,
       cteRelations: Seq[(String, LogicalPlan)]): LogicalPlan =
     plan resolveOperatorsUp {
-      case u @ UnresolvedRelation(Seq(table)) =>
+      case u @ UnresolvedRelation(Seq(table), _) =>
         cteRelations.find(r => plan.conf.resolver(r._1, table)).map(_._2).getOrElse(u)

       case other =>
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -105,15 +105,15 @@ object ResolveHints {

       val newNode = CurrentOrigin.withOrigin(plan.origin) {
         plan match {
-          case ResolvedHint(u @ UnresolvedRelation(ident), hint)
+          case ResolvedHint(u @ UnresolvedRelation(ident, _), hint)
               if matchedIdentifierInHint(ident) =>
             ResolvedHint(u, createHintInfo(hintName).merge(hint, hintErrorHandler))

           case ResolvedHint(r: SubqueryAlias, hint)
               if matchedIdentifierInHint(extractIdentifier(r)) =>
             ResolvedHint(r, createHintInfo(hintName).merge(hint, hintErrorHandler))

-          case UnresolvedRelation(ident) if matchedIdentifierInHint(ident) =>
+          case UnresolvedRelation(ident, _) if matchedIdentifierInHint(ident) =>
             ResolvedHint(plan, createHintInfo(hintName))

           case r: SubqueryAlias if matchedIdentifierInHint(extractIdentifier(r)) =>
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala

Lines changed: 6 additions & 2 deletions
```diff
@@ -23,11 +23,12 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.parser.ParserUtils
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LeafNode, LogicalPlan, UnaryNode}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, UnaryNode}
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
 import org.apache.spark.sql.types.{DataType, Metadata, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap

 /**
  * Thrown when an invalid attempt is made to access a property of a tree that has yet to be fully
@@ -40,9 +41,12 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
  * Holds the name of a relation that has yet to be looked up in a catalog.
  *
  * @param multipartIdentifier table name
+ * @param options options to scan this relation. Only applicable to v2 table scan.
  */
 case class UnresolvedRelation(
-    multipartIdentifier: Seq[String]) extends LeafNode with NamedRelation {
+    multipartIdentifier: Seq[String],
+    options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty())
+  extends LeafNode with NamedRelation {
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

   /** Returns a `.` separated name for this relation. */
```
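
Because `options` defaults to an empty `CaseInsensitiveStringMap`, existing call sites that pass only the identifier keep working, and the pattern matches elsewhere in this commit only need an extra wildcard. A minimal sketch of constructing the node both ways (identifiers and option values are illustrative):

```scala
import java.util.{HashMap => JHashMap}

import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Default: no options, same shape as before this commit.
val plain = UnresolvedRelation(Seq("db", "tbl"))

// With scan options attached (keys and values are illustrative).
val opts = new JHashMap[String, String]()
opts.put("numPartitions", "2")
val withOptions = UnresolvedRelation(
  Seq("h2", "test", "people"),
  new CaseInsensitiveStringMap(opts))
```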

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala

Lines changed: 3 additions & 0 deletions
```diff
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.storage.StorageLevel

 /** Used by [[TreeNode.getNodeNumbered]] when traversing the tree for a given number */
@@ -544,6 +545,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
     case None => Nil
     case Some(null) => Nil
     case Some(any) => any :: Nil
+    case map: CaseInsensitiveStringMap => truncatedString(
+      map.asCaseSensitiveMap().entrySet().toArray(), "[", ", ", "]", maxFields) :: Nil
     case table: CatalogTable =>
       table.storage.serde match {
         case Some(serde) => table.identifier :: serde :: Nil
```
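
The new `TreeNode` case above is what renders the attached options as a `[key=value, ...]` suffix in explain output, including the empty `[]` now visible in the golden files below. A standalone sketch of the same rendering, not Spark's actual `truncatedString` helper:

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}

// Sketch: a java.util.Map entry stringifies as "key=value", so joining
// the entry set with "[", ", ", "]" yields e.g. [key1=value1, KEY2=VALUE2].
def renderOptions(options: java.util.Map[String, String]): String =
  options.entrySet().toArray().mkString("[", ", ", "]")

val opts = new JLinkedHashMap[String, String]()
opts.put("key1", "value1")
opts.put("KEY2", "VALUE2")

println(renderOptions(opts))                                 // [key1=value1, KEY2=VALUE2]
println(renderOptions(new JLinkedHashMap[String, String]())) // []
```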

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 5 additions & 1 deletion
```diff
@@ -28,6 +28,7 @@ import org.apache.spark.annotation.Stable
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVOptions, UnivocityParser}
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
@@ -823,7 +824,10 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    */
   def table(tableName: String): DataFrame = {
     assertNoSpecifiedSchema("table")
-    sparkSession.table(tableName)
+    val multipartIdentifier =
+      sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
+    Dataset.ofRows(sparkSession, UnresolvedRelation(multipartIdentifier,
+      new CaseInsensitiveStringMap(extraOptions.toMap.asJava)))
   }

   /**
```
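
With `table` now forwarding `extraOptions` into the plan, the partitioned-read options from the PR description can reach `JDBCTableCatalog.loadTable` once the V2 JDBC read path is implemented (the PR description notes it is not yet). A hedged usage sketch, assuming an H2 catalog registered the way Spark's JDBC v2 tests do (catalog name, URL, and table are illustrative):

```scala
// Assumed session configuration (illustrative):
//   spark.sql.catalog.h2        org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
//   spark.sql.catalog.h2.url    jdbc:h2:mem:testdb
//   spark.sql.catalog.h2.driver org.h2.Driver
val people = spark.read
  .option("partitionColumn", "id")
  .option("lowerBound", "0")
  .option("upperBound", "3")
  .option("numPartitions", "2")
  .table("h2.test.people")

// If the options are honored, the scan should plan two partitions.
assert(people.rdd.getNumPartitions == 2)
```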

sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -174,7 +174,7 @@ case class CreateViewCommand(
   def verify(child: LogicalPlan) {
     child.collect {
       // Disallow creating permanent views based on temporary views.
-      case UnresolvedRelation(nameParts) if catalog.isTempView(nameParts) =>
+      case UnresolvedRelation(nameParts, _) if catalog.isTempView(nameParts) =>
         throw new AnalysisException(s"Not allowed to create a permanent view $name by " +
           s"referencing a temporary view ${nameParts.quoted}. " +
           "Please create a temp view instead by CREATE TEMP VIEW")
```

sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out

Lines changed: 1 addition & 1 deletion
```diff
@@ -693,7 +693,7 @@ Output: []
 Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView

 (3) UnresolvedRelation
-Arguments: [explain_temp1]
+Arguments: [explain_temp1], []

 (4) Project
 Arguments: ['key, 'val]
```

sql/core/src/test/resources/sql-tests/results/explain.sql.out

Lines changed: 1 addition & 1 deletion
```diff
@@ -827,7 +827,7 @@ Output: []
 Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView

 (3) UnresolvedRelation
-Arguments: [explain_temp1]
+Arguments: [explain_temp1], []

 (4) Project
 Arguments: ['key, 'val]
```

sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala

Lines changed: 15 additions & 0 deletions
```diff
@@ -408,6 +408,21 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
       }
     }
   }
+
+  test("Explain UnresolvedRelation with CaseInsensitiveStringMap options") {
+    val tableName = "test"
+    withTable(tableName) {
+      val df1 = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+      df1.write.saveAsTable(tableName)
+      val df2 = spark.read
+        .option("key1", "value1")
+        .option("KEY2", "VALUE2")
+        .table(tableName)
+      // == Parsed Logical Plan ==
+      // 'UnresolvedRelation [test], [key1=value1, KEY2=VALUE2]
+      checkKeywordsExistsInExplain(df2, keywords = "[key1=value1, KEY2=VALUE2]")
+    }
+  }
 }

 class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuite {
```
