Example of the Scala aggregate function
The aggregate function does not do that (except that it is a very general function, and it could be used to do that). You want groupBy. Close to, at least. As you start with a Seq[(String, String)], and you group by taking the first item in the tuple (which is (String, String) => String), it returns a Map[String, Seq[(String, String)]]. You then have to discard the first element in the Seq[(String, String)] values.
So

list.groupBy(_._1).mapValues(_.map(_._2))

There you get a Map[String, Seq[String]]. If you want a Seq instead of a Map, call toSeq on the result. I don't think you have a guarantee on the order in the resulting Seq, though.
Aggregate is a more difficult function.

Consider first reduceLeft and reduceRight. Let as be a non-empty sequence as = Seq(a1, ..., an) of elements of type A, and f: (A, A) => A be some way to combine two elements of type A into one. I will write it as a binary operator @, a1 @ a2 rather than f(a1, a2). as.reduceLeft(@) will compute (((a1 @ a2) @ a3) ... @ an). reduceRight will put the parentheses the other way, (a1 @ (a2 @ ... @ an)).

If @ happens to be associative, one does not care about the parentheses. One could compute it as (a1 @ ... @ ap) @ (ap+1 @ ... @ an) (there would be parentheses inside the two big parentheses too, but let's not care about that). Then one could do the two parts in parallel, while the nested bracketing in reduceLeft or reduceRight forces a fully sequential computation. But parallel computation is only possible when @ is known to be associative, and the reduceLeft method cannot know that.

Still, there could be a method reduce, whose caller would be responsible for ensuring that the operation is associative. Then reduce would order the calls as it sees fit, possibly doing them in parallel. Indeed, there is such a method.
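A minimal sketch of the difference (sequential here; only with a parallel collection would reduce actually split the work, and only because the caller guarantees + is associative):

```scala
val as = Seq(1, 2, 3, 4, 5)

// reduceLeft fixes the bracketing: ((((1 + 2) + 3) + 4) + 5)
val left = as.reduceLeft(_ + _)

// reduce makes no promise about bracketing; the caller guarantees associativity,
// which is what would allow a parallel collection to split the work
val anyOrder = as.reduce(_ + _)

// both are 15
```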
There is a limitation with the various reduce methods, however. The elements of the Seq can only be combined into a result of the same type: @ has to be (A, A) => A. But one could have the more general problem of combining them into a B. One starts with a value b of type B, and combines it with every element of the sequence. The operator @ is (B, A) => B, and one computes (((b @ a1) @ a2) ... @ an). foldLeft does that. foldRight does the same thing but starting with an. There, the @ operation has no chance to be associative. When one writes b @ a1 @ a2, it must mean (b @ a1) @ a2, as (a1 @ a2) would be ill-typed. So foldLeft and foldRight have to be sequential.
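A small sketch of such a (B, A) => B fold, combining Strings (A) into an Int (B):

```scala
val words = Seq("This", "is", "an", "example")

// @ is (B, A) => B: add the length of the next String to the running Int
val totalLeft = words.foldLeft(0)((b, a) => b + a.length)

// foldRight starts from the other end; its operator is (A, B) => B
val totalRight = words.foldRight(0)((a, b) => a.length + b)

// totalLeft == totalRight == 15, but both computations are inherently sequential
```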
Suppose, however, that each A can be turned into a B; let's write it with !, so a! is of type B. Suppose moreover that there is a + operation (B, B) => B, and that @ is such that b @ a is in fact b + a!. Rather than combining elements with @, one could first transform all of them to B with !, then combine them with +. That would be as.map(!).reduceLeft(+). And if + is associative, then that can be done with reduce, and not be sequential: as.map(!).reduce(+). There could be a hypothetical method as.associativeFold(b, !, +).
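Sketching that equivalence with concrete choices (a! = a.length, and + = integer addition, which is associative):

```scala
val as = Seq("This", "is", "an", "example")

// foldLeft with @, where b @ a is b + a!
val viaFold = as.foldLeft(0)((b, a) => b + a.length)

// first map with !, then combine with the associative +
val viaMapReduce = as.map(_.length).reduce(_ + _)

// viaFold == viaMapReduce == 15
```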
Aggregate is very close to that. It may be, however, that there is a more efficient way to implement b @ a than b + a!. For instance, if type B is List[A], and b @ a is a :: b, then a! will be a :: Nil, and b1 + b2 will be b2 ::: b1. a :: b is way better than (a :: Nil) ::: b. To benefit from associativity, but still use @, one first splits b + a1! + ... + an! into (b + a1! + ... + ap!) + (ap+1! + ... + an!), then goes back to using @ with (b @ a1 @ ... @ ap) + (ap+1! @ ap+2 @ ... @ an). One still needs the ! on ap+1, because one must start with some b. And the + is still necessary too, appearing between the parentheses. To do that, as.associativeFold(b, !, +) could be changed to as.optimizedAssociativeFold(b, !, @, +).
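A sketch of that List[A] case: seqop is the cheap a :: b, combop is b2 ::: b1, and z is Nil; the result is the input reversed. (Note that aggregate is deprecated on sequential collections since Scala 2.13, where it simply delegates to foldLeft.)

```scala
val as = Seq(1, 2, 3, 4)

// b @ a is a :: b (cheap), b1 + b2 is b2 ::: b1, z is Nil
val reversed: List[Int] =
  as.aggregate(List.empty[Int])((b, a) => a :: b, (b1, b2) => b2 ::: b1)

// reversed == List(4, 3, 2, 1)
```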
Back to +. + is associative, or equivalently, (B, +) is a semigroup. In practice, most of the semigroups used in programming happen to be monoids too, i.e. they contain a neutral element z (for zero) in B, so that for each b, z + b = b + z = b. In that case, the ! operation that makes sense is likely to be a! = z @ a. Moreover, as z is a neutral element, b @ a1 @ ... @ an = (b + z) @ a1 @ ... @ an, which is b + (z @ a1 @ ... @ an). So it is always possible to start the aggregation with z. If b is wanted instead, you do b + result at the end. With all those hypotheses, we can do as.aggregate(z)(@, +). That is what aggregate does. @ is the seqop argument (applied in a sequence z @ a1 @ a2 @ ... @ ap), and + is combop (applied to already partially combined results, as in (z @ a1 @ ... @ ap) + (z @ ap+1 @ ... @ an)).
To sum it up, as.aggregate(z)(seqop, combop) computes the same thing as as.foldLeft(z)(seqop) provided that:

(B, combop, z) is a monoid
seqop(b, a) = combop(b, seqop(z, a))

The aggregate implementation may use the associativity of combop to group the computations as it likes (not swapping elements, however: combop does not have to be commutative, and ::: is not). It may run them in parallel.
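The two conditions can be spot-checked for the character-count example (z = 0, seqop adds one String's length, combop is +):

```scala
val as = Seq("This", "is", "an", "example")
val z = 0
val seqop = (b: Int, a: String) => b + a.length
val combop = (b1: Int, b2: Int) => b1 + b2

// (B, combop, z) is a monoid: z is neutral and combop is associative
assert(combop(z, 7) == 7 && combop(7, z) == 7)
assert(combop(combop(1, 2), 3) == combop(1, combop(2, 3)))

// seqop(b, a) == combop(b, seqop(z, a))
assert(seqop(3, "abc") == combop(3, seqop(z, "abc")))

// so aggregate(z)(seqop, combop) computes the same result as foldLeft(z)(seqop)
val total = as.foldLeft(z)(seqop)  // 15
```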
Finally, solving the initial problem using aggregate is left as an exercise to the reader. A hint: implement it using foldLeft, then find z and combop that satisfy the conditions stated above.
Let's see if some ascii art doesn't help. Consider the type signature of aggregate:
def aggregate [B] (z: B)(seqop: (B, A) ⇒ B, combop: (B, B) ⇒ B): B
Also, note that A refers to the type of the collection's elements. So, let's say we have 4 elements in this collection; then aggregate might work like this:
z   A   z   A   z   A   z   A
 \ /     \ /seqop\ /     \ /
  B       B       B       B
    \   /  combop   \   /
      B _           _ B
         \ combop  /
              B
Let's see a practical example of that. Say I have a GenSeq("This", "is", "an", "example"), and I want to know how many characters there are in it. I can write the following:

Note the use of par in the below snippet of code. The second function passed to aggregate is what is called after the individual sequences are computed. Scala is only able to do this for collections that can be parallelized.

import scala.collection.GenSeq
val seq = GenSeq("This", "is", "an", "example")
val chars = seq.par.aggregate(0)(_ + _.length, _ + _)
So, first it would compute this:
0 + "This".length // 4
0 + "is".length // 2
0 + "an".length // 2
0 + "example".length // 7
What it does next cannot be predicted (there is more than one way of combining the results), but it might do this (like in the ascii art above):
4 + 2 // 6
2 + 7 // 9
At which point it concludes with
6 + 9 // 15
which gives the final result. Now, this is a bit similar in structure to foldLeft, but it has an additional function (B, B) => B, which fold doesn't have. This function, however, enables it to work in parallel!

Consider, for example, that each of the four initial computations is independent of the others, and they can be done in parallel. The next two (resulting in 6 and 9) can be started once the computations on which they depend are finished, and these two can also run in parallel. The 7 computations, parallelized as above, could take as little as the same time as 3 serial computations.

Actually, with such a small collection the cost of synchronizing the computation would be big enough to wipe out any gains. Furthermore, if you folded this, it would only take 4 computations total. Once your collections get larger, however, you start to see some real gains.
Consider, on the other hand, foldLeft. Because it doesn't have the additional function, it cannot parallelize any computation:
(((0 + "This".length) + "is".length) + "an".length) + "example".length
Each of the inner parentheses must be computed before the outer one can proceed.
The signature for a collection with elements of type A is:
def aggregate [B] (z: B)(seqop: (B, A) ⇒ B, combop: (B, B) ⇒ B): B
z is an object of type B acting as a neutral element. If you want to count something, you can use 0; if you want to build a list, start with an empty list, etc.

seqop is analogous to the function you pass to fold methods. It takes two arguments: the first one is the same type as the neutral element you passed and represents the stuff already aggregated on previous iterations; the second one is the next element of your collection. The result must also be of type B.

combop is a function combining two results into one.
In most collections, aggregate is implemented in TraversableOnce as:

def aggregate[B](z: B)(seqop: (B, A) => B, combop: (B, B) => B): B =
  foldLeft(z)(seqop)

Thus combop is ignored. However, it makes sense for parallel collections, because seqop will first be applied locally in parallel, and then combop is called to finish the aggregation.
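That delegation can be observed directly: on a plain sequential Seq, combop is never invoked (a sketch; aggregate is deprecated on sequential collections since Scala 2.13 for exactly this reason):

```scala
val seq = Seq("This", "is", "an", "example")

// count combop invocations to show the sequential implementation never uses it
var combopCalls = 0
val chars = seq.aggregate(0)(_ + _.length, (b1, b2) => { combopCalls += 1; b1 + b2 })

// chars == 15, combopCalls == 0
```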
So for your example, you can try with a fold first:
val seqOp = (map: Map[String, Set[String]], tuple: (String, String)) =>
  map + (tuple._1 -> (map.getOrElse(tuple._1, Set[String]()) + tuple._2))

list.foldLeft(Map[String, Set[String]]())(seqOp)
// returns: Map(one -> Set(i, 1), two -> Set(2, ii), four -> Set(iv))
Then you have to find a way of collapsing two multimaps:
val combOp = (map1: Map[String, Set[String]], map2: Map[String, Set[String]]) =>
  (map1.keySet ++ map2.keySet).foldLeft(Map[String, Set[String]]()) { (result, k) =>
    result + (k -> (map1.getOrElse(k, Set[String]()) ++ map2.getOrElse(k, Set[String]())))
  }
Now, you can use aggregate in parallel:
list.par.aggregate( Map[String,Set[String]]() )( seqOp, combOp )
//Returns: Map(one -> Set(i, 1), two -> Set(2, ii), four -> Set(iv))
Applying the method par to list uses the parallel collection (scala.collection.parallel.immutable.ParSeq) of the list, to really take advantage of multi-core processors. Without par, there won't be any performance gain, since the aggregate is not done on a parallel collection.