[SPARK-18413][SQL] Add `maxConnections` JDBCOption #15868
Conversation
Test build #68577 has finished for PR 15868 at commit
I'm sorry, my network is too bad to download dependencies from the Maven repository for building Spark.
2. "Testing the total connections" may be difficult, because the connections are closed when the tasks finish. I have a better way to check whether `numPartitions` works: we can watch the last stage's tasks on the Spark UI; there should be `numPartitions` tasks in the last step.
Thank you for the review, @lichenglin. For 1, Jenkins tests that, too.
getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect, isolationLevel)
)
if (options.numPartitions != null && options.numPartitions.toInt != df.rdd.getNumPartitions) {
  df.repartition(options.numPartitions.toInt).foreachPartition(iterator => savePartition(
This repeats the `foreachPartition` part twice, when that could be outside the if-else.
I don't know enough to say whether we should add this property. It seems a little funny to set this globally to apply to all dataframes written to a database. I understand the use case is pure SQL, where perhaps that's the only option.
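A minimal sketch of the suggested refactoring, with identifiers copied from the diff above (this is illustrative only, not the PR's final code): pick the DataFrame first, then call `foreachPartition` once.

```scala
// Sketch: hoist the common foreachPartition call out of the if-else.
val repartitionedDF =
  if (options.numPartitions != null &&
      options.numPartitions.toInt != df.rdd.getNumPartitions) {
    df.repartition(options.numPartitions.toInt)
  } else {
    df
  }
// The write path is now written exactly once.
repartitionedDF.foreachPartition(iterator => savePartition(
  getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect, isolationLevel))
```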
Thank you for the review, @srowen.
First, the property `numPartitions` already exists in JDBCOptions.scala under "Optional parameters only for reading". This PR makes that option meaningful during write operations.
Second, for DataFrame use cases, we can call `repartition` before saving to manage this. Actually, I suggested that approach to @lichenglin. But the main purpose of the issue reported by @lichenglin is to provide a pure SQL way to control the number of partitions for writing. In SQL, this looks reasonable to me.
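As a sketch of that DataFrame-side workaround (the URL, table name, credentials, and partition count below are illustrative, not from this PR):

```scala
import java.util.Properties

// Illustrative connection properties; values are placeholders.
val props = new Properties()
props.setProperty("user", "root")

// Bounding write parallelism explicitly before the JDBC write:
// each partition opens one connection, so this caps connections at 8.
df.repartition(8)
  .write
  .mode("overwrite")
  .jdbc("jdbc:mysql://localhost:3306/t", "t1", props)
```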
Hi, @srowen.
@@ -70,6 +70,9 @@ class JDBCOptions(
  }
}

// the number of partitions
val numPartitions = parameters.getOrElse(JDBC_NUM_PARTITIONS, null)
This is just a reposition due to the grouping comment.
} else {
  df
}
repartitionedDF.foreachPartition(iterator => savePartition(
Now, `foreachPartition` is outside of the if..else.
I still feel like I'd like someone else to comment on whether the property makes sense, but the change itself looks reasonable.
df.foreachPartition(iterator => savePartition(
val numPartitions = options.numPartitions
val repartitionedDF = if (numPartitions != null &&
    numPartitions.toInt != df.rdd.getNumPartitions) {
Tiny style point -- breaking the if statement that way looks a little funny to my eyes. I might do ...
val repartitionedDF =
if (...) {
...
Yep!
Then, @gatorsmile.
Test build #68630 has finished for PR 15868 at commit
@@ -70,6 +70,9 @@ class JDBCOptions(
  }
}

// the number of partitions
This is not clear. The document needs an update.
http://spark.apache.org/docs/latest/sql-programming-guide.html
df.foreachPartition(iterator => savePartition(
val numPartitions = options.numPartitions
val repartitionedDF =
  if (numPartitions != null && numPartitions.toInt != df.rdd.getNumPartitions) {
Normally, based on my understanding, users only care about the maximal number of connections. Thus, there is no need to repartition when `numPartitions.toInt >= df.rdd.getNumPartitions`, right?
Increasing the number of partitions can improve the insert performance in some scenarios, I think. However, `repartition` is not cheap.
Test build #68634 has finished for PR 15868 at commit
val numPartitions = options.numPartitions
val repartitionedDF =
  if (numPartitions != null && numPartitions.toInt != df.rdd.getNumPartitions) {
    df.repartition(numPartitions.toInt)
`repartition` is really expensive. Is it OK to use `coalesce` here?
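A sketch of the difference, reusing the variable names from the diff above: `coalesce` only merges existing partitions and avoids a full shuffle when reducing parallelism, which is why it is the cheaper choice for capping connections.

```scala
// Sketch (not the PR's exact code): only shrink, never shuffle.
val limitedDF =
  if (maxConnections != null && maxConnections.toInt < df.rdd.getNumPartitions) {
    // coalesce merges adjacent partitions without a shuffle
    df.coalesce(maxConnections.toInt)
  } else {
    // already at or below the connection limit; write as-is
    df
  }
```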
Thank you, @gatorsmile! I'll update this soon.
(Title changed: `numPartition` JDBCOption → `maxConnections` JDBCOption)
@gatorsmile, I addressed all comments.
Test build #68641 has finished for PR 15868 at commit
retest this please
Thank you for retriggering, @gatorsmile.
Test build #68647 has finished for PR 15868 at commit
Hi, @srowen and @gatorsmile.
@@ -1087,6 +1087,13 @@ the following case-sensitive options:
</tr>

<tr>
  <td><code>maxConnection</code></td>
  <td>
    The maximum number of JDBC connections, which determines how many connections to occur. This can reduce the change of bottleneck on JDBC servers. This option applies only to writing. It defaults to the number of partitions of RDD.
How about
The number of JDBC connections, which specifies the maximum number of simultaneous JDBC connections that are allowed.
This can reduce the change of bottleneck on JDBC servers.
What does this mean?
Oops. I meant the chance of bottlenecks.
I'll fix like that.
Test build #68672 has finished for PR 15868 at commit
@@ -358,7 +358,8 @@ case class DataSource(

providingClass.newInstance() match {
  case dataSource: CreatableRelationProvider =>
    dataSource.createRelation(sparkSession.sqlContext, mode, options, data)
    dataSource.createRelation(
      sparkSession.sqlContext, mode, new CaseInsensitiveMap(options), data)
Can we keep this unchanged?
Sure. It's included in #15884. After that, I can remove this.
I removed this.
Could we have a test case for this new parameter?
df.foreachPartition(iterator => savePartition(
val maxConnections = options.maxConnections
val repartitionedDF =
  if (maxConnections != null && maxConnections.toInt < df.rdd.getNumPartitions) {
If the input is less than 1, we should detect it at the beginning, right?
Yep. I'll add `require` like `batchsize`.
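A sketch of such a validation in JDBCOptions, following the style of the existing `batchsize` check (the constant name `JDBC_MAX_CONNECTIONS` and the message text are assumptions for illustration, not the merged code):

```scala
// Assumed option key; the real constant name may differ.
val maxConnections = parameters.getOrElse(JDBC_MAX_CONNECTIONS, null)
// Reject non-positive values up front, before any write begins.
require(maxConnections == null || maxConnections.toInt > 0,
  s"Invalid value `$maxConnections` for parameter `maxConnections`. " +
  "The minimum value is 1.")
```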
It's done.
@@ -1087,6 +1087,13 @@ the following case-sensitive options:
</tr>

<tr>
  <td><code>maxConnections</code></td>
  <td>
    The number of JDBC connections, which specifies the maximum number of simultaneous JDBC connections that are allowed. This option applies only to writing. It defaults to the number of partitions of RDD.
What is the minimal value?
At the end of the description, I can add it. But this will look different from other options in terms of level of detail. Is it okay?
Ok. Let us keep it unchanged.
Test build #68696 has finished for PR 15868 at commit
It does not seem to be a test failure.
Retest this please.
Test build #68929 has finished for PR 15868 at commit
Hi, @cloud-fan.
LGTM, waiting for @srowen for final sign-off.
Merged to master.
Thank you for the review and merging, @srowen, @gatorsmile, @cloud-fan, @lichenglin!
Hm, wouldn't a more general solution be to introduce a parallelism hint or command (basically the coalesce equivalent) in SQL?
Hi, @rxin.
Yes, otherwise it can get pretty confusing. I'd argue the user has already explicitly defined the degree of parallelism in the JDBC source using other options. Having this maxConnections option doing "coalesce" is pretty weird.
Could you give an example of the existing method you mention?
Don't they already specify "numPartitions"?
It's a read-only option. In the first commit, I reused that option for writing. But, as you see, it changed according to the review advice.
Let me find the correct code. (It's weird that I attached a wrong code example.)
Got it, so this is to enforce the write side; in the future please make it clear this is for the write side. Coming back to the discussion: shouldn't we just use numPartitions, given it is already there? Why pick another one to confuse users?
The code seems to have been lost while resolving conflicts. This is my comment from the first review.
Basically right now if I were to define a table, I'd need to define both the read and the write side, using numPartitions and maxPartitions. Even if we want to have two separate configs for read vs write, it is absolutely not clear which one is which. If I were to just hand you two config options, numPartitions and maxPartitions, would you be able to tell me why one is for read and the other is for write?
The rationale is to give a name which has a clear meaning. Also, I agreed with the following advice.
If you want to merge them together into
My fault. In my initial thoughts, I did not have a strong opinion either way.
Then, @rxin and @gatorsmile.
…ections`

## What changes were proposed in this pull request?
This is a follow-up PR of apache#15868 to merge the `maxConnections` option into the `numPartitions` option.

## How was this patch tested?
Pass the existing tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes apache#15966 from dongjoon-hyun/SPARK-18413-2.
## What changes were proposed in this pull request?

This PR adds a new JDBCOption `maxConnections`, which means the maximum number of simultaneous JDBC connections allowed. This option applies only to writing, with a coalesce operation if needed. It defaults to the number of partitions of the RDD. Previously, SQL users could not control this, while Scala/Java/Python users can use the `coalesce` (or `repartition`) API.

**Reported Scenario**

For the following case, the number of connections becomes 200 and the database cannot handle all of them.

```sql
CREATE OR REPLACE TEMPORARY VIEW resultview
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:oracle:thin:10.129.10.111:1521:BKDB",
  dbtable "result",
  user "HIVE",
  password "HIVE"
);
-- set spark.sql.shuffle.partitions=200
INSERT OVERWRITE TABLE resultview SELECT g, count(1) AS COUNT FROM tnet.DT_LIVE_INFO GROUP BY g
```

## How was this patch tested?

Manual. Do the following and see the Spark UI.

**Step 1 (MySQL)**

```
CREATE TABLE t1 (a INT);
CREATE TABLE data (a INT);
INSERT INTO data VALUES (1);
INSERT INTO data VALUES (2);
INSERT INTO data VALUES (3);
```

**Step 2 (Spark)**

```scala
SPARK_HOME=$PWD bin/spark-shell --driver-memory 4G --driver-class-path mysql-connector-java-5.1.40-bin.jar
scala> sql("SET spark.sql.shuffle.partitions=3")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW data USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 'data', user 'root', password '')")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '1')")
scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '2')")
scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '3')")
scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '4')")
scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
```



Author: Dongjoon Hyun <dongjoon@apache.org>

Closes apache#15868 from dongjoon-hyun/SPARK-18413.