[SPARK-32592][SQL] Make DataFrameReader.table take the specified options #29535

Closed · wants to merge 9 commits (changes shown from all commits)
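
For orientation before the diff: with this change, options set on a DataFrameReader are carried with the relation through analysis instead of being dropped by DataFrameReader.table. A minimal sketch of the resulting behaviour, distilled from the tests added in this PR (the option key "fakeOption" is an arbitrary test value, and testcat.ns1.ns2.tbl is the v2 test-catalog table used in the suite below):

    import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

    // Reader options now ride along on the UnresolvedRelation produced by
    // DataFrameReader.table; for a v2 table the analyzer forwards them to
    // DataSourceV2Relation.create.
    val df = spark.read
      .option("fakeOption", "false")
      .table("testcat.ns1.ns2.tbl")

    val options = df.queryExecution.analyzed.collectFirst {
      case r: DataSourceV2Relation => r.options
    }.get
    assert(options.get("fakeOption") == "false")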

@@ -846,9 +846,9 @@ class Analyzer(
    */
   object ResolveTempViews extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
-      case u @ UnresolvedRelation(ident) =>
+      case u @ UnresolvedRelation(ident, _) =>
         lookupTempView(ident).getOrElse(u)
-      case i @ InsertIntoStatement(UnresolvedRelation(ident), _, _, _, _) =>
+      case i @ InsertIntoStatement(UnresolvedRelation(ident, _), _, _, _, _) =>
         lookupTempView(ident)
           .map(view => i.copy(table = view))
           .getOrElse(i)
@@ -895,7 +895,7 @@ class Analyzer(
   object ResolveTables extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp {
       case u: UnresolvedRelation =>
-        lookupV2Relation(u.multipartIdentifier)
+        lookupV2Relation(u.multipartIdentifier, u.options)
           .map { rel =>
             val ident = rel.identifier.get
             SubqueryAlias(rel.catalog.get.name +: ident.namespace :+ ident.name, rel)
@@ -912,7 +912,7 @@ class Analyzer(
           .getOrElse(u)

       case i @ InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) if i.query.resolved =>
-        lookupV2Relation(u.multipartIdentifier)
+        lookupV2Relation(u.multipartIdentifier, u.options)
           .map(v2Relation => i.copy(table = v2Relation))
           .getOrElse(i)
@@ -928,12 +928,14 @@ class Analyzer(
     /**
      * Performs the lookup of DataSourceV2 Tables from v2 catalog.
      */
-    private def lookupV2Relation(identifier: Seq[String]): Option[DataSourceV2Relation] =
+    private def lookupV2Relation(
+        identifier: Seq[String],
+        options: CaseInsensitiveStringMap): Option[DataSourceV2Relation] =
       expandRelationName(identifier) match {
         case NonSessionCatalogAndIdentifier(catalog, ident) =>
           CatalogV2Util.loadTable(catalog, ident) match {
             case Some(table) =>
-              Some(DataSourceV2Relation.create(table, Some(catalog), Some(ident)))
+              Some(DataSourceV2Relation.create(table, Some(catalog), Some(ident), options))
             case None => None
           }
         case _ => None
@@ -975,7 +977,7 @@ class Analyzer(
       case i @ InsertIntoStatement(table, _, _, _, _) if i.query.resolved =>
         val relation = table match {
           case u: UnresolvedRelation =>
-            lookupRelation(u.multipartIdentifier).getOrElse(u)
+            lookupRelation(u.multipartIdentifier, u.options).getOrElse(u)
           case other => other
         }

@@ -986,7 +988,7 @@ class Analyzer(
         }

       case u: UnresolvedRelation =>
-        lookupRelation(u.multipartIdentifier).map(resolveViews).getOrElse(u)
+        lookupRelation(u.multipartIdentifier, u.options).map(resolveViews).getOrElse(u)

       case u @ UnresolvedTable(identifier) =>
         lookupTableOrView(identifier).map {
@@ -1016,7 +1018,9 @@ class Analyzer(
     // 1) If the resolved catalog is not session catalog, return None.
     // 2) If a relation is not found in the catalog, return None.
     // 3) If a v1 table is found, create a v1 relation. Otherwise, create a v2 relation.
-    private def lookupRelation(identifier: Seq[String]): Option[LogicalPlan] = {
+    private def lookupRelation(
+        identifier: Seq[String],
+        options: CaseInsensitiveStringMap): Option[LogicalPlan] = {
       expandRelationName(identifier) match {
         case SessionCatalogAndIdentifier(catalog, ident) =>
           lazy val loaded = CatalogV2Util.loadTable(catalog, ident).map {
@@ -1025,7 +1029,7 @@ class Analyzer(
           case table =>
             SubqueryAlias(
               catalog.name +: ident.asMultipartIdentifier,
-              DataSourceV2Relation.create(table, Some(catalog), Some(ident)))
+              DataSourceV2Relation.create(table, Some(catalog), Some(ident), options))
         }
         val key = catalog.name +: ident.namespace :+ ident.name
         AnalysisContext.get.relationCache.get(key).map(_.transform {

@@ -171,7 +171,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
       plan: LogicalPlan,
       cteRelations: Seq[(String, LogicalPlan)]): LogicalPlan =
     plan resolveOperatorsUp {
-      case u @ UnresolvedRelation(Seq(table)) =>
+      case u @ UnresolvedRelation(Seq(table), _) =>
         cteRelations.find(r => plan.conf.resolver(r._1, table)).map(_._2).getOrElse(u)

       case other =>

@@ -105,15 +105,15 @@ object ResolveHints {

     val newNode = CurrentOrigin.withOrigin(plan.origin) {
       plan match {
-        case ResolvedHint(u @ UnresolvedRelation(ident), hint)
+        case ResolvedHint(u @ UnresolvedRelation(ident, _), hint)
             if matchedIdentifierInHint(ident) =>
           ResolvedHint(u, createHintInfo(hintName).merge(hint, hintErrorHandler))

         case ResolvedHint(r: SubqueryAlias, hint)
             if matchedIdentifierInHint(extractIdentifier(r)) =>
           ResolvedHint(r, createHintInfo(hintName).merge(hint, hintErrorHandler))

-        case UnresolvedRelation(ident) if matchedIdentifierInHint(ident) =>
+        case UnresolvedRelation(ident, _) if matchedIdentifierInHint(ident) =>
           ResolvedHint(plan, createHintInfo(hintName))

         case r: SubqueryAlias if matchedIdentifierInHint(extractIdentifier(r)) =>

@@ -23,11 +23,12 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.parser.ParserUtils
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LeafNode, LogicalPlan, UnaryNode}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, UnaryNode}
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
 import org.apache.spark.sql.types.{DataType, Metadata, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap

 /**
  * Thrown when an invalid attempt is made to access a property of a tree that has yet to be fully
@@ -40,9 +41,12 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: String)
  * Holds the name of a relation that has yet to be looked up in a catalog.
  *
  * @param multipartIdentifier table name

    [Review comment, Member] Add options to param doc?

+ * @param options options to scan this relation. Only applicable to v2 table scan.

    [Review thread on the line above]
    HyukjinKwon (Member, Sep 23, 2021): Okay, I just noticed 5e82548 added the merging behaviour for V1. Maybe we should fix the comments here.
    huaxingao (Contributor, author): I will have a followup to fix the comment.
    Member: Opened a PR, #34075.
    HyukjinKwon (Member, Sep 23, 2021): Oh, I created a PR without seeing your comment. Thanks @huaxingao.

  */
 case class UnresolvedRelation(
-    multipartIdentifier: Seq[String]) extends LeafNode with NamedRelation {
+    multipartIdentifier: Seq[String],
+    options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty())
+  extends LeafNode with NamedRelation {
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

   /** Returns a `.` separated name for this relation. */

@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.storage.StorageLevel

 /** Used by [[TreeNode.getNodeNumbered]] when traversing the tree for a given number */
@@ -544,6 +545,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
     case None => Nil
     case Some(null) => Nil
     case Some(any) => any :: Nil
+    case map: CaseInsensitiveStringMap => truncatedString(
+      map.asCaseSensitiveMap().entrySet().toArray(), "[", ", ", "]", maxFields) :: Nil
     case table: CatalogTable =>
       table.storage.serde match {
         case Some(serde) => table.identifier :: serde :: Nil
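
Aside (not part of the diff): CaseInsensitiveStringMap lowercases keys for lookups but keeps the original map reachable via asCaseSensitiveMap(), which is why the argument rendered by this new case preserves the user's key casing. A small sketch of that assumption:

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    val map = new CaseInsensitiveStringMap(
      Map("key1" -> "value1", "KEY2" -> "VALUE2").asJava)
    assert(map.get("key2") == "VALUE2")  // lookups are case-insensitive
    // The case-sensitive view keeps the original keys, so the new TreeNode case
    // renders something like [key1=value1, KEY2=VALUE2] (entry order not guaranteed).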

@@ -28,6 +28,7 @@ import org.apache.spark.annotation.Stable
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVOptions, UnivocityParser}
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
@@ -823,7 +824,10 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    */
   def table(tableName: String): DataFrame = {
     assertNoSpecifiedSchema("table")
-    sparkSession.table(tableName)
+    val multipartIdentifier =
+      sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
+    Dataset.ofRows(sparkSession, UnresolvedRelation(multipartIdentifier,
+      new CaseInsensitiveStringMap(extraOptions.toMap.asJava)))
   }

   /**
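
Design note on the hunk above: going through UnresolvedRelation, rather than the old sparkSession.table(tableName) which never saw extraOptions, leaves it to the analyzer to decide what the options mean. As of this PR, v2 lookups hand them to DataSourceV2Relation.create, while temp views and the v1 path ignore the extra constructor argument (they match it with `_`); the review thread above notes that merging behaviour for v1 was added later in 5e82548.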

@@ -174,7 +174,7 @@ case class CreateViewCommand(
     def verify(child: LogicalPlan) {
       child.collect {
         // Disallow creating permanent views based on temporary views.
-        case UnresolvedRelation(nameParts) if catalog.isTempView(nameParts) =>
+        case UnresolvedRelation(nameParts, _) if catalog.isTempView(nameParts) =>
           throw new AnalysisException(s"Not allowed to create a permanent view $name by " +
             s"referencing a temporary view ${nameParts.quoted}. " +
             "Please create a temp view instead by CREATE TEMP VIEW")

@@ -693,7 +693,7 @@ Output: []
 Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView

 (3) UnresolvedRelation
-Arguments: [explain_temp1]
+Arguments: [explain_temp1], []

 (4) Project
 Arguments: ['key, 'val]

@@ -827,7 +827,7 @@ Output: []
 Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView

 (3) UnresolvedRelation
-Arguments: [explain_temp1]
+Arguments: [explain_temp1], []

 (4) Project
 Arguments: ['key, 'val]

sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala (15 additions, 0 deletions)
@@ -408,6 +408,21 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
       }
     }
   }
+
+  test("Explain UnresolvedRelation with CaseInsensitiveStringMap options") {
+    val tableName = "test"
+    withTable(tableName) {
+      val df1 = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+      df1.write.saveAsTable(tableName)
+      val df2 = spark.read
+        .option("key1", "value1")
+        .option("KEY2", "VALUE2")
+        .table(tableName)
+      // == Parsed Logical Plan ==
+      // 'UnresolvedRelation [test], [key1=value1, KEY2=VALUE2]
+      checkKeywordsExistsInExplain(df2, keywords = "[key1=value1, KEY2=VALUE2]")
+    }
+  }
 }

 class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuite {

@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan}
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.QueryExecutionListener

@@ -186,4 +187,21 @@ class DataSourceV2DataFrameSuite
       assert(e3.getMessage.contains(s"Cannot use interval type in the table schema."))
     }
   }
+
+  test("options to scan v2 table should be passed to DataSourceV2Relation") {
+    val t1 = "testcat.ns1.ns2.tbl"
+    withTable(t1) {
+      val df1 = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+      df1.write.saveAsTable(t1)
+
+      val optionName = "fakeOption"
+      val df2 = spark.read
+        .option(optionName, false)
+        .table(t1)
+      val options = df2.queryExecution.analyzed.collectFirst {
+        case d: DataSourceV2Relation => d.options
+      }.get
+      assert(options.get(optionName) === "false")
+    }
+  }
 }

@@ -599,7 +599,7 @@ private[hive] class TestHiveQueryExecution(
     // Make sure any test tables referenced are loaded.
     val referencedTables =
       describedTables ++
-        logical.collect { case UnresolvedRelation(ident) => ident.asTableIdentifier }
+        logical.collect { case UnresolvedRelation(ident, _) => ident.asTableIdentifier }
     val resolver = sparkSession.sessionState.conf.resolver
     val referencedTestTables = referencedTables.flatMap { tbl =>
       val testTableOpt = sparkSession.testTables.keys.find(resolver(_, tbl.table))