Using Akka futures and actors to parallelize work over a list
I want to send a list of messages to an actor, immediately receive a future for each reply, and then wait for all the futures to complete before returning to the calling method. From reading the Akka docs, I believe Future.sequence is the way to go, but I have not been able to get the following code to compile. The compiler reports this error:
found : akka.dispatch.ActorCompletableFuture
required: akka.dispatch.Future[Integer]
Error occurred in an application involving default arguments.
futures += secondary ? GetRandom
^
I'm sure I am missing something obvious but the code below seems to be "correct" per the examples and API docs.
import java.util.Random
import akka.dispatch.Future
import akka.actor._
import Commands._
import collection.mutable.ListBuffer

object Commands {
  trait Command
  case class GetRandom() extends Command
  case class GenRandomList() extends Command
}

class Secondary() extends Actor {
  val randomGenerator = new Random()

  override def receive = {
    case GetRandom() =>
      self reply randomGenerator.nextInt(100)
  }
}

class Primary() extends Actor {
  private val secondary = Actor.actorOf[Secondary]

  override def receive = {
    case GenRandomList() =>
      val futures = new ListBuffer[Future[Integer]]
      for (i <- 0 until 10) {
        futures += secondary ? GetRandom
      }
      val futureWithList = Future.sequence(futures)
      futureWithList.map { foo =>
        println("Shouldn't foo be an integer now? " + foo)
      }.get
  }

  override def preStart() = {
    secondary.start()
  }
}

class Starter extends App {
  println("Starting")
  var master = Actor.actorOf(new Primary()).start()
  master ! GenRandomList()
}
What is the correct way to send a series of messages to an actor, receive a future for each one, and return once all the futures have completed (optionally storing the result of each future in a List and returning it)?
(secondary ? GetRandom).mapTo[Int]
Akka's ? returns a Future[Any], but you need a Future[Int].
So you can either declare a list that accepts any kind of future:
val futures = new ListBuffer[Future[Any]]
or cast the result to an Int as soon as it is available:
for (i <- 0 until 10) {
  futures += (secondary ? GetRandom) map {
    _.asInstanceOf[Int]
  }
}
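As a rough illustration of the same pattern outside Akka 1.x, here is a minimal sketch using the standard library's scala.concurrent.Future instead of akka.dispatch.Future. The ask method is a hypothetical stand-in for secondary ? GetRandom (it returns a Future[Any]), and the 5-second timeout is an arbitrary assumption. It narrows each future with mapTo[Int], combines them with Future.sequence, and blocks until all of them have completed.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequenceSketch {
  // Hypothetical stand-in for `secondary ? GetRandom`: the reply is typed as Any.
  def ask(i: Int): Future[Any] = Future(i * 2)

  def collect(n: Int): List[Int] = {
    // Narrow each Future[Any] to Future[Int] as soon as it is created.
    val futures: List[Future[Int]] =
      (0 until n).toList.map(i => ask(i).mapTo[Int])

    // Turn List[Future[Int]] into Future[List[Int]]; results keep the input order.
    val all: Future[List[Int]] = Future.sequence(futures)

    // Block until every future has completed (arbitrary 5-second timeout).
    Await.result(all, 5.seconds)
  }
}
```

In the Akka version from the question, the equivalent shape would presumably be a ListBuffer[Future[Int]] filled with (secondary ? GetRandom).mapTo[Int], followed by Future.sequence(futures) and .get.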
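By the way, to make it work you also need to change the definition of GetRandom. As written, secondary ? GetRandom sends the companion object of the case class, which case GetRandom() does not match. Define it as a case object instead: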
case object GetRandom extends Command
and match it with:
case GetRandom =>
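To see why the definition matters, here is a small sketch in plain Scala with no Akka involved. GetRandomClass and describe are hypothetical names introduced only for this illustration. Writing the name of a zero-argument case class without parentheses refers to its companion object, which is not an instance of the class, so an extractor pattern such as GetRandomClass() does not match it.

```scala
object MatchSketch {
  sealed trait Command
  case class GetRandomClass() extends Command // must be sent as GetRandomClass()
  case object GetRandom extends Command       // sent and matched as GetRandom

  // Mimics an actor's receive block, which sees messages typed as Any.
  def describe(msg: Any): String = msg match {
    case GetRandomClass() => "case class instance"
    case GetRandom        => "case object"
    case _                => "unhandled"
  }
}
```

Here MatchSketch.describe(MatchSketch.GetRandomClass) falls through to "unhandled", because the argument is the companion object, while MatchSketch.describe(MatchSketch.GetRandom) returns "case object".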