[SPARK-5785] [PySpark] narrow dependency for cogroup/join in PySpark #4629


Closed
wants to merge 7 commits

Conversation

davies
Contributor

@davies davies commented Feb 16, 2015

Currently, PySpark does not create a narrow dependency for cogroup/join when the two RDDs have the same partitioner, so an unnecessary shuffle stage is introduced.

The Python implementation of cogroup/join is different from the Scala one: it depends on union() and partitionBy(). This patch tries to use PartitionerAwareUnionRDD() in union() when all the RDDs have the same partitioner. It also fixes `preservesPartitioning` in the map() and mapPartitions() calls, so that partitionBy() can skip the unnecessary shuffle stage.
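The interaction described above can be sketched with a small pure-Python model (hypothetical names: `FakeRDD`, `partition_by`, and the `preserves_partitioning` flag are illustrative stand-ins, not Spark's actual classes; PySpark's real keyword is `preservesPartitioning`). It shows why a map() that forgets its partitioner forces partitionBy() to shuffle even when the data layout is unchanged:

```python
# Pure-Python sketch of partitioner bookkeeping; no Spark required.
class FakeRDD:
    def __init__(self, partitioner=None):
        self.partitioner = partitioner
        self.shuffled = False  # True if producing this RDD needed a shuffle

    def map(self, f, preserves_partitioning=False):
        # f is ignored in this model; only the partitioner matters here.
        return FakeRDD(self.partitioner if preserves_partitioning else None)

    def partition_by(self, partitioner):
        if self.partitioner == partitioner:
            return self          # narrow dependency: layout already correct
        out = FakeRDD(partitioner)
        out.shuffled = True      # models the extra shuffle stage
        return out

rdd = FakeRDD(partitioner="hash-4")
# A key-preserving map that keeps the partitioner: partitionBy is free.
assert not rdd.map(lambda kv: kv, preserves_partitioning=True) \
              .partition_by("hash-4").shuffled
# The same map without the flag: the unnecessary shuffle this PR removes.
assert rdd.map(lambda kv: kv).partition_by("hash-4").shuffled
```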

@SparkQA

SparkQA commented Feb 16, 2015

Test build #27573 has started for PR 4629 at commit eb26c62.

  • This patch merges cleanly.

} else {
new UnionRDD(this, rdds)
}
}

/** Build the union of a list of RDDs passed as variable-length arguments. */
def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] =
Contributor

Can we change this method to call the union method that you modified so the change will take effect here, too?

Contributor Author

fixed

@SparkQA

SparkQA commented Feb 16, 2015

Test build #27582 has started for PR 4629 at commit ff5a0a6.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Feb 16, 2015

Test build #27583 has started for PR 4629 at commit 940245e.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Feb 16, 2015

Test build #27587 has started for PR 4629 at commit cc28d97.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Feb 16, 2015

Test build #27573 has finished for PR 4629 at commit eb26c62.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27573/

@SparkQA

SparkQA commented Feb 16, 2015

Test build #27582 has finished for PR 4629 at commit ff5a0a6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class Partitioner(object):

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27582/

@SparkQA

SparkQA commented Feb 16, 2015

Test build #610 has started for PR 4629 at commit cc28d97.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Feb 16, 2015

Test build #27583 has finished for PR 4629 at commit 940245e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class Partitioner(object):

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27583/

@SparkQA

SparkQA commented Feb 16, 2015

Test build #27587 has finished for PR 4629 at commit cc28d97.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class Partitioner(object):
    • case class ParquetRelation2(

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27587/

@SparkQA

SparkQA commented Feb 17, 2015

Test build #610 has finished for PR 4629 at commit cc28d97.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -961,11 +961,18 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}

/** Build the union of a list of RDDs. */
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds)
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = {
val partitioners = rdds.map(_.partitioner).toSet
Contributor

If _.partitioner is an Option, then I think this can be simplified by using flatMap instead of map, since that would let you check whether partitioners.size == 1 on the next line without also needing the isDefined check.
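The suggested simplification, and a subtlety it carries, can be sketched in plain Python (a hedged analogue only; the real code is Scala over Option, and dropping `None` here plays the role of `flatMap`):

```python
from types import SimpleNamespace as RDD  # stand-in with a .partitioner field

def all_share_partitioner(rdds):
    # map-based check: build the set of partitioners, None included.
    # Exactly one element, and it is defined -> every RDD shares it.
    partitioners = {r.partitioner for r in rdds}
    return len(partitioners) == 1 and None not in partitioners

def all_share_partitioner_flat(rdds):
    # flatMap-style check: drop undefined partitioners first. Note that
    # a size-1 set alone is not enough -- an "all defined" guard is still
    # needed, since one RDD may have had no partitioner at all.
    defined = {r.partitioner for r in rdds if r.partitioner is not None}
    return len(defined) == 1 and all(r.partitioner is not None for r in rdds)

a = RDD(partitioner="hash-4")
b = RDD(partitioner="hash-4")
c = RDD(partitioner=None)

assert all_share_partitioner([a, b])
assert not all_share_partitioner([a, c])
# Without the guard, dropping None first would wrongly accept [a, c]:
assert len({r.partitioner for r in [a, c] if r.partitioner is not None}) == 1
assert not all_share_partitioner_flat([a, c])
```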

@JoshRosen
Contributor

LGTM overall; this is tricky logic, though, so I'll take one more pass through when I get home.

@SparkQA

SparkQA commented Feb 17, 2015

Test build #27612 has started for PR 4629 at commit 4d29932.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Feb 17, 2015

Test build #27612 has finished for PR 4629 at commit 4d29932.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class Partitioner(object):

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27612/

@SparkQA

SparkQA commented Feb 17, 2015

Test build #611 has started for PR 4629 at commit 4d29932.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Feb 17, 2015

Test build #611 has finished for PR 4629 at commit 4d29932.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -740,6 +739,27 @@ def test_multiple_python_java_RDD_conversions(self):
converted_rdd = RDD(data_python_rdd, self.sc)
self.assertEqual(2, converted_rdd.count())

def test_narrow_dependency_in_join(self):
rdd = self.sc.parallelize(range(10)).map(lambda x: (x, x))
Contributor

Do these tests actually check for a narrow dependency at all? I think they will pass even without it.

I'm not sure of a better suggestion, though. I had to use getNarrowDependencies in another PR to check this:
https://github.com/apache/spark/pull/4449/files#diff-4bc3643ce90b54113cad7104f91a075bR582

but I don't think that is even exposed in pyspark ...

Contributor Author

This test is only for correctness; I will add more checks for the narrow dependency based on the Python progress API (#3027).

Contributor

I've merged #3027, so I think we can now test this by setting a job group, running a job, then querying the statusTracker to determine how many stages were actually run as part of that job.

Contributor Author

done!

Contributor

nice!

@SparkQA

SparkQA commented Feb 17, 2015

Test build #27657 has started for PR 4629 at commit dffe34e.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Feb 17, 2015

Test build #27657 has finished for PR 4629 at commit dffe34e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class Partitioner(object):
    • class SparkJobInfo(namedtuple("SparkJobInfo", "jobId stageIds status")):
    • class SparkStageInfo(namedtuple("SparkStageInfo",
    • class StatusTracker(object):

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27657/

@JoshRosen
Contributor

Thanks for adding the test.

LGTM, so I'm going to merge this into master (1.4.0) and branch-1.3 (1.3.0). Thanks!

asfgit pushed a commit that referenced this pull request Feb 18, 2015
Currently, PySpark does not create a narrow dependency for cogroup/join when the two RDDs have the same partitioner, so an unnecessary shuffle stage is introduced.

The Python implementation of cogroup/join is different from the Scala one: it depends on union() and partitionBy(). This patch tries to use PartitionerAwareUnionRDD() in union() when all the RDDs have the same partitioner. It also fixes `preservesPartitioning` in the map() and mapPartitions() calls, so that partitionBy() can skip the unnecessary shuffle stage.

Author: Davies Liu <davies@databricks.com>

Closes #4629 from davies/narrow and squashes the following commits:

dffe34e [Davies Liu] improve test, check number of stages for join/cogroup
1ed3ba2 [Davies Liu] Merge branch 'master' of github.com:apache/spark into narrow
4d29932 [Davies Liu] address comment
cc28d97 [Davies Liu] add unit tests
940245e [Davies Liu] address comments
ff5a0a6 [Davies Liu] skip the partitionBy() on Python side
eb26c62 [Davies Liu] narrow dependency in PySpark

(cherry picked from commit c3d2b90)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
@asfgit asfgit closed this in c3d2b90 Feb 18, 2015
5 participants