How does the fold action work in Spark?
zeroValue is applied once for each partition (and once more when the partition results are merged), so it should be a neutral element; for + that is 0. With any other value the exact result depends on the number of partitions. The per-partition part is equivalent to:
rdd1.mapPartitions(iter => Iterator(iter.foldLeft(zeroValue)(_ + _))).reduce(_ + _)
so:
val rdd1 = sc.parallelize(List(1,2,3,4,5),3)
distributes data as:
scala> rdd1.glom.collect
res1: Array[Array[Int]] = Array(Array(1), Array(2, 3), Array(4, 5))
and the whole expression is equivalent to:
(5 + 1) + (5 + 2 + 3) + (5 + 4 + 5)
plus 5 for jobResult, the driver-side accumulator that merges the partition results, which gives 35 in total.
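To make the dependence on the partition count concrete, here is a minimal sketch in plain Scala that models the behaviour described above. It does not use Spark: `simulateFold` is a hypothetical helper, and the partition layouts are written out by hand rather than produced by `parallelize`.

```scala
// Plain-Scala model of RDD.fold; `simulateFold` is a hypothetical helper, not a Spark API.
def simulateFold(partitions: Seq[Seq[Int]], zeroValue: Int)(op: (Int, Int) => Int): Int = {
  // Each partition is folded on its own, starting from zeroValue.
  val partitionResults = partitions.map(_.foldLeft(zeroValue)(op))
  // The partition results are merged, again starting from zeroValue.
  partitionResults.foldLeft(zeroValue)(op)
}

val data = List(1, 2, 3, 4, 5)

val threePartitions = simulateFold(Seq(Seq(1), Seq(2, 3), Seq(4, 5)), 5)(_ + _)
val onePartition    = simulateFold(Seq(data), 5)(_ + _)
val fivePartitions  = simulateFold(data.map(Seq(_)), 5)(_ + _)

println(threePartitions) // 35 = 15 + (3 + 1) * 5
println(onePartition)    // 25 = 15 + (1 + 1) * 5
println(fivePartitions)  // 45 = 15 + (5 + 1) * 5
```

For addition, the result under this model is the plain sum plus zeroValue times (number of partitions + 1), which is why only 0 gives a stable answer.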
You know that Spark RDDs perform distributed computations.
So, this line here,
val rdd1 = sc.parallelize(List(1,2,3,4,5), 3)
tells Spark to split this RDD into 3 partitions, which lets it run the computation as up to 3 independent tasks in parallel.
Now, this line here,
rdd1.fold(5)(_ + _)
tells Spark to fold each of those partitions using 5 as the initial value, and then to fold the 3 partition results together, again with 5 as the initial value.
A plain Scala equivalent can be written as,
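val list = List(1, 2, 3, 4, 5)
val listOfList = list.grouped(2).toList                  // 3 "partitions": List(1, 2), List(3, 4), List(5)
val listOfFolds = listOfList.map(l => l.fold(5)(_ + _))  // fold each partition, starting from 5
val fold = listOfFolds.fold(5)(_ + _)                    // fold the partition results, starting from 5 again: 35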
So, if you are using fold on RDDs, you need to provide a zero value.
But then you will ask: why or when would someone use fold instead of reduce?
Your confusion lies in your perception of the zero value. The thing is that the zero value for an RDD[T] does not depend only on the type T but also on the nature of the computation. So your zero value does not need to be 0.
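Let's consider a simple example where we want to calculate the largest number in our RDD that is greater than 15, or 15 itself if there is none. Can we do that using reduce alone? The answer is no: reduce has no initial value, so it cannot supply the 15 (and it throws on an empty RDD). But we can do it using fold.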
val n15GT15 = rdd1.fold(15)({ case (acc, i) => Math.max(acc, i) })
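The reason a non-zero value is safe here is that max is idempotent: applying 15 once per partition and once more during the merge cannot change the answer. A rough sketch in plain Scala, with ordinary collections standing in for the RDD and the partition layout copied from the glom output earlier (all names below are illustrative, not Spark APIs):

```scala
// Same partition layout as the glom output; plain collections stand in for the RDD.
val partitions = Seq(Seq(1), Seq(2, 3), Seq(4, 5))

// fold(15)(max): 15 is applied in every partition and once more when merging.
val perPartition = partitions.map(_.foldLeft(15)((acc, i) => math.max(acc, i)))
val foldResult   = perPartition.foldLeft(15)((acc, i) => math.max(acc, i))

// reduce(max) has no floor, so it simply returns the largest element.
val reduceResult = partitions.flatten.reduce((a, b) => math.max(a, b))

println(foldResult)   // 15
println(reduceResult) // 5
```

Contrast this with addition, where every extra application of the zero value shifts the total.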
Taken from the Scaladocs here (emphasis mine):
@param zeroValue the initial value for the accumulated result of each partition for the op operator, and also the initial value for the combine results from different partitions for the op operator - this will typically be the neutral element (e.g. Nil for list concatenation or 0 for summation)
The zeroValue is in your case added four times (once for each partition, plus once when combining the results from the partitions). So the result is:
(5 + 1) + (5 + 2 + 3) + (5 + 4 + 5) + 5 // (extra one for combining results)
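By contrast, with the neutral elements the Scaladoc mentions, the extra applications are harmless. The following is a small sketch in plain Scala, assuming a hypothetical helper `foldOverPartitions` that mimics the per-partition fold followed by the merge; it is a model of the behaviour, not Spark's implementation.

```scala
// Hypothetical generic model of fold over explicit partitions (not Spark's API).
def foldOverPartitions[T](partitions: Seq[Seq[T]], zeroValue: T)(op: (T, T) => T): T =
  partitions.map(_.foldLeft(zeroValue)(op)).foldLeft(zeroValue)(op)

val three = Seq(Seq(1), Seq(2, 3), Seq(4, 5))
val two   = Seq(Seq(1, 2), Seq(3, 4, 5))

// 0 is neutral for +, so both layouts give the plain sum.
val sumThree = foldOverPartitions(three, 0)(_ + _) // 15
val sumTwo   = foldOverPartitions(two, 0)(_ + _)   // 15

// The empty list (Nil) is neutral for list concatenation.
val lists = Seq(Seq(List("a")), Seq(List("b"), List("c")))
val concatenated = foldOverPartitions(lists, List.empty[String])(_ ++ _) // List(a, b, c)
```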