
[SPARK-11517][SQL]Calc partitions in parallel for multiple partitions table #9483


Closed
wants to merge 4 commits into from

Conversation

zhichao-li
Contributor

Currently we compute getPartitions for each "hive partition" sequentially; it would be faster if we parallelized this on the driver side.
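The approach can be sketched without Spark as follows. FakeRdd and the sleep are stand-ins for a real RDD whose getPartitions hits the metastore or file system; they are not part of the actual patch, which submits each child RDD's lookup to a fixed-size pool and then collects the futures:

```scala
import java.util.concurrent.{Callable, Executors}

object ParallelPartitionsSketch {
  // Stand-in for an RDD whose getPartitions does a slow listing call.
  final case class FakeRdd(id: Int) {
    def getPartitions: Seq[String] = {
      Thread.sleep(10) // simulate a slow metastore / file-system lookup
      Seq(s"rdd-$id-part-0", s"rdd-$id-part-1")
    }
  }

  def computeAllPartitions(rdds: Seq[FakeRdd]): Seq[(FakeRdd, Seq[String])] = {
    // Pool sized like the PR: min(#rdds, #cores), guarded against empty input.
    val pool = Executors.newFixedThreadPool(
      math.max(1, math.min(rdds.size, Runtime.getRuntime.availableProcessors())))
    try {
      // First map submits every lookup; second map blocks for the results,
      // so all lookups run concurrently instead of one after another.
      rdds.map { r =>
        (r, pool.submit(new Callable[Seq[String]] {
          override def call(): Seq[String] = r.getPartitions
        }))
      }.map { case (r, f) => (r, f.get()) }
    } finally pool.shutdown()
  }
}
```

With N child RDDs the total wall time is roughly one lookup (plus scheduling overhead) rather than N lookups in sequence.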

@zhichao-li
Contributor Author

cc @chenghao-intel

@SparkQA

SparkQA commented Nov 5, 2015

Test build #45083 has finished for PR 9483 at commit 63dc9c0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ParallelUnionRDD[T: ClassTag](

@chenghao-intel
Contributor

cc/ @scwf @Sephiroth-Lin, not sure if you guys have time to benchmark this with real-world cases.

@zhonghaihua
Contributor

Hi @zhichao-li, thanks for doing this. I had a problem with partitions being scanned slowly, and I applied this patch to my Spark version. In my case:

  • Before applying this patch, it took at least 3 or 4 minutes to scan partitions.
  • After applying this patch, this stage takes only about 20 seconds.

I am happy to see it takes effect in my case; it solves my problem. Would it be better to add a conf to control whether to use this feature?
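Such a conf check could look like the following Spark-free sketch; the key name spark.sql.hive.parallelPartitionCalculation is hypothetical, not something this PR defines, and the Map stands in for SparkConf:

```scala
object FeatureFlagSketch {
  // Hypothetical key; a real implementation would read it from SparkConf/SQLConf.
  private val Key = "spark.sql.hive.parallelPartitionCalculation"

  // Defaults to true so existing users get the speedup unless they opt out.
  def parallelEnabled(conf: Map[String, String]): Boolean =
    conf.getOrElse(Key, "true").toBoolean
}
```

The caller would then pick ParallelUnionRDD when the flag is true and fall back to the plain UnionRDD otherwise.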

rdds: Seq[RDD[T]]) extends UnionRDD[T](sc, rdds) {
  // TODO: We might need to guess a more reasonable thread pool size here
  @transient val executorService = ThreadUtils.newDaemonFixedThreadPool(
    Math.min(rdds.size, Runtime.getRuntime.availableProcessors()), "ParallelUnionRDD")
Contributor

Should we share the single thread pool instead of creating a thread pool for every ParallelUnionRDD?

Contributor Author

I don't have a strong opinion on this. How about creating a shared thread pool with the same size as the number of CPU cores?

object ParallelUnionRDD {
  val executorService = ThreadUtils.newDaemonFixedThreadPool(
    Runtime.getRuntime.availableProcessors(), "ParallelUnionRDD")
}

Contributor

I don't think we have to pin it to Runtime.getRuntime.availableProcessors(); we could probably just use a fixed number, say 16 or even bigger, since the bottleneck is in network / IO, not CPU scheduling.

@zhichao-li
Contributor Author

@yhuai @rxin, any thoughts or concerns about this PR? It's common for one table to contain tons of partitions (e.g. a new partition every 15 minutes for click data).

}))
}.map { case (r, f) => (r, f.get()) }

val array = new Array[Partition](rddPartitions.map(_._2.length).sum)
Contributor

It seems this still runs on the main thread, so we probably don't even need the synchronized in getPartitions.
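For reference, the array assembly in the hunk above can be modeled without Spark as follows; childPartitionCounts stands in for rddPartitions.map(_._2.length), and each output entry records (child RDD index, partition index within that child):

```scala
object PartitionArraySketch {
  // Flatten every child RDD's partitions into one driver-side array,
  // the way UnionRDD builds its combined partition array.
  def flatten(childPartitionCounts: Seq[Int]): Array[(Int, Int)] = {
    val array = new Array[(Int, Int)](childPartitionCounts.sum)
    var pos = 0
    for ((n, rddIndex) <- childPartitionCounts.zipWithIndex; partIndex <- 0 until n) {
      array(pos) = (rddIndex, partIndex)
      pos += 1
    }
    array
  }
}
```

Because this loop runs on the single driver thread after all futures have completed, no extra synchronization is needed around it.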

@SparkQA

SparkQA commented Feb 24, 2016

Test build #51856 has finished for PR 9483 at commit 6456f12.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 24, 2016

Test build #51861 has finished for PR 9483 at commit db84ab9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zhichao-li
Contributor Author

retest this please

@SparkQA

SparkQA commented Feb 25, 2016

Test build #51920 has finished for PR 9483 at commit db84ab9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

import org.apache.spark.rdd.{RDD, UnionPartition, UnionRDD}
import org.apache.spark.util.ThreadUtils

object ParallelUnionRDD {
Contributor

Make this private[hive], or move it into the upper-level package? The same for the class ParallelUnionRDD.

@chenghao-intel
Contributor

LGTM except some minor suggestions.

@SparkQA

SparkQA commented Feb 29, 2016

Test build #52152 has finished for PR 9483 at commit fdac95b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zhichao-li
Contributor Author

retest this please

@SparkQA

SparkQA commented Feb 29, 2016

Test build #52156 has finished for PR 9483 at commit fdac95b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zhichao-li
Contributor Author

retest this please

@SparkQA

SparkQA commented Mar 1, 2016

Test build #52210 has finished for PR 9483 at commit fdac95b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor

rxin commented Jun 15, 2016

Thanks for the pull request. I'm going through a list of pull requests to cut them down since the sheer number is breaking some of the tooling we have. Due to lack of activity on this pull request, I'm going to push a commit to close it. Feel free to reopen it or create a new one. We can also continue the discussion on the JIRA ticket.

@asfgit asfgit closed this in 1a33f2e Jun 15, 2016