[SPARK-11517][SQL]Calc partitions in parallel for multiple partitions table #9483
Conversation
Test build #45083 has finished for PR 9483 at commit
cc @scwf @Sephiroth-Lin, not sure if you guys have time to benchmark this with real-world cases.
Hi @zhichao-li, thanks for doing this. I had a problem with partitions being scanned slowly, so I applied this patch to my Spark version. I am happy to see it takes effect in my case; it solves my problem. Would it be better to add a conf to control whether this feature is used?
rdds: Seq[RDD[T]]) extends UnionRDD[T](sc, rdds) {
  // TODO: We might need to guess a more reasonable thread pool size here
  @transient val executorService = ThreadUtils.newDaemonFixedThreadPool(
    Math.min(rdds.size, Runtime.getRuntime.availableProcessors()), "ParallelUnionRDD")
Should we share a single thread pool instead of creating a thread pool for every ParallelUnionRDD?
I don't have a strong opinion on this. How about creating a shared thread pool with the same size as the number of CPU cores?

object ParallelUnionRDD {
  val executorService = ThreadUtils.newDaemonFixedThreadPool(
    Runtime.getRuntime.availableProcessors(), "ParallelUnionRDD")
}
I don't think we have to pin the pool size to Runtime.getRuntime.availableProcessors(); we could probably just use a fixed number, say 16 or even bigger, since the bottleneck is network/IO, not CPU scheduling.
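A minimal sketch of that suggestion (the pool size of 16 is illustrative, not a value from the merged code): one daemon pool in the companion object, shared by every ParallelUnionRDD instance and sized for IO-bound work rather than for the core count.

import org.apache.spark.util.ThreadUtils

object ParallelUnionRDD {
  // Assumption: a fixed size works here because the submitted tasks mostly
  // wait on network/IO (e.g. listing partition locations), not on CPU.
  private val poolSize = 16
  val executorService =
    ThreadUtils.newDaemonFixedThreadPool(poolSize, "ParallelUnionRDD")
}

Since the pool lives in a companion object it is created once per JVM, and daemon threads will not keep the driver process alive on shutdown.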
    }))
  }.map { case (r, f) => (r, f.get()) }

  val array = new Array[Partition](rddPartitions.map(_._2.length).sum)
It seems this still runs on the main thread, so we probably don't even need the synchronized in getPartitions.
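For what it's worth, a tiny self-contained sketch (not from the PR) of why the results can be read without locking: Future.get() blocks the calling driver thread until the task finishes and establishes a happens-before edge, so everything the worker thread wrote is visible afterwards.

import java.util.concurrent.{Callable, Executors}

object BlockingGetDemo {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(2)
    val futures = Seq(1, 2, 3).map { i =>
      pool.submit(new Callable[Int] { override def call(): Int = i * i })
    }
    // get() blocks here; once it returns, each result is safely visible
    // to this thread, so no synchronized block is needed to consume it.
    val results = futures.map(_.get())
    println(results) // List(1, 4, 9)
    pool.shutdown()
  }
}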
Force-pushed from 63dc9c0 to 6456f12.
Test build #51856 has finished for PR 9483 at commit
Test build #51861 has finished for PR 9483 at commit
retest this please
Test build #51920 has finished for PR 9483 at commit
import org.apache.spark.rdd.{RDD, UnionPartition, UnionRDD}
import org.apache.spark.util.ThreadUtils

object ParallelUnionRDD {
Make this private[hive], or move it into the upper-level package? The same for the class ParallelUnionRDD.
LGTM except for some minor suggestions.
Test build #52152 has finished for PR 9483 at commit
retest this please
Test build #52156 has finished for PR 9483 at commit
retest this please
Test build #52210 has finished for PR 9483 at commit
Thanks for the pull request. I'm going through a list of pull requests to cut them down since the sheer number is breaking some of the tooling we have. Due to lack of activity on this pull request, I'm going to push a commit to close it. Feel free to reopen it or create a new one. We can also continue the discussion on the JIRA ticket.
Currently we compute getPartitions for each Hive partition sequentially; it would be faster if we parallelized this on the driver side.
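A minimal end-to-end sketch of the idea, reconstructed from the diff fragments quoted in the review above (class and field names match the PR; the body is an approximation, not the merged code): each child RDD's partitions are computed on a thread pool, then flattened into the usual UnionPartition array on the driver.

import java.util.concurrent.Callable

import scala.reflect.ClassTag

import org.apache.spark.{Partition, SparkContext}
import org.apache.spark.rdd.{RDD, UnionPartition, UnionRDD}
import org.apache.spark.util.ThreadUtils

class ParallelUnionRDD[T: ClassTag](sc: SparkContext, rdds: Seq[RDD[T]])
  extends UnionRDD[T](sc, rdds) {

  // TODO from the PR: guess a more reasonable thread pool size here.
  @transient private val executorService = ThreadUtils.newDaemonFixedThreadPool(
    math.min(rdds.size, Runtime.getRuntime.availableProcessors()), "ParallelUnionRDD")

  override def getPartitions: Array[Partition] = {
    // Kick off one partitions computation per child RDD on the pool, then
    // block on each future; get() runs on the driver thread.
    val rddPartitions = rdds.map { rdd =>
      (rdd, executorService.submit(new Callable[Array[Partition]] {
        override def call(): Array[Partition] = rdd.partitions
      }))
    }.map { case (r, f) => (r, f.get()) }

    // Flatten the per-child partitions into UnionPartitions, as UnionRDD does.
    val array = new Array[Partition](rddPartitions.map(_._2.length).sum)
    var pos = 0
    for (((rdd, parts), rddIndex) <- rddPartitions.zipWithIndex; split <- parts) {
      array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)
      pos += 1
    }
    array
  }
}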