[SPARK-18538] [SQL] Fix Concurrent Table Fetching Using DataFrameReader JDBC APIs #15975


Closed
gatorsmile wants to merge 10 commits

Conversation

gatorsmile
Member

What changes were proposed in this pull request?

The following two DataFrameReader JDBC APIs ignore the user-specified degree of parallelism.

  def jdbc(
      url: String,
      table: String,
      columnName: String,
      lowerBound: Long,
      upperBound: Long,
      numPartitions: Int,
      connectionProperties: Properties): DataFrame
  def jdbc(
      url: String,
      table: String,
      predicates: Array[String],
      connectionProperties: Properties): DataFrame

This PR fixes these issues. To verify the fix, it also improves the plan output of the EXPLAIN command by adding numPartitions to the JDBCRelation node.

Before the fix,

== Physical Plan ==
*Scan JDBCRelation(TEST.PEOPLE) [NAME#1896,THEID#1897] ReadSchema: struct<NAME:string,THEID:int>

After the fix,

== Physical Plan ==
*Scan JDBCRelation(TEST.PEOPLE) [numPartitions=3] [NAME#1896,THEID#1897] ReadSchema: struct<NAME:string,THEID:int>

How was this patch tested?

Added verification logic to all the test cases for JDBC concurrent fetching.
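
For illustration, a minimal sketch of how the fixed behavior can be observed, assuming a SparkSession named spark; the H2 URL mirrors the test suite's TEST.PEOPLE table but is hypothetical here:

```scala
import java.util.Properties

// Partition the read on THEID into 3 ranges. Before the fix, the
// numPartitions argument was silently ignored and the table was
// fetched as a single partition.
val df = spark.read.jdbc(
  "jdbc:h2:mem:testdb0",              // hypothetical connection URL
  "TEST.PEOPLE",
  columnName = "THEID",
  lowerBound = 1L,
  upperBound = 4L,
  numPartitions = 3,
  connectionProperties = new Properties())

assert(df.rdd.getNumPartitions == 3)
df.explain()  // *Scan JDBCRelation(TEST.PEOPLE) [numPartitions=3] ...
```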

@SparkQA

SparkQA commented Nov 22, 2016

Test build #68981 has started for PR 15975 at commit bcc86c0.

@gatorsmile gatorsmile changed the title Fix Concurrent Table Fetching Using DataFrameReader JDBC APIs [SPARK-18538] [SQL] Fix Concurrent Table Fetching Using DataFrameReader JDBC APIs Nov 22, 2016
@@ -159,7 +159,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* @since 1.4.0
*/
def jdbc(url: String, table: String, properties: Properties): DataFrame = {
jdbc(url, table, JDBCRelation.columnPartition(null), properties)
// connectionProperties should override settings in extraOptions.
Member

nit: there is no connectionProperties here, only properties.

Member Author

Fixed.

// credentials should not be included in the plan output, table information is sufficient.
s"JDBCRelation(${jdbcOptions.table})"
s"JDBCRelation(${jdbcOptions.table})" + partitioningInfo
Member

If parts is empty, this string looks weird, like "JDBCRelation(.....)()", when I tried it locally.

Member Author

Fixed. Thanks!
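
A plausible sketch of the fix, assuming the parts array and jdbcOptions from the surrounding snippets (not the verbatim merged code):

```scala
// Append partitioning info only when the relation is actually partitioned,
// avoiding the empty "JDBCRelation(...)()" suffix noted above.
override def toString: String = {
  val partitioningInfo = if (parts.nonEmpty) s" [numPartitions=${parts.length}]" else ""
  // credentials should not be included in the plan output, table information is sufficient.
  s"JDBCRelation(${jdbcOptions.table})" + partitioningInfo
}
```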

@@ -404,6 +425,7 @@ class JDBCSuite extends SparkFunSuite
numPartitions = 0,
Member

Oh, is this 0 numPartitions valid?

Member Author

In the read path, YES.

Member Author

Will fix this after your PR #15966 is merged.

Contributor

It's merged; has it been fixed?

Member Author

: ) Let me move the value check to the write path; then we can keep the existing behavior of numPartitions in the read path.

@dongjoon-hyun
Member

Hi, @gatorsmile.
Can we add a test case for writing partitions here? (According to the PR title, it's beyond the scope.)

@gatorsmile
Member Author

Sure, will add the test cases for the write path. @dongjoon-hyun

@gatorsmile
Member Author

The test cases for the write path will be added after #15966 is merged.

@SparkQA

SparkQA commented Nov 24, 2016

Test build #69114 has started for PR 15975 at commit 5c5b3ca.

@gatorsmile
Member Author

retest this please

@SparkQA

SparkQA commented Nov 24, 2016

Test build #69139 has finished for PR 15975 at commit 5c5b3ca.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor

rxin commented Nov 25, 2016

@gatorsmile did you update this?

@gatorsmile
Member Author

Will update it tonight.

@gatorsmile
Member Author

gatorsmile commented Nov 26, 2016

@dongjoon-hyun Will not add test cases for the write path in this PR, because it requires changes to the source code. Will submit a separate PR after this one is merged.

@dongjoon-hyun
Member

@gatorsmile NP. Thank you for letting me know.

@SparkQA

SparkQA commented Nov 26, 2016

Test build #69180 has finished for PR 15975 at commit 1b0caea.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member Author

Any more comments? : )

Also cc @cloud-fan @hvanhovell

  private def jdbc(
      url: String,
      table: String,
      parts: Array[Partition],
Contributor

this parameter is never used; when did we introduce it?

Member Author

Introduced in PR #15499, which was merged to 2.1 only.

    this.extraOptions += ("url" -> url, "dbtable" -> table)
    format("jdbc").load()
    val relation = JDBCRelation(parts, options)(sparkSession)
    sparkSession.baseRelationToDataFrame(relation)
Contributor

so we have two code paths for jdbc? The API with the predicates parameter is kind of an advanced API that allows users to customize the partitions, right?

Member Author

Yeah. The predicate-based API is very useful for advanced JDBC users.
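
For context, a hedged sketch of how the predicate-based API is used, assuming a SparkSession named spark; the URL, table, and predicates are hypothetical:

```scala
import java.util.Properties

// One partition per predicate: each string becomes the WHERE clause of a
// separate JDBC query, so the user fully controls the partitioning.
val predicates = Array(
  "THEID < 2",
  "THEID >= 2 AND THEID < 4",
  "THEID >= 4")

val df = spark.read.jdbc(
  "jdbc:h2:mem:testdb0",   // hypothetical connection URL
  "TEST.PEOPLE",
  predicates,
  new Properties())

assert(df.rdd.getNumPartitions == predicates.length)
```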

    // properties should override settings in extraOptions.
    this.extraOptions = this.extraOptions ++ properties.asScala
    // explicit url and dbtable should override all
    this.extraOptions += ("url" -> url, "dbtable" -> table)
Contributor

please use the constant JDBCOptions.xxx
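
A sketch of the suggested change; JDBC_NUM_PARTITIONS is confirmed elsewhere in this thread, while JDBC_URL and JDBC_TABLE_NAME are assumed names for the "url" and "dbtable" constants:

```scala
// properties should override settings in extraOptions.
this.extraOptions = this.extraOptions ++ properties.asScala
// explicit url and dbtable should override all; use the shared constants
// (assumed names) instead of string literals.
this.extraOptions += (JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
```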

@cloud-fan
Contributor

LGTM except https://github.com/apache/spark/pull/15975/files#r89722356; what's the status of it?

@SparkQA

SparkQA commented Nov 28, 2016

Test build #69253 has finished for PR 15975 at commit c29199c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

      df
    }
    val repartitionedDF = options.numPartitions match {
      case Some(n) if n <= 0 => throw new IllegalArgumentException(
Contributor

so this check is only in the write path now?

Member Author

Yeah.
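
For readers following the thread, a hedged reconstruction of the complete write-path check; the coalesce branch and the message text are illustrative:

```scala
// Write path only: reject non-positive numPartitions, and coalesce when the
// requested parallelism is lower than the DataFrame's current partitioning.
val repartitionedDF = options.numPartitions match {
  case Some(n) if n <= 0 => throw new IllegalArgumentException(
    s"Invalid value `$n` for parameter `${JDBCOptions.JDBC_NUM_PARTITIONS}`: " +
      "the minimum value is 1.")  // illustrative message text
  case Some(n) if n < df.rdd.getNumPartitions => df.coalesce(n)
  case _ => df
}
```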

  def checkNumPartitions(df: DataFrame, expectedNumPartitions: Int): Unit = {
    val explain = ExplainCommand(df.queryExecution.logical, extended = true)
    val plans = spark.sessionState.executePlan(explain).executedPlan
    val expectedMsg = s"${JDBCOptions.JDBC_NUM_PARTITIONS}=$expectedNumPartitions"
Contributor

instead of matching the string, can we call collect on the logical plan and get the JDBCRelation to check the partitions directly?

Member Author

Good idea!

@@ -76,9 +76,6 @@ class JDBCOptions(

// the number of partitions
val numPartitions = parameters.get(JDBC_NUM_PARTITIONS).map(_.toInt)
Contributor

what's the behaviour if this config is set to 0 or negative in the read path?

Member Author

Reading the table using a single partition.
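
A small illustration of that fallback, assuming a SparkSession named spark and a hypothetical table:

```scala
import java.util.Properties

// numPartitions = 0 is accepted on the read path and simply falls back
// to fetching the table as a single partition.
val df = spark.read.jdbc(
  "jdbc:h2:mem:testdb0",   // hypothetical connection URL
  "TEST.PEOPLE",
  columnName = "THEID",
  lowerBound = 0L,
  upperBound = 4L,
  numPartitions = 0,
  connectionProperties = new Properties())

assert(df.rdd.getNumPartitions == 1)
```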

Contributor

have we documented this behaviour?

Member Author

Not yet. : ) Will try to document it in the jdbc API of DataFrameReader.scala.

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Nov 30, 2016

Test build #69392 has started for PR 15975 at commit 404aa22.

@@ -209,6 +209,16 @@ class JDBCSuite extends SparkFunSuite
conn.close()
}

// Check whether the tables are fetched in the expected degree of parallelism
def checkNumPartitions(df: DataFrame, expectedNumPartitions: Int): Unit = {
df.queryExecution.analyzed.collectFirst {
Contributor

nit:

val jdbcRelations = df.queryExecution....collect {
  case LogicalRelation(r: JDBCRelation, _, _) => r
}
assert(jdbcRelations.length == 1)
assert(jdbcRelations.head.parts.length == expected, "error message")
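
A hedged completion of that helper, combining the nit with the earlier fragment and assuming the suite's existing imports (LogicalRelation, JDBCRelation); the assertion messages are illustrative:

```scala
// Check whether the table is fetched with the expected degree of parallelism.
def checkNumPartitions(df: DataFrame, expectedNumPartitions: Int): Unit = {
  val jdbcRelations = df.queryExecution.analyzed.collect {
    case LogicalRelation(r: JDBCRelation, _, _) => r
  }
  assert(jdbcRelations.length == 1, "The number of JDBCRelations must be 1")
  assert(jdbcRelations.head.parts.length == expectedNumPartitions,
    s"Expected $expectedNumPartitions partitions, found ${jdbcRelations.head.parts.length}")
}
```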

@SparkQA

SparkQA commented Nov 30, 2016

Test build #69430 has finished for PR 15975 at commit 728c103.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member Author

retest this please

@SparkQA

SparkQA commented Dec 1, 2016

Test build #69444 has finished for PR 15975 at commit 728c103.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit asfgit closed this in b28fe4a Dec 1, 2016
@cloud-fan
Contributor

thanks, merging to master!

Since #15868 is not backported to 2.1, this PR conflicts with 2.1. @gatorsmile, can you send a backport PR? Thanks.

@gatorsmile
Member Author

Sure, will do it soon.

asfgit pushed a commit that referenced this pull request Dec 2, 2016

[SPARK-18538][SQL] Fix Concurrent Table Fetching Using DataFrameReader JDBC APIs

This PR is to backport #15975 to Branch 2.1.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16111 from gatorsmile/jdbcFix2.1.
robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 2, 2016

[SPARK-18538][SQL] Fix Concurrent Table Fetching Using DataFrameReader JDBC APIs

Author: gatorsmile <gatorsmile@gmail.com>

Closes apache#15975 from gatorsmile/jdbc.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017

[SPARK-18538][SQL] Fix Concurrent Table Fetching Using DataFrameReader JDBC APIs

Author: gatorsmile <gatorsmile@gmail.com>

Closes apache#15975 from gatorsmile/jdbc.