[SPARK-32592][SQL] Make DataFrameReader.table take the specified options #29535

Closed · wants to merge 9 commits

Changes from 1 commit
fix
huaxingao committed Aug 27, 2020
commit 04cf5a1f95aa10092526f4fdd1071dba3be9ff2e
File: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -21,6 +21,7 @@ import java.util
import java.util.Locale
import java.util.concurrent.atomic.AtomicBoolean

+import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
@@ -37,7 +38,7 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.trees.TreeNodeRef
-import org.apache.spark.sql.catalyst.util.toPrettySQL
+import org.apache.spark.sql.catalyst.util.{toPrettySQL, CaseInsensitiveMap}
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnChange, ColumnPosition, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
@@ -846,9 +847,9 @@ class Analyzer(
*/
object ResolveTempViews extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
-case u @ UnresolvedRelation(ident) =>
+case u @ UnresolvedRelation(ident, _) =>
lookupTempView(ident).getOrElse(u)
-case i @ InsertIntoStatement(UnresolvedRelation(ident), _, _, _, _) =>
+case i @ InsertIntoStatement(UnresolvedRelation(ident, _), _, _, _, _) =>
lookupTempView(ident)
.map(view => i.copy(table = view))
.getOrElse(i)
@@ -895,7 +896,7 @@
object ResolveTables extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp {
case u: UnresolvedRelation =>
-lookupV2Relation(u.multipartIdentifier)
+lookupV2Relation(u.multipartIdentifier, u.options)
.map { rel =>
val ident = rel.identifier.get
SubqueryAlias(rel.catalog.get.name +: ident.namespace :+ ident.name, rel)
@@ -912,7 +913,7 @@
.getOrElse(u)

case i @ InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) if i.query.resolved =>
-lookupV2Relation(u.multipartIdentifier)
+lookupV2Relation(u.multipartIdentifier, u.options)
.map(v2Relation => i.copy(table = v2Relation))
.getOrElse(i)

@@ -928,12 +929,16 @@
/**
* Performs the lookup of DataSourceV2 Tables from v2 catalog.
*/
-private def lookupV2Relation(identifier: Seq[String]): Option[DataSourceV2Relation] =
+private def lookupV2Relation(
+    identifier: Seq[String],
+    options: CaseInsensitiveMap[String] = CaseInsensitiveMap[String](Map.empty))
+  : Option[DataSourceV2Relation] =
expandRelationName(identifier) match {
case NonSessionCatalogAndIdentifier(catalog, ident) =>
CatalogV2Util.loadTable(catalog, ident) match {
case Some(table) =>
-Some(DataSourceV2Relation.create(table, Some(catalog), Some(ident)))
+Some(DataSourceV2Relation.create(
+  table, Some(catalog), Some(ident), new CaseInsensitiveStringMap(options.asJava)))
case None => None
}
case _ => None
@@ -975,7 +980,7 @@
case i @ InsertIntoStatement(table, _, _, _, _) if i.query.resolved =>
val relation = table match {
case u: UnresolvedRelation =>
-lookupRelation(u.multipartIdentifier).getOrElse(u)
+lookupRelation(u.multipartIdentifier, u.options).getOrElse(u)
case other => other
}

@@ -986,7 +991,7 @@
}

case u: UnresolvedRelation =>
-lookupRelation(u.multipartIdentifier).map(resolveViews).getOrElse(u)
+lookupRelation(u.multipartIdentifier, u.options).map(resolveViews).getOrElse(u)

case u @ UnresolvedTable(identifier) =>
lookupTableOrView(identifier).map {
@@ -1016,7 +1021,10 @@
// 1) If the resolved catalog is not session catalog, return None.
// 2) If a relation is not found in the catalog, return None.
// 3) If a v1 table is found, create a v1 relation. Otherwise, create a v2 relation.
-private def lookupRelation(identifier: Seq[String]): Option[LogicalPlan] = {
+private def lookupRelation(
+    identifier: Seq[String],
+    options: CaseInsensitiveMap[String] = CaseInsensitiveMap[String](Map.empty))
+  : Option[LogicalPlan] = {
expandRelationName(identifier) match {
case SessionCatalogAndIdentifier(catalog, ident) =>
lazy val loaded = CatalogV2Util.loadTable(catalog, ident).map {
@@ -1025,7 +1033,8 @@
case table =>
SubqueryAlias(
catalog.name +: ident.asMultipartIdentifier,
-DataSourceV2Relation.create(table, Some(catalog), Some(ident)))
+DataSourceV2Relation.create(table, Some(catalog), Some(ident),
+  new CaseInsensitiveStringMap(options.asJava)))
}
val key = catalog.name +: ident.namespace :+ ident.name
AnalysisContext.get.relationCache.get(key).map(_.transform {
File: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -23,9 +23,9 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.parser.ParserUtils
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LeafNode, LogicalPlan, UnaryNode}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, UnaryNode}
import org.apache.spark.sql.catalyst.trees.TreeNode
-import org.apache.spark.sql.catalyst.util.quoteIdentifier
+import org.apache.spark.sql.catalyst.util.{quoteIdentifier, CaseInsensitiveMap}
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.types.{DataType, Metadata, StructType}

@@ -42,7 +42,9 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
* @param multipartIdentifier table name
Review comment (Member): Add options to param doc?
*/
case class UnresolvedRelation(
-  multipartIdentifier: Seq[String]) extends LeafNode with NamedRelation {
+  multipartIdentifier: Seq[String],
+  options: CaseInsensitiveMap[String] = CaseInsensitiveMap[String](Map.empty))
Review comment (Contributor): can we use CaseInsensitiveStringMap since it's only for v2?
+  extends LeafNode with NamedRelation {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

/** Returns a `.` separated name for this relation. */
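As context for the review thread above: the new options parameter defaults to an empty CaseInsensitiveMap, so existing UnresolvedRelation call sites compile unchanged, and extractor patterns only gain a wildcard. A minimal sketch (hypothetical identifiers, not taken from the patch):

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

// Old call sites keep working because options has a default value.
val plain = UnresolvedRelation(Seq("db", "tbl"))

// New call sites, such as DataFrameReader.table, can attach per-read options.
val withOpts = UnresolvedRelation(
  Seq("db", "tbl"), CaseInsensitiveMap(Map("fetchsize" -> "100")))

// Pattern matches gain a single wildcard, as in the hunks above.
withOpts match {
  case UnresolvedRelation(ident, _) => assert(ident == Seq("db", "tbl"))
}
```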
File: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -28,6 +28,7 @@ import org.apache.spark.annotation.Stable
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVOptions, UnivocityParser}
import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
@@ -822,9 +823,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
*/
def table(tableName: String): DataFrame = {
assertNoSpecifiedSchema("table")
-for ((k, v) <- this.extraOptions)
-  sparkSession.conf.set(k, v)
-sparkSession.table(tableName)
+val multipartIdentifier =
+  sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
+Dataset.ofRows(sparkSession, UnresolvedRelation(multipartIdentifier, extraOptions))
Review comment (@cloud-fan, Contributor, Aug 27, 2020): we should create CaseInsensitiveStringMap here:

    new CaseInsensitiveStringMap(extraOptions.toMap.asJava)

Review comment (Contributor): toMap is required to get the original map (keys are not lowercased)
}

/**
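To unpack the toMap remark above: iterating a catalyst CaseInsensitiveMap yields lower-cased keys, while its toMap returns the original map with the user's key casing, which is what a connector may want to see. A small illustration of that assumed behavior:

```scala
import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.util.CaseInsensitiveStringMap

val extraOptions = CaseInsensitiveMap(Map("fetchSize" -> "100"))

// Iteration (and therefore a direct asJava) exposes lower-cased keys...
assert(extraOptions.keys.toSeq == Seq("fetchsize"))

// ...whereas toMap hands back the original map, keys as the user wrote them.
assert(extraOptions.toMap.keys.toSeq == Seq("fetchSize"))

// Hence the reviewer's suggested conversion preserves the original casing,
// and lookups stay case-insensitive either way:
val v2Options = new CaseInsensitiveStringMap(extraOptions.toMap.asJava)
assert(v2Options.get("FETCHSIZE") == "100")
```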
File: sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -174,7 +174,7 @@ case class CreateViewCommand(
def verify(child: LogicalPlan) {
child.collect {
// Disallow creating permanent views based on temporary views.
-case UnresolvedRelation(nameParts) if catalog.isTempView(nameParts) =>
+case UnresolvedRelation(nameParts, _) if catalog.isTempView(nameParts) =>
throw new AnalysisException(s"Not allowed to create a permanent view $name by " +
s"referencing a temporary view ${nameParts.quoted}. " +
"Please create a temp view instead by CREATE TEMP VIEW")
File: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -232,7 +232,7 @@ class JdbcOptionsInWrite(

object JDBCOptions {
private val curId = new java.util.concurrent.atomic.AtomicLong(0L)
-val jdbcOptionNames = collection.mutable.Set[String]()
+private val jdbcOptionNames = collection.mutable.Set[String]()

private def newOption(name: String): String = {
jdbcOptionNames += name.toLowerCase(Locale.ROOT)
File: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
@@ -21,7 +21,6 @@ import java.sql.{Connection, SQLException}
import scala.collection.JavaConverters._

import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableChange}
import org.apache.spark.sql.connector.expressions.Transform
@@ -95,14 +94,8 @@ class JDBCTableCatalog extends TableCatalog with Logging {

override def loadTable(ident: Identifier): Table = {
checkNamespace(ident.namespace())
-var newOptions = options.parameters
-SparkSession.active.sessionState.conf.settings.asScala.foreach {
-  case (key, value) =>
-    for (option <- JDBCOptions.jdbcOptionNames)
-      if (key.equalsIgnoreCase(option)) newOptions += (key -> value)
-}
val optionsWithTableName = new JDBCOptions(
-  newOptions + (JDBCOptions.JDBC_TABLE_NAME -> getTableName(ident)))
+  options.parameters + (JDBCOptions.JDBC_TABLE_NAME -> getTableName(ident)))
try {
val schema = JDBCRDD.resolveTable(optionsWithTableName)
JDBCTable(ident, schema, optionsWithTableName)
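End to end, the block removed above had copied matching entries out of the session conf; after this change, a per-read option travels with UnresolvedRelation into the v2 relation instead. A rough usage sketch (hypothetical catalog name and table; assumes an H2 driver on the classpath and that the table already exists):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  // Hypothetical setup: register a v2 JDBC catalog named "h2".
  .config("spark.sql.catalog.h2",
    "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
  .config("spark.sql.catalog.h2.url", "jdbc:h2:mem:testdb")
  .getOrCreate()

// The reader option now rides on the v2 relation and should surface to the
// table's scan builder, rather than being fished out of the session conf
// by JDBCTableCatalog.loadTable.
val df = spark.read
  .option("fetchsize", "42")
  .table("h2.test.people")
```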