[SPARK-18538] [SQL] Fix Concurrent Table Fetching Using DataFrameReader JDBC APIs #15975
Conversation
Test build #68981 has started for PR 15975.
```diff
@@ -159,7 +159,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @since 1.4.0
    */
   def jdbc(url: String, table: String, properties: Properties): DataFrame = {
-    jdbc(url, table, JDBCRelation.columnPartition(null), properties)
+    // connectionProperties should override settings in extraOptions.
```
nit: no `connectionProperties` here, only `properties`.
Fixed.
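Putting the pieces together, a sketch of how the fixed overload reads once the nit is addressed (assembled from the diffs quoted in this thread, so indentation and comment wording may differ from the merged code):

```scala
def jdbc(url: String, table: String, properties: Properties): DataFrame = {
  // requires: import scala.collection.JavaConverters._ (for asScala)
  // properties should override settings in extraOptions
  this.extraOptions = this.extraOptions ++ properties.asScala
  // explicit url and dbtable should override all
  this.extraOptions += ("url" -> url, "dbtable" -> table)
  format("jdbc").load()
}
```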
```diff
     // credentials should not be included in the plan output, table information is sufficient.
-    s"JDBCRelation(${jdbcOptions.table})"
+    s"JDBCRelation(${jdbcOptions.table})" + partitioningInfo
```
If `parts` is empty, this string looks weird, like "JDBCRelation(.....)()", when I tried it locally.
Fixed. Thanks!
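A minimal sketch of the fix, assuming `parts` is the `Array[Partition]` held by `JDBCRelation` (the exact formatting may differ from the merged code):

```scala
override def toString: String = {
  // Mention partitioning only when partitions were actually specified, so an
  // unpartitioned relation prints as "JDBCRelation(TEST.PEOPLE)" rather than
  // the weird "JDBCRelation(TEST.PEOPLE)()".
  val partitioningInfo =
    if (parts.nonEmpty) s" [numPartitions=${parts.length}]" else ""
  // credentials should not be included in the plan output, table information is sufficient.
  s"JDBCRelation(${jdbcOptions.table})" + partitioningInfo
}
```

This is also what makes the `numPartitions=3` marker visible in the EXPLAIN output shown in the PR description below.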
```diff
@@ -404,6 +425,7 @@ class JDBCSuite extends SparkFunSuite
       numPartitions = 0,
```
Oh, is this `numPartitions = 0` valid?
In the read path, YES.
Will fix this after your PR #15966 is merged.
It's merged; has it been fixed?
: ) Let me move the value check to the write path; then we can keep the existing behavior of `numPartitions` in the read path.
Hi, @gatorsmile.
Sure, will add the test cases for the write path. @dongjoon-hyun
The test cases for the write path will be added after #15966 is merged.
Test build #69114 has started for PR 15975.
retest this please
Test build #69139 has finished for PR 15975.
@gatorsmile did you update this?
Will update it tonight.
@dongjoon-hyun Will not add test cases for the write path in this PR, because it requires changes to the source code. Will submit a separate PR after this is merged.
@gatorsmile NP. Thank you for letting me know.
Test build #69180 has finished for PR 15975.
Any more comments? : ) Also cc @cloud-fan @hvanhovell
```scala
  private def jdbc(
      url: String,
      table: String,
      parts: Array[Partition],
```
This parameter is never used; when did we introduce it?
Introduced in PR #15499, which was merged to 2.1 only.
```diff
-    this.extraOptions += ("url" -> url, "dbtable" -> table)
-    format("jdbc").load()
+    val relation = JDBCRelation(parts, options)(sparkSession)
+    sparkSession.baseRelationToDataFrame(relation)
```
So we have two code paths for `jdbc`? The API with the `predicates` parameter is kind of an advanced API that allows users to customize the partitions, right?
Yeah. The predicate-based API is very useful for advanced JDBC users.
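For illustration, a small usage sketch of that advanced API (the URL, credentials, and predicate values are hypothetical): each predicate becomes exactly one partition, i.e. one concurrent JDBC fetch.

```scala
import java.util.Properties

val connectionProperties = new Properties()
connectionProperties.put("user", "testUser")      // hypothetical credentials
connectionProperties.put("password", "testPass")

// Three predicates => three partitions => three concurrent fetches.
val predicates = Array(
  "THEID < 1000",
  "THEID >= 1000 AND THEID < 2000",
  "THEID >= 2000")

val df = spark.read.jdbc(
  "jdbc:h2:mem:testdb0", "TEST.PEOPLE", predicates, connectionProperties)
assert(df.rdd.getNumPartitions == 3)
```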
```scala
    // properties should override settings in extraOptions.
    this.extraOptions = this.extraOptions ++ properties.asScala
    // explicit url and dbtable should override all
    this.extraOptions += ("url" -> url, "dbtable" -> table)
```
Please use the constants `JDBCOptions.xxx`.
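Presumably something along these lines, assuming the constants `JDBCOptions.JDBC_URL` and `JDBCOptions.JDBC_TABLE_NAME` (which map to `"url"` and `"dbtable"`):

```scala
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

// Same semantics as above, with named constants instead of string literals.
this.extraOptions += (JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
```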
LGTM except https://github.com/apache/spark/pull/15975/files#r89722356; what's the status of it?
Test build #69253 has finished for PR 15975.
```scala
      df
    }
    val repartitionedDF = options.numPartitions match {
      case Some(n) if n <= 0 => throw new IllegalArgumentException(
```
So this check is only in the write path now?
Yeah.
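Continuing the truncated diff above, the full write-path check might read like this (the exception message wording is illustrative):

```scala
val repartitionedDF = options.numPartitions match {
  // Reject non-positive values up front in the write path.
  case Some(n) if n <= 0 => throw new IllegalArgumentException(
    s"Invalid value `$n` for parameter `${JDBCOptions.JDBC_NUM_PARTITIONS}` " +
      "in table writing via JDBC. The minimum value is 1.")
  // Coalesce only when the user asked for fewer partitions than the
  // DataFrame already has; never shuffle to add partitions on write.
  case Some(n) if n < df.rdd.getNumPartitions => df.coalesce(n)
  case _ => df
}
```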
```scala
  def checkNumPartitions(df: DataFrame, expectedNumPartitions: Int): Unit = {
    val explain = ExplainCommand(df.queryExecution.logical, extended = true)
    val plans = spark.sessionState.executePlan(explain).executedPlan
    val expectedMsg = s"${JDBCOptions.JDBC_NUM_PARTITIONS}=$expectedNumPartitions"
```
Instead of matching a string, can we call `collect` on the logical plan, get the `JDBCRelation`, and check the partitions directly?
Good idea!
```diff
@@ -76,9 +76,6 @@ class JDBCOptions(

-  // the number of partitions
-  val numPartitions = parameters.get(JDBC_NUM_PARTITIONS).map(_.toInt)
```
What's the behaviour if this config is set to 0 or a negative value in the read path?
Reading the table using a single partition.
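In other words (a hedged illustration; the in-memory H2 URL is hypothetical), a non-positive value is not rejected on read, and the scan simply falls back to a single partition:

```scala
// With numPartitions = 0 and no partitioning column, the relation is built
// with a single partition instead of throwing.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:h2:mem:testdb0")
  .option("dbtable", "TEST.PEOPLE")
  .option("numPartitions", "0")
  .load()
assert(df.rdd.getNumPartitions == 1)
```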
Have we documented this behaviour?
Not yet. : ) Will try to document it in the `jdbc` API of `DataFrameReader.scala`.
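The added note might read roughly like this in the scaladoc of `DataFrameReader.jdbc` (wording illustrative, not the merged text):

```scala
/**
 * ...
 * @param numPartitions the number of partitions. ...
 *                      Note: in the read path, a non-positive value is not
 *                      rejected; the table is read using a single partition.
 * ...
 */
```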
retest this please
Test build #69392 has started for PR 15975.
```diff
@@ -209,6 +209,16 @@ class JDBCSuite extends SparkFunSuite
     conn.close()
   }

+  // Check whether the tables are fetched in the expected degree of parallelism
+  def checkNumPartitions(df: DataFrame, expectedNumPartitions: Int): Unit = {
+    df.queryExecution.analyzed.collectFirst {
```
nit:

```scala
val jdbcRelations = df.queryExecution....collect {
  case LogicalRelation(r: JDBCRelation, _, _) => r
}
assert(jdbcRelations.length == 1)
assert(jdbcRelations.head.parts.length == expected, "error message")
```
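Filled in, the suggested helper might look like this (a sketch: `analyzed` replaces the elided plan accessor, and the parameter names match the earlier test helper):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation

def checkNumPartitions(df: DataFrame, expectedNumPartitions: Int): Unit = {
  // Collect all JDBC relations in the analyzed plan; exactly one is expected.
  val jdbcRelations = df.queryExecution.analyzed.collect {
    case LogicalRelation(relation: JDBCRelation, _, _) => relation
  }
  assert(jdbcRelations.length == 1,
    s"Expected exactly one JDBCRelation, found ${jdbcRelations.length}")
  assert(jdbcRelations.head.parts.length == expectedNumPartitions,
    s"Expected $expectedNumPartitions partitions, " +
      s"found ${jdbcRelations.head.parts.length}")
}
```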
Test build #69430 has finished for PR 15975.
retest this please
Test build #69444 has finished for PR 15975.
Thanks, merging to master! Since #15868 is not backported to 2.1, this PR conflicts with 2.1. @gatorsmile, can you send a backport PR? Thanks.
Sure, will do it soon.
…DataFrameReader JDBC APIs

### What changes were proposed in this pull request?

#### This PR is to backport #15975 to Branch 2.1

---

The following two `DataFrameReader` JDBC APIs ignore the user-specified parameters of parallelism degree.

```Scala
def jdbc(
    url: String,
    table: String,
    columnName: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int,
    connectionProperties: Properties): DataFrame
```

```Scala
def jdbc(
    url: String,
    table: String,
    predicates: Array[String],
    connectionProperties: Properties): DataFrame
```

This PR is to fix the issues. To verify the behavior correctness, we improve the plan output of the `EXPLAIN` command by adding `numPartitions` to the `JDBCRelation` node.

Before the fix:

```
== Physical Plan ==
*Scan JDBCRelation(TEST.PEOPLE) [NAME#1896,THEID#1897] ReadSchema: struct<NAME:string,THEID:int>
```

After the fix:

```
== Physical Plan ==
*Scan JDBCRelation(TEST.PEOPLE) [numPartitions=3] [NAME#1896,THEID#1897] ReadSchema: struct<NAME:string,THEID:int>
```

### How was this patch tested?

Added verification logic to all the test cases for JDBC concurrent fetching.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16111 from gatorsmile/jdbcFix2.1.
…r JDBC APIs

### What changes were proposed in this pull request?

The following two `DataFrameReader` JDBC APIs ignore the user-specified parameters of parallelism degree.

```Scala
def jdbc(
    url: String,
    table: String,
    columnName: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int,
    connectionProperties: Properties): DataFrame
```

```Scala
def jdbc(
    url: String,
    table: String,
    predicates: Array[String],
    connectionProperties: Properties): DataFrame
```

This PR is to fix the issues. To verify the behavior correctness, we improve the plan output of the `EXPLAIN` command by adding `numPartitions` to the `JDBCRelation` node.

Before the fix:

```
== Physical Plan ==
*Scan JDBCRelation(TEST.PEOPLE) [NAME#1896,THEID#1897] ReadSchema: struct<NAME:string,THEID:int>
```

After the fix:

```
== Physical Plan ==
*Scan JDBCRelation(TEST.PEOPLE) [numPartitions=3] [NAME#1896,THEID#1897] ReadSchema: struct<NAME:string,THEID:int>
```

### How was this patch tested?

Added verification logic to all the test cases for JDBC concurrent fetching.

Author: gatorsmile <gatorsmile@gmail.com>

Closes apache#15975 from gatorsmile/jdbc.
### What changes were proposed in this pull request?

The following two `DataFrameReader` JDBC APIs ignore the user-specified parameters of parallelism degree.

```Scala
def jdbc(
    url: String,
    table: String,
    columnName: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int,
    connectionProperties: Properties): DataFrame
```

```Scala
def jdbc(
    url: String,
    table: String,
    predicates: Array[String],
    connectionProperties: Properties): DataFrame
```

This PR is to fix the issues. To verify the behavior correctness, we improve the plan output of the `EXPLAIN` command by adding `numPartitions` to the `JDBCRelation` node.

Before the fix:

```
== Physical Plan ==
*Scan JDBCRelation(TEST.PEOPLE) [NAME#1896,THEID#1897] ReadSchema: struct<NAME:string,THEID:int>
```

After the fix:

```
== Physical Plan ==
*Scan JDBCRelation(TEST.PEOPLE) [numPartitions=3] [NAME#1896,THEID#1897] ReadSchema: struct<NAME:string,THEID:int>
```

### How was this patch tested?

Added verification logic to all the test cases for JDBC concurrent fetching.