-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-32592][SQL] Make DataFrameReader.table take the specified options #29535
Conversation
Test build #127860 has finished for PR 29535 at commit
|
@@ -822,6 +821,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { | |||
*/ | |||
def table(tableName: String): DataFrame = { | |||
assertNoSpecifiedSchema("table") | |||
for ((k, v) <- this.extraOptions) | |||
sparkSession.conf.set(k, v) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to clearly define the behavior here.
I think the options don't make sense if we read temp view, view, or v1 tables in the session catalog. For v1 table, they are created with CREATE TABLE ... USING ... OPTIONS ...
, so the options are already there and it's confusing to overwrite them per scan.
We do need to keep the scan options when reading v2 tables, but I don't think session conf is the right channel. I think we should put the options in UnresolvedRelation
and apply these options when we resolve UnresolvedRelation
to v2 relations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan Thanks for your comment.
I took a look at the code. I think I need to change the following:
I need to modify lots of files because of the above changes. Could you please check if this looks OK to you before I continue? Thank you very much!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- We don't need to change
SparkSession.table
. We can createUnresolvedRelation
inDataFrameReader.table
directly. - This is options to scan the v2 table, not v2 table properties. We don't need to change
TableCatalog.loadTable
, but pass the scan options toTable.newScanBuilder
.DataSourceV2Relation
already takes the scan options, we just need to createDataSourceV2Relation
with scan options inResolveTables
andResolveRelations
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, we already pass options from DataFrameReader
to DataSourceV2Relation
in load
API. DataFrameReader.table
misses this part.
Test build #127942 has finished for PR 29535 at commit
|
Test build #127943 has finished for PR 29535 at commit
|
@cloud-fan @viirya Could you please check one more time? Thanks! |
@@ -42,7 +42,9 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str | |||
* @param multipartIdentifier table name | |||
*/ | |||
case class UnresolvedRelation( | |||
multipartIdentifier: Seq[String]) extends LeafNode with NamedRelation { | |||
multipartIdentifier: Seq[String], | |||
options: CaseInsensitiveMap[String] = CaseInsensitiveMap[String](Map.empty)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we use CaseInsensitiveStringMap
since it's only for v2?
sparkSession.table(tableName) | ||
val multipartIdentifier = | ||
sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) | ||
Dataset.ofRows(sparkSession, UnresolvedRelation(multipartIdentifier, extraOptions)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should create CaseInsensitiveStringMap
here:
new CaseInsensitiveStringMap(extraOptions.toMap.asJava)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
toMap
is required to get the original map (keys are not lowercased)
@@ -42,7 +42,9 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str | |||
* @param multipartIdentifier table name |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add options
to param doc?
Fixed. Thank you! @cloud-fan @viirya |
Test build #127948 has finished for PR 29535 at commit
|
private def lookupV2Relation(identifier: Seq[String]): Option[DataSourceV2Relation] = | ||
private def lookupV2Relation( | ||
identifier: Seq[String], | ||
options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we really need the default value?
private def lookupRelation(identifier: Seq[String]): Option[LogicalPlan] = { | ||
private def lookupRelation( | ||
identifier: Seq[String], | ||
options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
@@ -40,9 +41,12 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str | |||
* Holds the name of a relation that has yet to be looked up in a catalog. | |||
* | |||
* @param multipartIdentifier table name | |||
* @param options options for this relation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
options to scan this relation. Only applicable to v2 table scan.
LGTM except 2 minor comments |
Test build #127959 has finished for PR 29535 at commit
|
retest this please |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, we don't have test for this change?
Test build #127961 has finished for PR 29535 at commit
|
it will be tested in the JDBC v2 PR, but yea, better to have a UT here with a fake v2 source. |
@huaxingao You may need to sync with master. |
a50bf2f
to
dcc88ca
Compare
Test build #127962 has finished for PR 29535 at commit
|
@@ -40,7 +41,6 @@ import org.apache.spark.sql.execution.datasources.csv._ | |||
import org.apache.spark.sql.execution.datasources.jdbc._ | |||
import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource | |||
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils} | |||
import org.apache.spark.sql.internal.SQLConf |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, @huaxingao, don't remove this. Other change uses SQLConf
.
Test build #127965 has finished for PR 29535 at commit
|
We need to fix |
For this explain formatted
Now it has
Do we just need to change the expected output to let the test pass? Or we need to fix the code to make the explain string to have more meaningful info than |
@huaxingao good point. I think we should fix |
I didn't run the whole sql test suites on my local. I will just take chance and test this on Jenkins. If the change causes other problems, I will fix tomorrow. |
Test build #127988 has finished for PR 29535 at commit
|
@@ -544,6 +545,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { | |||
case None => Nil | |||
case Some(null) => Nil | |||
case Some(any) => any :: Nil | |||
case map: CaseInsensitiveStringMap => | |||
truncatedString(map.entrySet().toArray(), "[", ", ", "]", maxFields) :: Nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are you sure this returns meaningful string?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and we should at least display the original map (keys are not lowercased)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can use asCaseSensitiveMap
call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried the following
scala> Map("a" -> 1).asJava.entrySet.toArray.head
res1: Object = scala.collection.convert.Wrappers$MapWrapper$$anon$1$$anon$5$$anon$6@60
I think it's safer to generate the string key=value
manually.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried this:
scala> val options = Map(
| "Key1" -> "Value1",
| "key2" -> "vaue2"
| )
options: scala.collection.immutable.Map[String,String] = Map(Key1 -> Value1, key2 -> vaue2)
scala> val map = new CaseInsensitiveStringMap(options.asJava)
map: org.apache.spark.sql.util.CaseInsensitiveStringMap = org.apache.spark.sql.util.CaseInsensitiveStringMap@9f174175
scala> map.asCaseSensitiveMap().entrySet().toArray()
res3: Array[Object] = Array(Key1=Value1, key2=vaue2)
I used case map: CaseInsensitiveStringMap
for pattern matching, I guess we are probably OK?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a test case in ExplainSuite
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. Will do. Have a good night!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems we can't have a UnresolvedRelation
with a non empty CaseInsensitiveStringMap
in explain? The only way to create a UnresolvedRelation
with a non empty CaseInsensitiveStringMap
is using DataFrameReader.table
, but this is resolved to a V2Relation
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
df.explain(true)
will explain the parsed plan, which contains UnresolvedRelation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added a test. Thank you!
Test build #128077 has finished for PR 29535 at commit
|
Test build #128078 has finished for PR 29535 at commit
|
retest this please |
Test build #128084 has finished for PR 29535 at commit
|
thanks, merging to master! |
Thank you very much! @cloud-fan @viirya |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm too.
wait, I get confused here. We already defined a table with options. How does it work with the newly set options? are they merged? |
it's table properties vs scan options |
So |
this creates a myth that setting |
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
Show resolved
Hide resolved
This is expected. Per-scan options have higher priority than table properties. |
@@ -40,9 +41,12 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str | |||
* Holds the name of a relation that has yet to be looked up in a catalog. | |||
* | |||
* @param multipartIdentifier table name | |||
* @param options options to scan this relation. Only applicable to v2 table scan. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay, I just noticed 5e82548 added the merging behaviour for V1.
Okay, maybe we should fix the comments here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will have a followup to fix the comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
opened a PR #34075
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I created a PR without seeing your comment. Thanks @huaxingao.
…ation ### What changes were proposed in this pull request? This PR fixes the 'options' description on `UnresolvedRelation`. This comment was added in #29535 but not valid anymore because V1 also uses this `options` (and merge the options with the table properties) per #29712. This PR can go through from `master` to `branch-3.1`. ### Why are the changes needed? To make `UnresolvedRelation.options`'s description clearer. ### Does this PR introduce _any_ user-facing change? No, dev-only. ### How was this patch tested? Scala linter by `dev/linter-scala`. Closes #34075 from HyukjinKwon/minor-comment-unresolved-releation. Authored-by: Hyukjin Kwon <gurwls223@apache.org> Signed-off-by: Huaxin Gao <huaxin_gao@apple.com>
…ation ### What changes were proposed in this pull request? This PR fixes the 'options' description on `UnresolvedRelation`. This comment was added in #29535 but not valid anymore because V1 also uses this `options` (and merge the options with the table properties) per #29712. This PR can go through from `master` to `branch-3.1`. ### Why are the changes needed? To make `UnresolvedRelation.options`'s description clearer. ### Does this PR introduce _any_ user-facing change? No, dev-only. ### How was this patch tested? Scala linter by `dev/linter-scala`. Closes #34075 from HyukjinKwon/minor-comment-unresolved-releation. Authored-by: Hyukjin Kwon <gurwls223@apache.org> Signed-off-by: Huaxin Gao <huaxin_gao@apple.com> (cherry picked from commit 0076eba) Signed-off-by: Huaxin Gao <huaxin_gao@apple.com>
…ation ### What changes were proposed in this pull request? This PR fixes the 'options' description on `UnresolvedRelation`. This comment was added in #29535 but not valid anymore because V1 also uses this `options` (and merge the options with the table properties) per #29712. This PR can go through from `master` to `branch-3.1`. ### Why are the changes needed? To make `UnresolvedRelation.options`'s description clearer. ### Does this PR introduce _any_ user-facing change? No, dev-only. ### How was this patch tested? Scala linter by `dev/linter-scala`. Closes #34075 from HyukjinKwon/minor-comment-unresolved-releation. Authored-by: Hyukjin Kwon <gurwls223@apache.org> Signed-off-by: Huaxin Gao <huaxin_gao@apple.com> (cherry picked from commit 0076eba) Signed-off-by: Huaxin Gao <huaxin_gao@apple.com>
What changes were proposed in this pull request?
pass specified options in DataFrameReader.table to JDBCTableCatalog.loadTable
Why are the changes needed?
Currently,
DataFrameReader.table
ignores the specified options. The options specified like the following are lost.We need to make
DataFrameReader.table
take the specified options.Does this PR introduce any user-facing change?
No
How was this patch tested?
Manually test for now. Will add a test after V2 JDBC read is implemented.