Data Parallel Operations
This is also covered in the next course: https://github.com/rohitvg/scala-spark-4/wiki/Reduction-Operations
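In Scala, most collection operations can become data-parallel. The .par call converts a sequential collection to a parallel collection:
// Scala 2.13+: .par needs the scala-parallel-collections module and this import
import scala.collection.parallel.CollectionConverters._
(1 until 1000).par                              // parallel range 1..999
  .filter(n => n % 3 == 0)                      // keep the multiples of 3
  .count(n => n.toString == n.toString.reverse) // count the palindromic ones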
However, some operations are not parallelizable.
Task: implement the method sum using the foldLeft method.
def sum(xs: Array[Int]): Int = {
  xs.par.foldLeft(0)(_ + _)
}
Question: Does this implementation execute in parallel? Why not?
Answer: No. To see why not, we examine foldLeft more closely:
def foldLeft[B](z: B)(f: (B, A) => B): B
The accumulator is threaded through the elements one at a time: each application of f needs the result of the previous application, so the elements must be processed strictly from left to right. Hence this operation cannot be data-parallelized.
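To make the sequential dependency visible, the sketch below (plain sequential Scala, no parallel collections needed) uses foldLeft to build a string that shows how the calls nest. foldLeftShape is a hypothetical helper for illustration. Every step wraps the accumulator produced by the previous step, so step n cannot start before step n - 1 finishes.

```scala
// Render the shape of the computation foldLeft performs.
// Each application of the operator wraps the accumulator produced by the previous one.
def foldLeftShape(xs: List[Int]): String =
  xs.foldLeft("0")((acc, x) => s"($acc + $x)")

val shape = foldLeftShape(List(1, 2, 3, 4))
// shape == "((((0 + 1) + 2) + 3) + 4)": a chain, not a tree
```

The result is a left-leaning chain of depth n, which is exactly the structure that prevents splitting the work between processors.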
Next, let’s examine the fold method signature:
def fold(z: A)(f: (A, A) => A): A
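Unlike foldLeft, the fold operation can process the elements in a reduction tree, so it can execute in parallel: because f takes two values of the same type A, it can combine the partial results of two subcollections just as it combines two elements.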
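A minimal sketch of such a reduction tree, assuming an indexed input so it can be split cheaply. treeFold is a hypothetical helper, not how the Scala library implements fold: it splits the input in half, folds each half, and combines the two partial results with the same operator f. The two recursive calls do not depend on each other, so they could run on different threads; here they run sequentially to keep the sketch self-contained.

```scala
// Hypothetical helper: fold by recursive halving (a reduction tree).
// Assumes z is a neutral element for f; it is only returned for empty input.
def treeFold[A](xs: IndexedSeq[A], z: A)(f: (A, A) => A): A =
  if (xs.isEmpty) z                 // empty input: the neutral element
  else if (xs.length == 1) xs.head  // leaf: a single element
  else {
    val (left, right) = xs.splitAt(xs.length / 2)
    // The two halves are independent; only this final f needs both results.
    f(treeFold(left, z)(f), treeFold(right, z)(f))
  }

// Render the shape of the tree for four elements.
val shape = treeFold(Vector("1", "2", "3", "4"), "")((a, b) => s"($a + $b)")
// shape == "((1 + 2) + (3 + 4))"

val total = treeFold((1 to 100).toVector, 0)(_ + _)
// total == 5050
```

The tree has depth about log2(n), so with enough processors the combining steps on each level can proceed simultaneously.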
We can re-approach our task above and implement the sum method on an Array using fold instead of foldLeft:
def sum(xs: Array[Int]): Int = {
  xs.par.fold(0)(_ + _)
}
Now we implement a max method, also using fold:
def max(xs: Array[Int]): Int = {
  // Int.MinValue is the neutral element for max, and math.max is the folding operator.
  // We could also have used (x, y) => if (x > y) x else y instead of math.max.
  xs.par.fold(Int.MinValue)(math.max)
}
Given an array of "rock", "paper" and "scissors" strings, find out who won:
def play(a: String, b: String): String = List(a, b).sorted match {
  case List("paper", "scissors") => "scissors" // scissors beats paper
  case List("paper", "rock") => "paper"        // paper beats rock
  case List("rock", "scissors") => "rock"      // rock beats scissors
  case List(a, b) if a == b => a               // both players chose the same option
  case List("", b) => b                        // one option is empty (the neutral element "")
}
Array("paper", "rock", "paper", "scissors").par.fold("")(play)
// usage
play(play("paper", "rock"), play("paper", "scissors")) == "scissors"
play("paper", play("rock", play("paper", "scissors"))) == "paper" // same moves, grouped differently: a different winner
Why does this happen? The play operator is commutative, but not associative, so the result of a parallel fold depends on how the collection happens to be split.
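The sketch below copies play (adding a catch-all case so the match is total) and evaluates the same four moves with three different groupings, using only sequential collections. A parallel fold is free to choose any grouping depending on how it splits the array, so the winner it reports is not well defined.

```scala
// Copy of play from above, with a catch-all case added so the match is exhaustive.
def play(a: String, b: String): String = List(a, b).sorted match {
  case List("paper", "scissors") => "scissors"
  case List("paper", "rock")     => "paper"
  case List("rock", "scissors")  => "rock"
  case List(x, y) if x == y      => x
  case List("", y)               => y
  case other => throw new IllegalArgumentException(s"unexpected moves: $other")
}

val moves = Vector("paper", "rock", "paper", "scissors")

val leftToRight = moves.foldLeft("")(play)  // play(play(play(play("", p), r), p), s)
val rightToLeft = moves.foldRight("")(play) // play(p, play(r, play(p, play(s, ""))))
val balanced    = play(play(moves(0), moves(1)), play(moves(2), moves(3)))
// leftToRight == "scissors", rightToLeft == "paper", balanced == "scissors"
```

Sorting the two moves makes play commutative, yet the groupings still disagree, which is why commutativity alone does not rescue fold.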
In order for the fold operation to work correctly, the following relations must hold:
f(a, f(b, c)) == f(f(a, b), c)    // f is associative
f(z, a) == f(a, z) == a           // z is a neutral element for f
We say that the neutral element z and the binary operator f must form a monoid.
Commutativity does not matter for fold – the following relation is not necessary:
f(a, b) == f(b, a)
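The laws above can be spot-checked on sample values. isMonoidOn and isCommutativeOn are hypothetical helpers, and passing them on a handful of samples is evidence, not a proof. String concatenation with "" illustrates why commutativity is not required: it is a monoid, so a parallel fold over strings gives the right answer, even though a + b and b + a differ.

```scala
// Hypothetical helpers: check the laws on every combination of the given samples.
def isMonoidOn[A](samples: Seq[A], z: A)(f: (A, A) => A): Boolean = {
  val associative = samples.forall(a => samples.forall(b => samples.forall(c =>
    f(a, f(b, c)) == f(f(a, b), c))))
  val neutral = samples.forall(a => f(z, a) == a && f(a, z) == a)
  associative && neutral
}

def isCommutativeOn[A](samples: Seq[A])(f: (A, A) => A): Boolean =
  samples.forall(a => samples.forall(b => f(a, b) == f(b, a)))

val ints = Seq(-7, 0, 3, 42, Int.MinValue, Int.MaxValue)
val strs = Seq("", "a", "bc", "def")

val sumIsMonoid    = isMonoidOn(ints, 0)(_ + _)               // true (overflow wraps, but stays associative)
val maxIsMonoid    = isMonoidOn(ints, Int.MinValue)(math.max) // true
val minusIsMonoid  = isMonoidOn(ints, 0)(_ - _)               // false: a - (b - c) != (a - b) - c
val concatIsMonoid = isMonoidOn(strs, "")(_ + _)              // true
val concatCommutes = isCommutativeOn(strs)(_ + _)             // false: "a" + "bc" != "bc" + "a"
```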
Given an array of characters, use fold to return the vowel count:
Array('E', 'P', 'F', 'L').par.fold(0)((count, c) => if (isVowel(c)) count + 1 else count)
Question:
What does this snippet do?
- The program runs and returns the correct vowel count.
- The program is non-deterministic.
- The program returns incorrect vowel count.
- The program does not compile.
Answer:
The program does not compile. The signature of fold requires the accumulator to have the same type as the elements of the collection (or a supertype of it), and fold can only produce a value of that type. Here the elements are Chars but the accumulator is an Int; the only types that fit both are common supertypes such as AnyVal, for which neither count + 1 nor isVowel(c) type-checks.
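One possible fix that still uses only fold, sketched sequentially: first map every Char to an Int (1 for a vowel, 0 otherwise) so that the elements and the accumulator share the type Int, then fold with the monoid (0, +). The snippet above never defines isVowel, so the definition here is an assumption.

```scala
// Assumption: isVowel is not defined in the notes; this is one plausible definition.
def isVowel(c: Char): Boolean = "AEIOUaeiou".indexOf(c) >= 0

val chars = Array('E', 'P', 'F', 'L')

// Map each Char to an Int first, so elements and accumulator share the type Int,
// then fold with the neutral element 0 and the associative operator +.
val vowelCount = chars.map(c => if (isVowel(c)) 1 else 0).fold(0)(_ + _)
// vowelCount == 1 ('E' is the only vowel)
```

On a parallel collection the same pipeline would start with .par before map. The price is an intermediate collection of Ints, which the aggregate method avoids.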
On the other hand, foldLeft is more expressive than fold, since fold can be expressed in terms of foldLeft. Sanity check:
def fold(z: A)(op: (A, A) => A): A = foldLeft[A](z)(op)
Extra reading: http://stackoverflow.com/questions/16111440/scala-fold-vs-foldleft
def fold[A1 >: A](z: A1)(op: (A1, A1) => A1): A1
def foldLeft[B](z: B)(op: (B, A) => B): B
This is the reason that fold can be implemented in parallel, while foldLeft cannot. This is not only because of the Left part of the name, which implies that foldLeft goes from left to right sequentially, but also because the operator op cannot combine results computed in parallel: it only defines how to combine the aggregation type B with the element type A, but not how to combine two aggregations of type B. The fold method, in turn, does define this, because the aggregation type A1 has to be a supertype of the element type A, that is A1 >: A. This supertype relationship allows folding over the aggregation and the elements, and combining aggregations, both with a single operator.
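A small sequential illustration of the A1 >: A relationship, assuming a list whose elements are typed Some[Int]: the aggregation type A1 is then Option[Int], a supertype of the element type, so one operator can merge an aggregation with an element and also two aggregations with each other. maxOpt is a hypothetical helper. The foldLeft line at the end shows the contrast: its operator adds a String to an Int, but says nothing about merging two Int partial results.

```scala
// Hypothetical helper: max over Option[Int], with None as the neutral element.
def maxOpt(a: Option[Int], b: Option[Int]): Option[Int] = (a, b) match {
  case (Some(x), Some(y)) => Some(math.max(x, y)) // merge two present values
  case (Some(_), None)    => a                    // None is neutral on the right
  case _                  => b                    // None is neutral on the left
}

// Element type A = Some[Int]; aggregation type A1 = Option[Int] >: Some[Int].
val xs: List[Some[Int]] = List(Some(3), Some(7), Some(5))

val best = xs.fold(Option.empty[Int])(maxOpt)                     // Some(7)
val none = List.empty[Some[Int]].fold(Option.empty[Int])(maxOpt)  // None

// foldLeft: (Int, String) => Int folds a String into an Int accumulator,
// but provides no way to combine two Int accumulators computed separately.
val totalLength = List("ab", "c", "def").foldLeft(0)((acc, s) => acc + s.length) // 6
```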
Let’s examine the aggregate signature:
def aggregate[B](z: B)(f: (B, A) => B, g: (B, B) => B): B
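// B is the result (aggregation) type
// z is the initial accumulator value; it must be a neutral element for g
// f is the sequential folding operator: it folds elements of type A into a B
// g is the parallel combining operator: it merges two partial results of type B
Because aggregate uses a sequential operator within each chunk of the collection and a parallel operator across chunks, it is a combination of foldLeft and fold.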
Revisiting our problem of counting the number of vowels in a character array, this time using the aggregate method:
Array('E', 'P', 'F', 'L').par.aggregate(0)(
  (count, c) => if (isVowel(c)) count + 1 else count, // f: fold a Char into the count
  _ + _                                               // g: add two partial counts
)
// 0 (the neutral element) and _ + _ (the parallel reduction operator) form a monoid.
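To make the roles of f and g concrete, here is a sketch of aggregate's semantics built only on the standard library's Futures. aggregateSketch is a hypothetical helper, not the parallel collections implementation (which splits work differently): it cuts the input into fixed-size chunks, folds each chunk sequentially with f inside its own Future, and then combines the partial results with g. It assumes z is a neutral element for g, because z seeds every chunk. isVowel is again an assumed definition.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Hypothetical sketch: chunked foldLeft with f, then combine the partials with g.
def aggregateSketch[A, B](xs: Vector[A], chunkSize: Int)(z: B)(f: (B, A) => B, g: (B, B) => B): B = {
  require(chunkSize > 0, "chunkSize must be positive")
  // Each chunk is folded sequentially with f, possibly on a different thread.
  val partials: Vector[Future[B]] =
    xs.grouped(chunkSize).toVector.map(chunk => Future(chunk.foldLeft(z)(f)))
  // The partial results are merged with g; this needs z to be neutral for g.
  val combined: Future[B] = Future.sequence(partials).map(_.foldLeft(z)(g))
  Await.result(combined, Duration.Inf)
}

// Assumed definition, as in the earlier snippets.
def isVowel(c: Char): Boolean = "AEIOUaeiou".indexOf(c) >= 0

val letters = "EPFL is in Lausanne".toVector
val vowels = aggregateSketch(letters, chunkSize = 4)(0)(
  (count, c) => if (isVowel(c)) count + 1 else count, // f: sequential, Char into Int
  _ + _                                               // g: merges two Int counts
)
// vowels == 7
```

Note that f alone could not merge the chunk results (it takes an Int and a Char), and g alone could not consume Chars; aggregate needs both.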
So far, we have seen accessor combinators such as fold and aggregate, which return a single value. Transformer combinators, such as map, filter, flatMap and groupBy, do not return a single value, but instead return new collections as results.
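A short sequential contrast between the two kinds of combinators; the sample words are made up for illustration. The accessor reduces the collection to one Int, while each transformer returns a new collection. On a parallel collection (after .par), the transformers would return parallel collections instead.

```scala
val words = Vector("apple", "avocado", "banana", "cherry")

// Accessor combinator: reduces the collection to a single value.
val longWords: Int = words.count(_.length > 5) // 3

// Transformer combinators: each returns a new collection.
val lengths   = words.map(_.length)                    // Vector(5, 7, 6, 6)
val aWords    = words.filter(_.startsWith("a"))        // Vector(apple, avocado)
val firstTwo  = words.flatMap(w => w.take(2).toVector) // Vector(a, p, a, v, b, a, c, h)
val byInitial = words.groupBy(_.head)                  // Map from first letter to words
```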