Scala: use parallel collections to do a foreach, and then do something else on each partition?
I have a Seq of items and I need to do something on each item, and then perform a final step that doesn't need any input. I want to use par to speed this up: split the Seq into partitions, and within each partition do something on each item and then perform the final step for that partition. I want the final step to run in the thread that processed the partition. Is there a way to do that? aggregate() doesn't seem to do the right thing.
Here's some sample code:
// non-parallel case
val mySeq = Seq[Item]
mySeq foreach { actOnItem(_) }
doFinalStep()
// ideal par case
val mySeq = Seq[Item]
mySeq.par foreachThenDoFinalStepAfterPartition { actOnItem(_), doFinalStep }
I assume that actOnItem performs some kind of side effect that doFinalStep then uses. In the parallel case you have multiple groups of side effects, each of which you now want to process with doFinalStep, so you would have to track the side effects of the different groups separately, for example in a concurrent data structure. Without knowing exactly what actOnItem and doFinalStep do, it's hard to convert this to a more functional style. You could do something like this:
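class SF { /* whatever your side effect is */ }

val ac = new java.util.concurrent.atomic.AtomicInteger(0)
val sfmap = new java.util.concurrent.ConcurrentHashMap[Int, SF]()

// Registers a fresh SF under a unique index and returns that index.
def newSideeffectIndex(): Int = {
  val i = ac.getAndIncrement()
  sfmap.put(i, new SF())
  i
}

// Returns the partition's index so that aggregate can carry it to the next item.
def actOnItem(u0: Int, x: Item): Int = {
  val u = if (u0 == -1) newSideeffectIndex() else u0
  // do whatever you need to do with `x`
  // ...
  val sf = sfmap.get(u)
  // do something with `sf` - update it somehow based on `x`
  u
}

def doFinalStep(u: Int): Unit = {
  val sf = sfmap.get(u)
  if (sf != null) {
    // do the final step here using `sf`
  }
  sfmap.remove(u)
}

val mySeq = Seq[Item]()
val last = mySeq.par.aggregate(-1)((u, x) => actOnItem(u, x), (u1, u2) => {
  doFinalStep(u1)
  doFinalStep(u2)
  -1 // both partitions are finished, so there is nothing left to carry forward
})
doFinalStep(last) // covers the case where the combiner never ran (a single partition)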
Explanation: each new partition starts with the aggregation value -1. The aggregating part (the first closure) initializes the aggregation value for the current partition when it detects -1: it picks a unique integer and creates an SF object that holds the side effect. It then processes the current element and updates the side effect. At the reduction step (the second closure) you know that the partitions being combined have been fully processed, so you can run the final step for each of them and remove them from the concurrent map. Note that the reduction step is not guaranteed to run on the same thread that processed a given partition.
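If the final step really must run on the thread that handled the partition, one alternative is to do the partitioning yourself. The following is only a minimal sketch under stated assumptions, not part of the approach above: it swaps parallel collections for plain Futures from the standard library (so it does not depend on the separate parallel collections module in Scala 2.13+), and the names PartitionedForeach, foreachThenFinish and chunkSize are hypothetical.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PartitionedForeach {
  // Hypothetical helper: splits `items` into chunks of at most `chunkSize`
  // elements (chunkSize must be positive), runs `act` on every item of a chunk
  // and then calls `finish` once for that chunk. Both happen inside the same
  // Future body, so they run on the same thread.
  def foreachThenFinish[A](items: Seq[A], chunkSize: Int)(act: A => Unit)(finish: () => Unit): Unit = {
    val chunks = items.grouped(chunkSize).toList
    val work = chunks.map { chunk =>
      Future {
        chunk.foreach(act)
        finish()
      }
    }
    // Block until every chunk, including its final step, has completed.
    // The one-minute timeout is an arbitrary choice for this sketch.
    Await.result(Future.sequence(work), 1.minute)
  }
}
```

With the names from the question, a call might look like PartitionedForeach.foreachThenFinish(mySeq, 1000)(actOnItem)(() => doFinalStep()). Any state that actOnItem and doFinalStep share would still need to be thread-local or otherwise kept separate per chunk.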