Example of the Scala aggregate function
I have been looking and I cannot find an example or discussion of the aggregate
function in Scala that I can understand. It seems pretty powerful.
Can this function be used to reduce the values of tuples to make a multimap-type collection? For example:
val list = Seq(("one", "i"), ("two", "2"), ("two", "ii"), ("one", "1"), ("four", "iv"))
After applying aggregate:
Seq(("one" -> Seq("i","1")), ("two" -> Seq("2", "ii")), ("four" -> Seq("iv"))
Also, can you give examples of the parameters z, seqop, and combop? I'm unclear on what these parameters do.
Let's see if some ascii art doesn't help. Consider the type signature of aggregate:
def aggregate [B] (z: B)(seqop: (B, A) ⇒ B, combop: (B, B) ⇒ B): B
Also, note that A refers to the element type of the collection. So, let's say we have 4 elements in this collection; then aggregate might work like this:
z   A   z   A   z   A   z   A
 \ /     \ /seqop\ /     \ /
  B       B       B       B
    \   /  combop   \   /
      B _           _ B
         \ combop  /
              B
Let's see a practical example of that. Say I have a GenSeq("This", "is", "an", "example"), and I want to know how many characters there are in it. I can write the following:
Note the use of par in the below snippet of code. The second function passed to aggregate is what is called after the individual sequences are computed. Scala is only able to do this for collections that can be parallelized.
import scala.collection.GenSeq
val seq = GenSeq("This", "is", "an", "example")
val chars = seq.par.aggregate(0)(_ + _.length, _ + _)
So, first it would compute this:
0 + "This".length // 4
0 + "is".length // 2
0 + "an".length // 2
0 + "example".length // 7
What it does next cannot be predicted (there is more than one way of combining the results), but it might do this (like in the ascii art above):
4 + 2 // 6
2 + 7 // 9
At which point it concludes with
6 + 9 // 15
which gives the final result. Now, this is a bit similar in structure to foldLeft, but it has an additional function, (B, B) => B, which fold doesn't have. This function, however, enables it to work in parallel!
Consider, for example, that each of the four initial computations is independent of the others, and they can be done in parallel. The next two (resulting in 6 and 9) can be started once the computations they depend on are finished, and these two can also run in parallel.
The 7 computations, parallelized as above, could take as little as the time of 3 serial computations.
Actually, with such a small collection the cost in synchronizing computation would be big enough to wipe out any gains. Furthermore, if you folded this, it would only take 4 computations total. Once your collections get larger, however, you start to see some real gains.
Consider, on the other hand, foldLeft. Because it doesn't have the additional function, it cannot parallelize any computation:
(((0 + "This".length) + "is".length) + "an".length) + "example".length
Each of the inner parenthesis must be computed before the outer one can proceed.
The aggregate function does not do that (except that it is a very general function, and it could be used to do that). What you want is groupBy. Close to it, at least. As you start with a Seq[(String, String)], and you group by taking the first item in the tuple (which is (String, String) => String), it would return a Map[String, Seq[(String, String)]]. You then have to discard the first element in each of the Seq[(String, String)] values.
So
list.groupBy(_._1).mapValues(_.map(_._2))
That gives you a Map[String, Seq[String]]. If you want a Seq instead of a Map, call toSeq on the result. I don't think you have a guarantee on the order in the resulting Seq, though.
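Putting that together, a sketch of the whole pipeline on the sample data:

val list = Seq(("one", "i"), ("two", "2"), ("two", "ii"), ("one", "1"), ("four", "iv"))
val grouped = list.groupBy(_._1)              // Map[String, Seq[(String, String)]]
val multimap = grouped.mapValues(_.map(_._2)) // Map[String, Seq[String]]
multimap.toSeq
// e.g. Seq((one, Seq(i, 1)), (two, Seq(2, ii)), (four, Seq(iv))) -- order unspecified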
Aggregate is a more difficult function.
Consider first reduceLeft and reduceRight.
Let as be a non-empty sequence as = Seq(a1, ..., an) of elements of type A, and f: (A,A) => A be some way to combine two elements of type A into one. I will write it as a binary operator @, a1 @ a2 rather than f(a1, a2). as.reduceLeft(@) will compute (((a1 @ a2) @ a3) ... @ an). reduceRight will put the parentheses the other way, (a1 @ (a2 @ ... @ an)). If @ happens to be associative, one does not care about the parentheses. One could compute it as (a1 @ ... @ ap) @ (ap+1 @ ... @ an) (there would be parentheses inside the two big parentheses too, but let's not care about that). Then one could do the two parts in parallel, while the nested bracketing in reduceLeft or reduceRight forces a fully sequential computation. But parallel computation is only possible when @ is known to be associative, and the reduceLeft method cannot know that.
Still, there could be a method reduce, whose caller would be responsible for ensuring that the operation is associative. Then reduce would order the calls as it sees fit, possibly doing them in parallel. Indeed, there is such a method.
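For example (reduce assumes associativity; the grouping of calls is unspecified, but the order of operands is preserved):

Seq(1, 2, 3, 4).reduce(_ + _)         // 10
Seq("a", "b", "c").par.reduce(_ + _)  // "abc": concatenation is associative, not commutative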
There is a limitation with the various reduce methods, however. The elements of the Seq can only be combined into a result of the same type: @ has to be (A,A) => A. But one could have the more general problem of combining them into a B. One starts with a value b of type B and combines it with every element of the sequence. The operator @ is (B,A) => B, and one computes (((b @ a1) @ a2) ... @ an). foldLeft does that. foldRight does the same thing but starts from an. There, the @ operation has no chance to be associative. When one writes b @ a1 @ a2, it must mean (b @ a1) @ a2, as (a1 @ a2) would be ill-typed. So foldLeft and foldRight have to be sequential.
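For instance, counting characters with foldLeft, where B = Int and A = String:

Seq("This", "is", "an", "example").foldLeft(0)((b, a) => b + a.length)
// (((0 + 4) + 2) + 2) + 7 == 15, evaluated strictly left to right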
Suppose, however, that each A can be turned into a B; let's write it with !, so a! is of type B. Suppose moreover that there is a + operation (B,B) => B, and that @ is such that b @ a is in fact b + a!. Rather than combining elements with @, one could first transform all of them to B with !, then combine them with +. That would be as.map(!).reduceLeft(+). And if + is associative, then that can be done with reduce and need not be sequential: as.map(!).reduce(+). There could be a hypothetical method as.associativeFold(b, !, +).
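For instance, summing string lengths: here ! is _.length and + is integer addition, which is associative:

import scala.collection.GenSeq
GenSeq("This", "is", "an", "example").par.map(_.length).reduce(_ + _) // 15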
Aggregate is very close to that. It may be, however, that there is a more efficient way to implement b @ a than b + a!. For instance, if type B is List[A] and b @ a is a :: b, then a! will be a :: Nil, and b1 + b2 will be b2 ::: b1. a :: b is way better than (a :: Nil) ::: b. To benefit from associativity, but still use @, one first splits b + a1! + ... + an! into (b + a1! + ... + ap!) + (ap+1! + ... + an!), then goes back to using @ with (b @ a1 @ ... @ ap) + (ap+1! @ ap+2 @ ... @ an). One still needs the ! on ap+1, because one must start with some b. And the + is still necessary too, appearing between the parentheses. To do that, as.associativeFold(b, !, +) could be changed to as.optimizedAssociativeFold(b, !, @, +).
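A sketch of that List example written with the actual aggregate method (B = List[Int]):

val as = Seq(1, 2, 3, 4)
val result = as.par.aggregate(List.empty[Int])(
  (b, a) => a :: b,       // seqop: the cheap b @ a
  (b1, b2) => b2 ::: b1)  // combop: the + on partial results
// result == List(4, 3, 2, 1), same as as.foldLeft(List.empty[Int])((b, a) => a :: b)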
Back to +. + is associative, or equivalently, (B, +) is a semigroup. In practice, most of the semigroups used in programming happen to be monoids too, i.e. they contain a neutral element z (for zero) in B, so that for each b, z + b = b + z = b. In that case, the ! operation that makes sense is likely to be a! = z @ a. Moreover, as z is a neutral element, b @ a1 @ ... @ an = (b + z) @ a1 @ ... @ an, which is b + (z @ a1 @ ... @ an). So it is always possible to start the aggregation with z. If b is wanted instead, you do b + result at the end. With all those hypotheses, we can do as.aggregate(z, @, +). That is what aggregate does. @ is the seqop argument (applied in a sequence, z @ a1 @ a2 @ ... @ ap), and + is combop (applied to already partially combined results, as in (z @ a1 @ ... @ ap) + (z @ ap+1 @ ... @ an)).
To sum it up, as.aggregate(z)(seqop, combop) computes the same thing as as.foldLeft(z)(seqop) provided that:
- (B, combop, z) is a monoid
- seqop(b, a) = combop(b, seqop(z, a))
The aggregate implementation may use the associativity of combop to group the computations as it likes (not swapping elements, however; combop does not have to be commutative, and ::: is not). It may run them in parallel.
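As a quick check, counting characters satisfies both conditions:

import scala.collection.GenSeq

// z = 0; seqop adds one element's contribution; combop merges partial counts.
val z = 0
val seqop = (b: Int, a: String) => b + a.length
val combop = (b1: Int, b2: Int) => b1 + b2

// (Int, combop, 0) is a monoid, and seqop(b, a) == combop(b, seqop(z, a)),
// since b + a.length == b + (0 + a.length).
GenSeq("This", "is", "an", "example").par.aggregate(z)(seqop, combop) // 15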
Finally, solving the initial problem using aggregate is left as an exercise for the reader. A hint: implement it using foldLeft, then find z and combop that satisfy the conditions stated above.
The signature for a collection with elements of type A is:
def aggregate [B] (z: B)(seqop: (B, A) ⇒ B, combop: (B, B) ⇒ B): B
z is an object of type B acting as a neutral element. If you want to count something, you can use 0; if you want to build a list, start with an empty list, etc.
seqop is analogous to the function you pass to fold methods. It takes two arguments: the first one has the same type as the neutral element you passed and represents the stuff already aggregated in previous iterations; the second one is the next element of your collection. The result must also be of type B.
combop is a function combining two results into one.
In most collections, aggregate is implemented in TraversableOnce
as:
def aggregate[B](z: B)(seqop: (B, A) => B, combop: (B, B) => B): B
= foldLeft(z)(seqop)
Thus combop is ignored. However, it makes sense for parallel collections, because seqop will first be applied locally in parallel, and then combop is called to finish the aggregation.
So for your example, you can try with a fold first:
val seqOp =
  (map: Map[String, Set[String]], tuple: (String, String)) =>
    map + (tuple._1 -> (map.getOrElse(tuple._1, Set[String]()) + tuple._2))
list.foldLeft(Map[String, Set[String]]())(seqOp)
// returns: Map(one -> Set(i, 1), two -> Set(2, ii), four -> Set(iv))
Then you have to find a way of collapsing two multimaps:
val combOp = (map1: Map[String, Set[String]], map2: Map[String, Set[String]]) =>
  (map1.keySet ++ map2.keySet).foldLeft(Map[String, Set[String]]()) { (result, k) =>
    result + (k -> (map1.getOrElse(k, Set[String]()) ++ map2.getOrElse(k, Set[String]())))
  }
Now, you can use aggregate in parallel:
list.par.aggregate( Map[String,Set[String]]() )( seqOp, combOp )
//Returns: Map(one -> Set(i, 1), two -> Set(2, ii), four -> Set(iv))
Applying the method par to list uses the parallel collection (scala.collection.parallel.immutable.ParSeq) backing the list, to really take advantage of multi-core processors. Without par, there won't be any performance gain, since the aggregate is not done on a parallel collection.
aggregate is like foldLeft but may be executed in parallel.
As missingfactor says, the linear version of aggregate(z)(seqop, combop) is equivalent to foldLeft(z)(seqop). This is however impractical in the parallel case, where we need not only to combine the next element with the previous result (as in a normal fold), but to split the iterable into sub-iterables on which we call aggregate, and then combine those results again. (In left-to-right order, but not associatively, as we might have combined the last parts before the first parts of the iterable.) This re-combining is in general non-trivial, and therefore one needs a method (S, S) => S to accomplish that.
The definition in ParIterableLike is:
def aggregate[S](z: S)(seqop: (S, T) => S, combop: (S, S) => S): S = {
executeAndWaitResult(new Aggregate(z, seqop, combop, splitter))
}
which indeed uses combop.
For reference, Aggregate is defined as:
protected[this] class Aggregate[S](z: S, seqop: (S, T) => S, combop: (S, S) => S, protected[this] val pit: IterableSplitter[T])
extends Accessor[S, Aggregate[S]] {
@volatile var result: S = null.asInstanceOf[S]
def leaf(prevr: Option[S]) = result = pit.foldLeft(z)(seqop)
protected[this] def newSubtask(p: IterableSplitter[T]) = new Aggregate(z, seqop, combop, p)
override def merge(that: Aggregate[S]) = result = combop(result, that.result)
}
The important part is merge, where combop is applied with two sub-results.
Here is a blog post on how aggregate enables performance on multi-core processors, with a benchmark: http://markusjais.com/scalas-parallel-collections-and-the-aggregate-method/
Here is a video of the "Scala parallel collections" talk from Scala Days 2011: http://days2011.scala-lang.org/node/138/272
The description of the video:
Scala Parallel Collections
Aleksandar Prokopec
Parallel programming abstractions become increasingly important as the number of processor cores grows. A high-level programming model enables the programmer to focus more on the program and less on low-level details such as synchronization and load-balancing. Scala parallel collections extend the programming model of the Scala collection framework, providing parallel operations on datasets. The talk will describe the architecture of the parallel collection framework, explaining their implementation and design decisions. Concrete collection implementations such as parallel hash maps and parallel hash tries will be described. Finally, several example applications will be shown, demonstrating the programming model in practice.
The definition of aggregate in the TraversableOnce source is:
def aggregate[B](z: B)(seqop: (B, A) => B, combop: (B, B) => B): B =
foldLeft(z)(seqop)
which is no different from a simple foldLeft. combop doesn't seem to be used anywhere. I am myself confused as to what the purpose of this method is.
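One can check that combop is never invoked on a plain sequential collection (the error below is deliberately unreachable):

// On a plain Seq, aggregate delegates to foldLeft, so combop never runs:
Seq(1, 2, 3).aggregate(0)(_ + _, (_, _) => sys.error("combop was called")) // 6, no error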
Just to clarify the explanations of those before me: in theory the idea is that aggregate should work like this (I have changed the names of the parameters to make them clearer):
Seq(1,2,3,4).aggregate(0)(
  addToPrev = (prev, curr) => prev + curr,
  combineSums = (sumA, sumB) => sumA + sumB)
Should logically translate to
Seq(1,2,3,4)
  .grouped(2) // split into groups of 2 members each
  .map(prevAndCurrList => prevAndCurrList(0) + prevAndCurrList(1))
  .foldLeft(0)((sumA, sumB) => sumA + sumB)
Because the aggregation and mapping are separate, the original list could theoretically be split into different groups of different sizes and run in parallel, or even on different machines. In practice Scala's current implementation does not support this feature by default, but you can do this in your own code.
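Here is a hand-rolled sketch of that idea; chunkedAggregate is a made-up name, not a library method:

def chunkedAggregate[A, B](xs: Seq[A], chunkSize: Int)(z: B)(
    seqop: (B, A) => B, combop: (B, B) => B): B =
  xs.grouped(chunkSize)          // split into sub-sequences
    .map(_.foldLeft(z)(seqop))   // fold each chunk sequentially
    .foldLeft(z)(combop)         // merge the partial results, left to right

chunkedAggregate(Seq(1, 2, 3, 4), 2)(0)(_ + _, _ + _) // 10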