[SPARK-18538] [SQL] Fix Concurrent Table Fetching Using DataFrameReader JDBC APIs #15975

gatorsmile wants to merge 10 commits into apache:master from gatorsmile/jdbc

Conversation
Test build #68981 has started for PR 15975 at commit
```scala
 */
def jdbc(url: String, table: String, properties: Properties): DataFrame = {
  jdbc(url, table, JDBCRelation.columnPartition(null), properties)
  // connectionProperties should override settings in extraOptions.
```

nit: there is no `connectionProperties` here, only `properties`.
```scala
val partitioningInfo = if (parts.nonEmpty) s" [numPartitions=${parts.length}]"
// credentials should not be included in the plan output, table information is sufficient.
s"JDBCRelation(${jdbcOptions.table})"
s"JDBCRelation(${jdbcOptions.table})" + partitioningInfo
```
If `parts` is empty, this string looks weird, like `JDBCRelation(.....)()`, as I tried locally (the `if` without an `else` evaluates to `()`).
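A minimal, pure-Scala sketch of how the suffix could be built so that an unpartitioned relation prints cleanly (the names `relationString`, `table`, and `numParts` are illustrative stand-ins, not the actual fields):

```scala
// Sketch: only append the partitioning suffix when partitions exist.
// An `if` without an `else` evaluates to `()` (Unit), which is how the
// trailing "()" sneaks into the string in the diff above.
def relationString(table: String, numParts: Int): String = {
  val partitioningInfo =
    if (numParts > 0) s" [numPartitions=$numParts]" else ""
  s"JDBCRelation($table)" + partitioningInfo
}
```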
```
@@ -404,6 +425,7 @@ class JDBCSuite extends SparkFunSuite
  numPartitions = 0,
```
Oh, is this 0 `numPartitions` valid?

In the read path, yes.

Now that it's merged, has it been fixed?

: ) Let me move the value check to the write path, and then we can keep the existing behavior of `numPartitions` in the read path.
Hi, @gatorsmile.

Sure, will add the test cases for the write path. @dongjoon-hyun

The test cases for the write path will be added after #15966 is merged.

Test build #69114 has started for PR 15975 at commit

retest this please

Test build #69139 has finished for PR 15975 at commit

@gatorsmile did you update this?

Will update it tonight.

@dongjoon-hyun I will not add test cases for the write path in this PR, because it requires changes to the source code. I will submit a separate PR after this one is merged.

@gatorsmile NP. Thank you for letting me know.

Test build #69180 has finished for PR 15975 at commit

Any more comments? : ) Also cc @cloud-fan @hvanhovell
```scala
private def jdbc(
    url: String,
    table: String,
    parts: Array[Partition],
```

this parameter is never used; when did we introduce it?

Introduced in PR #15499, which was merged to 2.1 only.
```scala
this.extraOptions += ("url" -> url, "dbtable" -> table)
format("jdbc").load()
val relation = JDBCRelation(parts, options)(sparkSession)
sparkSession.baseRelationToDataFrame(relation)
```

so we have two code paths for jdbc? The API with the `predicates` parameter is kind of an advanced API that allows users to customize the partitions, right?

Yeah. The predicate-based API is very useful for advanced JDBC users.
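To illustrate the predicate-based path, here is a simplified pure-Scala sketch of the idea: each user-supplied predicate becomes the WHERE clause of one partition, so the degree of parallelism equals the number of predicates. `SketchPartition` is a made-up stand-in for Spark's internal JDBC partition class, not the real type:

```scala
// Stand-in for the internal partition type: one WHERE clause per partition.
case class SketchPartition(whereClause: String, index: Int)

// Each predicate maps 1:1 to a partition, preserving order.
def partitionsFromPredicates(predicates: Array[String]): Array[SketchPartition] =
  predicates.zipWithIndex.map { case (p, i) => SketchPartition(p, i) }
```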
```scala
// properties should override settings in extraOptions.
this.extraOptions = this.extraOptions ++ properties.asScala
// explicit url and dbtable should override all
this.extraOptions += ("url" -> url, "dbtable" -> table)
```

please use the constant JDBCOptions.xxx
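The override order being discussed can be illustrated with plain Scala maps. The values here are placeholders, and the bare string keys `"url"` and `"dbtable"` are exactly what the reviewer suggests replacing with `JDBCOptions` constants:

```scala
// With Map ++, later additions win: properties override extraOptions,
// and the explicit url/dbtable passed to jdbc() override everything.
val extraOptions = Map("url" -> "jdbc:old", "fetchsize" -> "100")
val properties   = Map("fetchsize" -> "500", "user" -> "sa")

val merged = extraOptions ++ properties ++
  Map("url" -> "jdbc:h2:mem:testdb", "dbtable" -> "TEST.PEOPLE")
```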
LGTM except https://github.com/apache/spark/pull/15975/files#r89722356, what's the status of it?

Test build #69253 has finished for PR 15975 at commit
```scala
  df
}
val repartitionedDF = options.numPartitions match {
  case Some(n) if n <= 0 => throw new IllegalArgumentException(
```

so this check is only in the write path now?
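A standalone sketch of the write-path check quoted above, simplified so it runs on its own: `numPartitions` is modeled as a plain `Option[Int]`, and the actual repartitioning of the DataFrame is elided. The error message text is illustrative, not Spark's exact wording:

```scala
// Write path: reject non-positive values explicitly; None means
// "leave the DataFrame's existing partitioning unchanged".
def validateNumPartitions(numPartitions: Option[Int]): Option[Int] =
  numPartitions match {
    case Some(n) if n <= 0 =>
      throw new IllegalArgumentException(
        s"Invalid value `$n` for parameter `numPartitions`. The minimum value is 1.")
    case other => other
  }
```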
```scala
def checkNumPartitions(df: DataFrame, expectedNumPartitions: Int): Unit = {
  val explain = ExplainCommand(df.queryExecution.logical, extended = true)
  val plans = spark.sessionState.executePlan(explain).executedPlan
  val expectedMsg = s"${JDBCOptions.JDBC_NUM_PARTITIONS}=$expectedNumPartitions"
```

instead of matching a string, can we call `collect` on the logical plan and get the `JDBCRelation` to check the partitions directly?
```
@@ -76,9 +76,6 @@ class JDBCOptions(
  // the number of partitions
  val numPartitions = parameters.get(JDBC_NUM_PARTITIONS).map(_.toInt)
```

what's the behaviour if this config is set to 0 or negative in the read path?

Reading the table using a single partition.

have we documented this behaviour?

Not yet. : ) Will try to document it in the jdbc API of DataFrameReader.scala
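The read-path behavior described in this thread, where a non-positive or missing `numPartitions` falls back to a single partition, can be sketched in a few lines. This is an illustration of the described semantics, not the actual Spark code:

```scala
// Read path: anything <= 0 (or unset) effectively means one partition.
def effectiveReadPartitions(numPartitions: Option[Int]): Int =
  numPartitions.filter(_ > 0).getOrElse(1)
```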
retest this please

Test build #69392 has started for PR 15975 at commit
```scala
// Check whether the tables are fetched in the expected degree of parallelism
def checkNumPartitions(df: DataFrame, expectedNumPartitions: Int): Unit = {
  df.queryExecution.analyzed.collectFirst {
```

nit:

```scala
val jdbcRelations = df.queryExecution....collect {
  case LogicalRelation(r: JDBCRelation, _, _) => r
}
assert(jdbcRelations.length == 1)
assert(jdbcRelations.head.parts.length == expected, "error message")
```
Test build #69430 has finished for PR 15975 at commit

retest this please

Test build #69444 has finished for PR 15975 at commit

thanks, merging to master! Since #15868 is not backported to 2.1, this PR conflicts with 2.1. @gatorsmile can you send a backport PR? thanks.

Sure, will do it soon.
…DataFrameReader JDBC APIs

### What changes were proposed in this pull request?

#### This PR is to backport #15975 to Branch 2.1

---

The following two `DataFrameReader` JDBC APIs ignore the user-specified parameters of parallelism degree.

```Scala
def jdbc(
    url: String,
    table: String,
    columnName: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int,
    connectionProperties: Properties): DataFrame
```

```Scala
def jdbc(
    url: String,
    table: String,
    predicates: Array[String],
    connectionProperties: Properties): DataFrame
```

This PR is to fix the issues. To verify the correctness of the behavior, we improve the plan output of the `EXPLAIN` command by adding `numPartitions` in the `JDBCRelation` node.

Before the fix,

```
== Physical Plan ==
*Scan JDBCRelation(TEST.PEOPLE) [NAME#1896,THEID#1897] ReadSchema: struct<NAME:string,THEID:int>
```

After the fix,

```
== Physical Plan ==
*Scan JDBCRelation(TEST.PEOPLE) [numPartitions=3] [NAME#1896,THEID#1897] ReadSchema: struct<NAME:string,THEID:int>
```

### How was this patch tested?

Added the verification logic to all the test cases for JDBC concurrent fetching.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16111 from gatorsmile/jdbcFix2.1.
…r JDBC APIs
### What changes were proposed in this pull request?
The following two `DataFrameReader` JDBC APIs ignore the user-specified parameters of parallelism degree.
```Scala
def jdbc(
    url: String,
    table: String,
    columnName: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int,
    connectionProperties: Properties): DataFrame
```

```Scala
def jdbc(
    url: String,
    table: String,
    predicates: Array[String],
    connectionProperties: Properties): DataFrame
```
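To make the roles of `columnName`, `lowerBound`, `upperBound`, and `numPartitions` concrete, here is a simplified pure-Scala sketch of the range-partitioning scheme. The real logic lives in `JDBCRelation.columnPartition`; this is an approximation of the scheme, not the exact Spark algorithm:

```scala
// Split the value range of the partition column into numPartitions
// WHERE clauses. The first and last partitions are left open-ended so
// rows outside [lowerBound, upperBound) are not dropped.
def columnPredicates(
    column: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int): Seq[String] = {
  val stride = (upperBound - lowerBound) / numPartitions
  (0 until numPartitions).map { i =>
    val lo = lowerBound + i * stride
    val hi = lo + stride
    if (numPartitions == 1) "1=1"                           // single partition: fetch everything
    else if (i == 0) s"$column < $hi"                       // open-ended on the low side
    else if (i == numPartitions - 1) s"$column >= $lo"      // open-ended on the high side
    else s"$column >= $lo AND $column < $hi"
  }
}
```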
This PR fixes these issues. To verify the correctness of the behavior, we improve the plan output of the `EXPLAIN` command by adding `numPartitions` to the `JDBCRelation` node.
Before the fix,
```
== Physical Plan ==
*Scan JDBCRelation(TEST.PEOPLE) [NAME#1896,THEID#1897] ReadSchema: struct<NAME:string,THEID:int>
```
After the fix,
```
== Physical Plan ==
*Scan JDBCRelation(TEST.PEOPLE) [numPartitions=3] [NAME#1896,THEID#1897] ReadSchema: struct<NAME:string,THEID:int>
```
### How was this patch tested?
Added the verification logic to all the test cases for JDBC concurrent fetching.
Author: gatorsmile <gatorsmile@gmail.com>
Closes apache#15975 from gatorsmile/jdbc.