
How do I rewrite a for loop with a shared dependency using actors

We have some code which needs to run faster. It's already profiled, so we would like to make use of multiple threads. Usually I would set up an in-memory queue and have a number of threads taking jobs off the queue and calculating the results. For the shared data I would use a ConcurrentHashMap or similar.

I don't really want to go down that route again. From what I have read, using actors will result in cleaner code, and if I use Akka, migrating to more than one JVM should be easier. Is that true?

However, I don't know how to think in actors so I am not sure where to start.

To give a better idea of the problem here is some sample code:

case class Trade(price:Double, volume:Int, stock:String) {
  def value(priceCalculator:PriceCalculator) =
    (priceCalculator.priceFor(stock) - price)*volume
}
class PriceCalculator {
  def priceFor(stock:String) = {
    Thread.sleep(20)//a slow operation which can be cached
    50.0
  }
}
object ValueTrades {

  def valueAll(trades:List[Trade],
      priceCalculator:PriceCalculator):List[(Trade,Double)] = {
    trades.map { trade => (trade,trade.value(priceCalculator)) }
  }

  def main(args:Array[String]): Unit = {
    val trades = List(
      Trade(30.5, 10, "Foo"),
      Trade(30.5, 20, "Foo")
      //usually much longer
    )
    val priceCalculator = new PriceCalculator
    val values = valueAll(trades, priceCalculator)
  }

}

I'd appreciate it if someone with experience using actors could suggest how this would map on to actors.


This is a complement to my comment on shared results for expensive calculations. Here it is:

import scala.actors._
import Actor._
import Futures._

case class PriceFor(stock: String) // Ask for result

// The following could be an "object" as well, if it's supposed to be singleton
class PriceCalculator extends Actor {
  val map = new scala.collection.mutable.HashMap[String, Future[Double]]()
  def act = loop {
    react {
      case PriceFor(stock) => reply(map getOrElseUpdate (stock, future {
        Thread.sleep(2000) // a slow operation
        50.0
      }))
    }
  }
}

Here's a usage example:

scala> val pc = new PriceCalculator; pc.start
pc: PriceCalculator = PriceCalculator@141fe06

scala> class Test(stock: String) extends Actor {
     |   def act = {
     |     println(System.currentTimeMillis().toString+": Asking for stock "+stock)
     |     val f = (pc !? PriceFor(stock)).asInstanceOf[Future[Double]]
     |     println(System.currentTimeMillis().toString+": Got the future back")
     |     val res = f.apply() // this blocks until the result is ready
     |     println(System.currentTimeMillis().toString+": Value: "+res)
     |   }
     | }
defined class Test

scala> List("abc", "def", "abc").map(new Test(_)).map(_.start)
1269310737461: Asking for stock abc
res37: List[scala.actors.Actor] = List(Test@6d888e, Test@1203c7f, Test@163d118)
1269310737461: Asking for stock abc
1269310737461: Asking for stock def
1269310737464: Got the future back

scala> 1269310737462: Got the future back
1269310737465: Got the future back
1269310739462: Value: 50.0
1269310739462: Value: 50.0
1269310739465: Value: 50.0


scala> new Test("abc").start // Should return instantly
1269310755364: Asking for stock abc
res38: scala.actors.Actor = Test@15b5b68
1269310755365: Got the future back

scala> 1269310755367: Value: 50.0
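
For readers without scala.actors (it was deprecated in later Scala versions), the core trick above, caching the Future rather than the finished value, can be sketched with scala.concurrent from the standard library. This is only a sketch under assumptions: CachingPriceCalculator, slowCalls and CachingDemo are hypothetical names, and a synchronized map stands in for the actor's mailbox as the thing that serializes access to the cache.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for the actor-based PriceCalculator. It caches the
// Future itself, so concurrent requests for one stock share one computation.
class CachingPriceCalculator {
  // Counts how often the slow path actually runs (for illustration only).
  val slowCalls = new AtomicInteger(0)
  private val cache = mutable.HashMap.empty[String, Future[Double]]

  // The lock is held only while looking up or registering the Future,
  // not while the slow operation runs on the execution context.
  def priceFor(stock: String): Future[Double] = cache.synchronized {
    cache.getOrElseUpdate(stock, Future {
      slowCalls.incrementAndGet()
      Thread.sleep(200) // a slow operation
      50.0
    })
  }
}

object CachingDemo {
  def main(args: Array[String]): Unit = {
    val pc = new CachingPriceCalculator
    val prices = Future.sequence(List("abc", "def", "abc").map(s => pc.priceFor(s)))
    println(Await.result(prices, 5.seconds))
    println("slow calls: " + pc.slowCalls.get)
  }
}
```

Run as written, the demo should print List(50.0, 50.0, 50.0) and report two slow calls, because the second request for "abc" reuses the cached future.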


For simple parallelization, where I throw a bunch of work out to process and then wait for it all to come back, I tend to like to use a Futures pattern.

class ActorExample {
  import actors._
  import Actor._
  class Worker(val id: Int) extends Actor {
    def busywork(i0: Int, i1: Int) = {
      var sum,i = i0
      while (i < i1) {
        i += 1
        sum += 42*i
      }
      sum
    }
    def act() { loop { react {
      case (i0:Int,i1:Int) => sender ! busywork(i0,i1)
      case None => exit()
    }}}
  }

  val workforce = (1 to 4).map(i => new Worker(i)).toList

  def parallelFourSums = {
    workforce.foreach(_.start())
    val futures = workforce.map(w => w !! ((w.id,1000000000)) );
    val computed = futures.map(f => f() match {
      case i:Int => i
      case _ => throw new IllegalArgumentException("I wanted an int!")
    })
    workforce.foreach(_ ! None)
    computed
  }

  def serialFourSums = {
    val solo = workforce.head
    workforce.map(w => solo.busywork(w.id,1000000000))
  }

  def timed(f: => List[Int]) = {
    val t0 = System.nanoTime
    val result = f
    val t1 = System.nanoTime
    (result, t1-t0)
  }

  def go {
    val serial = timed( serialFourSums )
    val parallel = timed( parallelFourSums )
    println("Serial result:  " + serial._1)
    println("Parallel result:" + parallel._1)
    printf("Serial took   %.3f seconds\n",serial._2*1e-9)
    printf("Parallel took %.3f seconds\n",parallel._2*1e-9)
  }
}

Basically, the idea is to create a collection of workers, one per workload, and then throw all the data at them with !!, which immediately gives back a future. When you try to read the future, the caller blocks until the worker is actually done with the data.

You could rewrite the above so that PriceCalculator extended Actor instead, and valueAll coordinated the return of the data.
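
As a rough illustration of that shape, here is a sketch in which the price lookup returns a future and valueAll gathers the results. It swaps in scala.concurrent.Future for an actor, so it is not the actor-based rewrite itself. AsyncPriceCalculator and ValueTradesAsync are hypothetical names, and the value formula (looked-up price minus trade price, times volume) is an assumption about what the question's code intends.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

case class Trade(price: Double, volume: Int, stock: String)

// Hypothetical asynchronous price source: each lookup runs on the
// global execution context instead of blocking the caller.
class AsyncPriceCalculator {
  def priceFor(stock: String): Future[Double] = Future {
    Thread.sleep(20) // a slow operation which can be cached
    50.0
  }
}

object ValueTradesAsync {
  // Starts one lookup per trade and collects the results in input order.
  // Assumed formula: (looked-up price - trade price) * volume.
  def valueAll(trades: List[Trade],
               pc: AsyncPriceCalculator): Future[List[(Trade, Double)]] =
    Future.traverse(trades) { trade =>
      pc.priceFor(trade.stock).map(p => (trade, (p - trade.price) * trade.volume))
    }

  def main(args: Array[String]): Unit = {
    val trades = List(Trade(30.5, 10, "Foo"), Trade(30.5, 20, "Foo"))
    // Block only once, at the very end, when the values are needed.
    val values = Await.result(valueAll(trades, new AsyncPriceCalculator), 5.seconds)
    values.foreach(println)
  }
}
```

The only blocking call is the final Await.result; everything before it just wires futures together, which is the same "send everything, then collect" pattern as the !! example.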

Note that you have to be careful when passing mutable data around.
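
One common way to stay safe, sketched here with scala.concurrent.Future rather than actors, is to hand the other thread an immutable snapshot instead of the mutable collection itself. SnapshotDemo and sumAsync are hypothetical names used only for this illustration.

```scala
import scala.collection.mutable
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SnapshotDemo {
  // Accepts an immutable List, so the worker thread can never observe
  // changes the caller makes after handing the data over.
  def sumAsync(volumes: List[Int]): Future[Int] = Future { volumes.sum }

  def main(args: Array[String]): Unit = {
    val pending = mutable.ArrayBuffer(10, 20, 30)
    val total = sumAsync(pending.toList) // toList copies into an immutable List
    pending += 40                        // later mutation does not affect the copy
    println(Await.result(total, 5.seconds))
  }
}
```

Passing the ArrayBuffer directly would instead let the result depend on whether the append happens before or after the worker reads it.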

Anyway, on the machine I'm typing this from, if you run the above you get:

scala> (new ActorExample).go
Serial result:  List(-1629056553, -1629056636, -1629056761, -1629056928)
Parallel result:List(-1629056553, -1629056636, -1629056761, -1629056928)
Serial took   1.532 seconds
Parallel took 0.443 seconds

(Obviously I have at least four cores; the parallel timing varies quite a bit depending on which worker gets which processor and what else is going on on the machine.)
