
[SPARK-18413][SQL] Add maxConnections JDBCOption #15868


Closed · wants to merge 7 commits

Conversation

@dongjoon-hyun (Member) commented Nov 13, 2016

## What changes were proposed in this pull request?

This PR adds a new JDBCOption `maxConnections`, which means the maximum number of simultaneous JDBC connections allowed. This option applies only to writing, using a `coalesce` operation if needed. It defaults to the number of partitions of the RDD. Previously, SQL users could not control this, while Scala/Java/Python users could use the `coalesce` (or `repartition`) API.
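For reference, a minimal self-contained sketch of the proposed behavior (the helper name is illustrative, not the PR's actual code):

```scala
import org.apache.spark.sql.DataFrame

// Sketch: if maxConnections is set and is lower than the DataFrame's current
// partition count, coalesce before writing so that at most maxConnections
// JDBC connections are opened simultaneously; otherwise leave it unchanged.
def capWriteParallelism(df: DataFrame, maxConnections: Option[Int]): DataFrame =
  maxConnections match {
    case Some(n) if n < df.rdd.getNumPartitions => df.coalesce(n)
    case _ => df
  }
```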

**Reported Scenario**

For the following case, the number of connections becomes 200 and the database cannot handle all of them.

```sql
CREATE OR REPLACE TEMPORARY VIEW resultview
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:oracle:thin:@10.129.10.111:1521:BKDB",
  dbtable "result",
  user "HIVE",
  password "HIVE"
);
-- set spark.sql.shuffle.partitions=200
INSERT OVERWRITE TABLE resultview SELECT g, count(1) AS COUNT FROM tnet.DT_LIVE_INFO GROUP BY g
```

## How was this patch tested?

Manual. Do the following and check the Spark UI.

**Step 1 (MySQL)**

```sql
CREATE TABLE t1 (a INT);
CREATE TABLE data (a INT);
INSERT INTO data VALUES (1);
INSERT INTO data VALUES (2);
INSERT INTO data VALUES (3);
```

**Step 2 (Spark)**

```scala
SPARK_HOME=$PWD bin/spark-shell --driver-memory 4G --driver-class-path mysql-connector-java-5.1.40-bin.jar
scala> sql("SET spark.sql.shuffle.partitions=3")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW data USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 'data', user 'root', password '')")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '1')")
scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '2')")
scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '3')")
scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '4')")
scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
```

![maxconnections](https://cloud.githubusercontent.com/assets/9700541/20287987/ed8409c2-aa84-11e6-8aab-ae28e63fe54d.png)

@SparkQA commented Nov 13, 2016

Test build #68577 has finished for PR 15868 at commit e29974a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@lichenglin

Sorry, my network is too slow to download dependencies from the Maven repository to build Spark.
I have looked at your PR, and here are some suggestions:

1. I notice the PR uses CaseInsensitiveMap. Because "numPartitions" is still used by DataFrameReader, we had better check whether JDBC reading still works well.

2. Testing the total number of connections may be difficult, because the connections are closed as soon as the tasks finish. A better way to check whether numPartitions works is to watch the last stage's tasks in the Spark UI; there should be numPartitions tasks in the last step.

@dongjoon-hyun (Member Author)

Thank you for the review, @lichenglin.

For 1, Jenkins tests that, too.
For 2, sure! That sounds reasonable and better.

```scala
    getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect, isolationLevel)
  )
if (options.numPartitions != null && options.numPartitions.toInt != df.rdd.getNumPartitions) {
  df.repartition(options.numPartitions.toInt).foreachPartition(iterator => savePartition(
```
@srowen (Member):

This repeats the foreachPartition part twice, when that could be outside the if-else.

I don't know enough to say whether we should add this property. It seems a little funny to set this globally to apply to all dataframes written to a database. I understand the use case is pure SQL, where perhaps that's the only option.

@dongjoon-hyun (Member Author):

Thank you for the review, @srowen.

First, the property numPartitions already exists in JDBCOptions.scala under "Optional parameters only for reading".
This PR makes that option meaningful during write operations.

Second, for DataFrame use cases, we can call repartition before saving to manage this. Actually, I suggested that to @lichenglin. But the main purpose of the issue @lichenglin filed is to provide a pure SQL way to control the number of partitions for writing. In SQL, this looks reasonable to me.
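For reference, that DataFrame-side workaround looks roughly like this (a sketch: the URL, table, and credentials are the placeholder values from the manual test above, and `df` stands for the DataFrame being saved):

```scala
import java.util.Properties

// Placeholder connection properties for illustration only.
val props = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "")

// Coalesce to 2 partitions before saving, so at most 2 JDBC connections are
// opened; repartition(2) would also work, at the cost of a full shuffle.
df.coalesce(2)
  .write
  .mode("overwrite")
  .jdbc("jdbc:mysql://localhost:3306/t", "t1", props)
```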

@dongjoon-hyun (Member Author)

Hi, @srowen.
I addressed the comment and added an image to the PR description.

```diff
@@ -70,6 +70,9 @@ class JDBCOptions(
    }
  }

  // the number of partitions
  val numPartitions = parameters.getOrElse(JDBC_NUM_PARTITIONS, null)
```

@dongjoon-hyun (Member Author):
This is just a reposition due to the grouping comment.

```scala
} else {
  df
}
repartitionedDF.foreachPartition(iterator => savePartition(
```
@dongjoon-hyun (Member Author):

Now, foreachPartition is outside of the if...else.

@srowen (Member) left a comment:

I still feel like I'd like someone else to comment on whether the property makes sense, but the change itself looks reasonable.

```scala
df.foreachPartition(iterator => savePartition(
val numPartitions = options.numPartitions
val repartitionedDF = if (numPartitions != null &&
  numPartitions.toInt != df.rdd.getNumPartitions) {
```
@srowen (Member):

Tiny style point: breaking the if statement that way looks a little funny to my eyes. I might do...

```scala
val repartitionedDF =
  if (...) {
    ...
```

@dongjoon-hyun (Member Author):

Yep!

@dongjoon-hyun (Member Author)

Then, @gatorsmile, could you review this PR, too?

@SparkQA commented Nov 14, 2016

Test build #68630 has finished for PR 15868 at commit c926012.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```diff
@@ -70,6 +70,9 @@ class JDBCOptions(
    }
  }

  // the number of partitions
```
Member:

This is not clear. The document needs an update.

http://spark.apache.org/docs/latest/sql-programming-guide.html

```scala
df.foreachPartition(iterator => savePartition(
val numPartitions = options.numPartitions
val repartitionedDF =
  if (numPartitions != null && numPartitions.toInt != df.rdd.getNumPartitions) {
```
@gatorsmile (Member) commented Nov 14, 2016:

Normally, based on my understanding, users only care about the maximum number of connections. Thus, there is no need to repartition when numPartitions.toInt >= df.rdd.getNumPartitions, right?

Member:

Increasing the number of partitions can improve insert performance in some scenarios, I think. However, repartition is not cheap.

@gatorsmile (Member)

numPartitions might not be a good name for this purpose. How about maxConnections?

@SparkQA commented Nov 14, 2016

Test build #68634 has finished for PR 15868 at commit 93916b1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```scala
val numPartitions = options.numPartitions
val repartitionedDF =
  if (numPartitions != null && numPartitions.toInt != df.rdd.getNumPartitions) {
    df.repartition(numPartitions.toInt)
```
@gatorsmile (Member) commented Nov 14, 2016:

repartition is really expensive. Is it OK to use coalesce here?
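For context, a small spark-shell sketch of the difference between the two (the example DataFrame is illustrative):

```scala
// In spark-shell, `spark` is the active SparkSession.
val df = spark.range(100).toDF("a")

// coalesce only merges existing partitions (no shuffle), so it is cheap
// when reducing the partition count.
val narrowed = df.coalesce(2)

// repartition redistributes all rows through a full shuffle, which is more
// expensive but can also increase the partition count.
val shuffled = df.repartition(2)

println(narrowed.rdd.getNumPartitions) // 2
println(shuffled.rdd.getNumPartitions) // 2
```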

@dongjoon-hyun (Member Author)

Thank you, @gatorsmile! I'll update this soon.

@dongjoon-hyun changed the title from "[SPARK-18413][SQL] Control the number of JDBC connections by repartition with numPartition JDBCOption" to "[SPARK-18413][SQL] Add maxConnections JDBCOption" on Nov 15, 2016
@dongjoon-hyun (Member Author)

@gatorsmile, I addressed all comments.

@SparkQA commented Nov 15, 2016

Test build #68641 has finished for PR 15868 at commit 3378b5e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile (Member)

retest this please

@dongjoon-hyun (Member Author)

Thank you for retriggering, @gatorsmile.

@SparkQA commented Nov 15, 2016

Test build #68647 has finished for PR 15868 at commit 3378b5e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member Author)

Hi, @srowen and @gatorsmile.
What do you think about the updated PR?

```diff
@@ -1087,6 +1087,13 @@ the following case-sensitive options:
</tr>

<tr>
  <td><code>maxConnection</code></td>
  <td>
    The maximum number of JDBC connections, which determines how many connections to occur. This can reduce the change of bottleneck on JDBC servers. This option applies only to writing. It defaults to the number of partitions of RDD.
```
@gatorsmile (Member) commented Nov 15, 2016:

How about:

> The number of JDBC connections, which specifies the maximum number of simultaneous JDBC connections that are allowed.

"This can reduce the change of bottleneck on JDBC servers." What does this mean?

@dongjoon-hyun (Member Author):

Oops. I meant the chance of bottlenecks.

@dongjoon-hyun (Member Author) commented Nov 15, 2016:

I'll fix it like that.

@SparkQA commented Nov 16, 2016

Test build #68672 has finished for PR 15868 at commit 27c27aa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```diff
@@ -358,7 +358,8 @@ case class DataSource(

 providingClass.newInstance() match {
   case dataSource: CreatableRelationProvider =>
-    dataSource.createRelation(sparkSession.sqlContext, mode, options, data)
+    dataSource.createRelation(
+      sparkSession.sqlContext, mode, new CaseInsensitiveMap(options), data)
```
Member:

Can we keep this unchanged?

@dongjoon-hyun (Member Author):

Sure. It's included in #15884. After that, I can remove this.

@dongjoon-hyun (Member Author):

I removed this.

@gatorsmile (Member)

Could we have a test case for this new parameter?

```scala
df.foreachPartition(iterator => savePartition(
val maxConnections = options.maxConnections
val repartitionedDF =
  if (maxConnections != null && maxConnections.toInt < df.rdd.getNumPartitions) {
```
@gatorsmile (Member) commented Nov 16, 2016:

If the input is less than 1, we should detect it at the beginning, right?

@dongjoon-hyun (Member Author):

Yep. I'll add a require like batchsize.
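For illustration, a sketch of such a check in JDBCOptions, modeled on how batchsize is validated (the constant name and message are assumptions, not the exact merged code):

```scala
// Assumed option key, following the style of JDBC_NUM_PARTITIONS above.
val JDBC_MAX_CONNECTIONS = "maxConnections"

// Reject non-positive values up front instead of failing later in the write path.
val maxConnections = parameters.getOrElse(JDBC_MAX_CONNECTIONS, null)
require(maxConnections == null || maxConnections.toInt > 0,
  s"Invalid value `$maxConnections` for parameter `$JDBC_MAX_CONNECTIONS`. " +
    "The minimum value is 1.")
```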

@dongjoon-hyun (Member Author):

It's done.

```diff
@@ -1087,6 +1087,13 @@ the following case-sensitive options:
</tr>

<tr>
  <td><code>maxConnections</code></td>
  <td>
    The number of JDBC connections, which specifies the maximum number of simultaneous JDBC connections that are allowed. This option applies only to writing. It defaults to the number of partitions of RDD.
```
Member:

What is the minimum value?

@dongjoon-hyun (Member Author):

I can add it at the end of the description. But this will look different from the other options in terms of level of detail. Is that okay?

Member:

Ok. Let us keep it unchanged.

@SparkQA commented Nov 16, 2016

Test build #68696 has finished for PR 15868 at commit ba8f600.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member Author)

It does not seem to be a test failure.

```
Traceback (most recent call last):
  File "./dev/run-tests-jenkins.py", line 232, in <module>
    main()
  File "./dev/run-tests-jenkins.py", line 219, in main
    test_result_code, test_result_note = run_tests(tests_timeout)
  File "./dev/run-tests-jenkins.py", line 140, in run_tests
    test_result_note = ' * This patch **fails %s**.' % failure_note_by_errcode[test_result_code]
KeyError: -9
```

@dongjoon-hyun (Member Author)

Retest this please.

@SparkQA commented Nov 21, 2016

Test build #68929 has finished for PR 15868 at commit dc152f4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member Author)

Hi, @cloud-fan.
If there is anything more to do, please let me know.
Thank you so much.

@cloud-fan (Contributor)

LGTM, waiting for @srowen's final sign-off.

@srowen (Member) commented Nov 21, 2016

Merged to master

@asfgit closed this in 07beb5d on Nov 21, 2016
@dongjoon-hyun (Member Author)

Thank you for the review and merging, @srowen, @gatorsmile, @cloud-fan, @lichenglin!

@rxin (Contributor) commented Nov 21, 2016

Hm - wouldn't a more general solution be to introduce a parallelism hint or command (basically the coalesce equivalent) in SQL?

@dongjoon-hyun (Member Author)

Hi, @rxin.
Do you mean changing the SQL syntax, like the BROADCAST HINT PR we tried before?

@rxin (Contributor) commented Nov 21, 2016

Yes - otherwise it can get pretty confusing.

I'd argue the user has already explicitly defined the degree of parallelism in the JDBC source using other options. Having this maxConnections option doing "coalesce" is pretty weird.

@dongjoon-hyun (Member Author)

Could you give an example of the existing method you mention?

> the user has already explicitly defined the degree of parallelism in the JDBC source using other options.

@rxin (Contributor) commented Nov 21, 2016

Don't they already specify "numPartitions" ?

@dongjoon-hyun (Member Author) commented Nov 21, 2016

It's a read-only option. In the first commit, I reused that option for writing. But, as you can see, it was changed according to the review advice.

@dongjoon-hyun (Member Author)

Let me find the correct code. (It's weird that I attached a wrong code example.)

@rxin (Contributor) commented Nov 21, 2016

Got it - so this is to enforce the write side; in the future, please make it clear that this is for the write side.

Coming back to the discussion: shouldn't we just use numPartitions, given it is already there? Why pick another one to confuse users?

@dongjoon-hyun (Member Author)

The code seems to have been lost while resolving conflicts. This is my comment from the first review:

> Thank you for the review, @srowen.
>
> First, the property numPartitions already exists in JDBCOptions.scala under "Optional parameters only for reading".
> This PR makes that option meaningful during write operations.

@rxin (Contributor) commented Nov 21, 2016

Basically, right now, if I were to define a table, I'd need to define both the read and the write side, using numPartitions and maxConnections. Even if we want to have two separate configs for read vs. write, it is absolutely not clear which one is which.

If I were to just hand you two config options, numPartitions and maxConnections, would you be able to tell me why one is for read and the other is for write?

@dongjoon-hyun (Member Author) commented Nov 21, 2016

The rationale was to give the option a name with a clear meaning. Also, I agreed with the following advice:

> numPartitions might not be a good name for this purpose. How about maxConnections?

If you want to merge them together into numPartitions, I'll do that as a follow-up PR.

@gatorsmile (Member) commented Nov 21, 2016

My fault. In my initial thinking, numPartitions does not indicate the maximum allowed number of JDBC connections, and the write path does not always create the exact number of partitions. We only reduce the number of partitions if it is larger than the threshold. This is different from the read path.

I do not have a strong opinion either way.

@dongjoon-hyun (Member Author) commented Nov 21, 2016

Then, @rxin and @gatorsmile, I'll make a follow-up PR to merge maxConnections into numPartitions.

ghost pushed a commit to dbtsai/spark that referenced this pull request Nov 25, 2016
…ections`

## What changes were proposed in this pull request?

This is a follow-up PR of apache#15868 to merge `maxConnections` option into `numPartitions` options.

## How was this patch tested?

Pass the existing tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes apache#15966 from dongjoon-hyun/SPARK-18413-2.
@dongjoon-hyun deleted the SPARK-18413 branch on November 27, 2016 07:21
robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 2, 2016
…ections`
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
…ections`