Spark's mapPartitions()
=======================

According to the Spark API, the ````mapPartitions(func)```` transformation is
similar to ````map()````, but runs separately on each partition (block)
of the RDD, so ````func```` must be of type ````Iterator<T> => Iterator<U>````
when running on an RDD of type ````T````.
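
As a minimal sketch of this contract (the helper name ````double_all```` is
illustrative, and an active SparkContext ````sc```` is assumed):

````
>>> def double_all(partition):
...     # receives an iterator over one partition's elements
...     # and must itself return (or yield) an iterator
...     for x in partition:
...         yield 2 * x
...
>>> sc.parallelize([1, 2, 3, 4], 2).mapPartitions(double_all).collect()
[2, 4, 6, 8]
````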

The ````mapPartitions()```` transformation should be used when you want to
extract some condensed information (such as the minimum and maximum) from
each partition. For example, if you want to find the minimum and maximum
of all numbers in your input, then using ````map()```` can be pretty
inefficient, since you will generate tons of intermediate (K,V) pairs
when all you really want are two numbers. Another example: if you want to
find the top-10 (or bottom-10) records of your input, ````mapPartitions()````
works very well: find the top-10 (or bottom-10) per partition, then find
the top-10 (or bottom-10) across all partitions; this way you limit the
number of intermediate (K,V) pairs emitted (a sketch of this follows below).
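
A sketch of the top-10 idea (the helper name ````top10```` is illustrative,
and an active SparkContext ````sc```` is assumed): ````heapq.nlargest````
emits at most 10 elements per partition, and ````top()```` then picks the
global top-10 from those few survivors:

````
>>> import heapq
>>> def top10(partition):
...     # keep at most 10 elements per partition
...     return heapq.nlargest(10, partition)
...
>>> sc.parallelize(range(1000), 4).mapPartitions(top10).top(10)
[999, 998, 997, 996, 995, 994, 993, 992, 991, 990]
````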

Example-1: Sum Each Partition
=============================
````
>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> numbers
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> rdd = sc.parallelize(numbers, 3)
>>> rdd.collect()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> rdd.getNumPartitions()
3
>>> def f(iterator):
...     for x in iterator:
...         print(x)
...     print("===")
...
>>> rdd.foreachPartition(f)
1
2
3
===
7
8
9
10
===
4
5
6
===
>>> def adder(iterator):
...     yield sum(iterator)
...
>>> rdd.mapPartitions(adder).collect()
[6, 15, 34]
````
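
The per-partition sums can then be combined without collecting anything
large to the driver; for example (a small follow-up sketch using the same
````rdd```` and ````adder```` as above):

````
>>> # 6 + 15 + 34: combine the three partition sums into a grand total
>>> rdd.mapPartitions(adder).reduce(lambda x, y: x + y)
55
````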

Example-2: Find Minimum and Maximum
===================================
Use ````mapPartitions()```` to find the minimum and maximum of each partition.

To make the solution cleaner, we define a Python function that returns the
minimum and maximum for a given iterator.

````
$ cat minmax.py
#!/usr/bin/python
def minmax(iterator):
    # use names that do not shadow the built-ins min() and max();
    # assumes the partition is non-empty
    first_time = True
    for x in iterator:
        if first_time:
            low = x
            high = x
            first_time = False
        else:
            if x > high:
                high = x
            if x < low:
                low = x
    # mapPartitions() flattens this tuple into two output elements
    return (low, high)

# quick local test:
# data = [10, 20, 3, 4, 5, 2, 2, 20, 20, 10]
# print(minmax(data))   # -> (2, 20)
````
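
A shorter variant is possible with Python's built-ins; this is only a
sketch (````minmax_short```` is an illustrative name), and it assumes each
partition fits in memory:

````
def minmax_short(iterator):
    values = list(iterator)   # materialize the partition once
    if values:                # empty partitions contribute nothing
        yield min(values)
        yield max(values)
````

Since it yields two plain numbers per partition, it produces the same
flattened result as ````minmax````.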
Then we use the ````minmax```` function with ````mapPartitions()````:

````
### NOTE: data can be huge, but for understanding
### mapPartitions() we use a very small data set
>>> data = [10, 20, 3, 4, 5, 2, 2, 20, 20, 10]
>>> rdd = sc.parallelize(data, 3)
>>> rdd.getNumPartitions()
3
>>> rdd.collect()
[10, 20, 3, 4, 5, 2, 2, 20, 20, 10]
>>> def f(iterator):
...     for x in iterator:
...         print(x)
...     print("===")
...
>>> rdd.foreachPartition(f)
10
20
3
===
4
5
2
===
2
20
20
10
===
>>>
>>> # ship minmax.py to the workers so that it can be imported
>>> sc.addPyFile("/Users/mparsian/spark-1.6.1-bin-hadoop2.6/minmax.py")
>>> import minmax
### NOTE: minmaxlist is a small list of numbers:
### two numbers (min and max) are generated per partition
>>> minmaxlist = rdd.mapPartitions(minmax.minmax).collect()
>>> minmaxlist
[3, 20, 2, 5, 2, 20]
>>> min(minmaxlist)
2
>>> max(minmaxlist)
20
````
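
Collecting ````minmaxlist```` to the driver is fine here because each
partition emits only two numbers. If you prefer to combine the results on
the cluster instead, here is a sketch (````minmax_pair```` is an
illustrative name; it assumes each partition fits in memory):

````
>>> def minmax_pair(iterator):
...     values = list(iterator)
...     if values:
...         yield (min(values), max(values))
...
>>> rdd.mapPartitions(minmax_pair).reduce(
...     lambda a, b: (min(a[0], b[0]), max(a[1], b[1])))
(2, 20)
````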