
Concurrent Processing in Scala

I am trying to use concurrent programming in Scala. Based on this example on StackOverflow, I wrote a program around Problem 1 of Project Euler. I tried three methods: the first is a simple execution with no parallelism. The second uses the java.util.concurrent API through Executors and Callables. The third, based on the page mentioned above, uses scala.actors.Futures. My objective is to compare the execution times.

This is the code:

package sandbox

import java.util.concurrent._
import scala.actors._

object TestPool {

  def eval(n: Int): Boolean = (n % 3 == 0) || (n % 5 == 0)

  def runSingle(max: Int): Int = (1 until max).filter(eval(_)).foldLeft(0)(_ + _)

  def runPool(max: Int): Int = {

    def getCallable(i: Int): Callable[Boolean] = new Callable[Boolean] { def call = eval(i) }

    val pool = Executors.newFixedThreadPool(5)
    val result = (1 until max).filter(i => pool.submit(getCallable(i)).get).foldLeft(0)(_ + _)
    pool.shutdown
    pool.awaitTermination(Math.MAX_LONG, TimeUnit.SECONDS)

    result
  }

  def runFutures(max: Int): Int = (1 until max).filter(i => Futures.future(eval(i)).apply).foldLeft(0)(_ + _)

  /**
   * f is the function to be run. test returns a Tuple2 containing the sum and the
   * execution time in milliseconds.
   */
  def test(max: Int, f: Int => Int): (Int, Long) = {
    val t0 = System.currentTimeMillis
    val result = f(max)
    val deltaT = System.currentTimeMillis - t0

    (result, deltaT)
  }


  def main(args : Array[String]) : Unit = {
    val max = 10000

    println("Single : " + test(max, runSingle))
    println("Pool   : " + test(max, runPool))
    println("Futures: " + test(max, runFutures))
  }
}

These are the results:

max = 10:

  • Single : (23,31)
  • Pool : (23,16)
  • Futures: (23,31)

max = 100:

  • Single : (2318,33)
  • Pool : (2318,31)
  • Futures: (2318,55)

max = 1000:

  • Single : (233168,42)
  • Pool : (233168,111)
  • Futures: (233168,364)

max = 10000:

  • Single : (23331668,144)
  • Pool : (23331668,544)
  • Futures: ... I cancelled execution after 3 minutes

Clearly I am not using the concurrency APIs from Java and Scala correctly. So I ask: where is my mistake? What is the more appropriate way to use concurrency? And what about Scala Actors? Is it possible to use them?


What result are you expecting? Are you expecting one of these methods to perform better than the others? Are you expecting the program to scale differently for different execution methods?

How many cores does your machine have? If you only have one core then you should expect the time to increase linearly with the amount of work to do. What does your CPU usage look like over the course of the runs? Are the numbers repeatable?
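If you are not sure about the core count, the JVM can report it. This is a small sketch; the object name CoreCount is made up for illustration, and the returned value is the number of logical processors visible to the JVM, which may differ from the number of physical cores.

```scala
object CoreCount {
  // Number of logical processors available to this JVM. For CPU-bound work
  // such as eval, a fixed thread pool much larger than this rarely helps.
  def cores: Int = Runtime.getRuntime.availableProcessors

  def main(args: Array[String]): Unit =
    println("Available cores: " + cores)
}
```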

You also have not taken into account JVM HotSpot warm-up time, which can substantially distort micro-benchmarks like this one.
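One rough way to reduce the warm-up effect is to run the function a few times untimed before measuring, and then keep the best of several timed runs. The sketch below assumes Scala 2.8 or later (default arguments, sum); the names WarmedBenchmark, bench, warmups and runs are hypothetical, and the iteration counts are arbitrary. A dedicated harness such as JMH would be more reliable than anything hand-rolled like this.

```scala
object WarmedBenchmark {
  def eval(n: Int): Boolean = (n % 3 == 0) || (n % 5 == 0)

  def runSingle(max: Int): Int = (1 until max).filter(eval).sum

  // Calls f untimed `warmups` times so the JIT has a chance to compile the
  // hot paths, then returns the result and the best time (in nanoseconds)
  // observed over `runs` timed calls.
  def bench(max: Int, f: Int => Int, warmups: Int = 20, runs: Int = 10): (Int, Long) = {
    var i = 0
    while (i < warmups) { f(max); i += 1 }

    var best = Long.MaxValue
    var result = 0
    i = 0
    while (i < runs) {
      val t0 = System.nanoTime
      result = f(max)
      val dt = System.nanoTime - t0
      if (dt < best) best = dt
      i += 1
    }
    (result, best)
  }

  def main(args: Array[String]): Unit =
    println("Single : " + bench(10000, runSingle))
}
```

Comparing the best (or median) of several runs also gives some idea of whether the numbers are repeatable.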


I assume you are using Scala 2.7. Basically, filter and map on a Range (the result of 1 until max) are non-strict, meaning the result is computed on demand, and it is recomputed every time you access its elements.

Try this, for example:

val y = (1 to 10).filter{x => println("Filtering "+x); x % 2 == 0}.map{x => println("Mapping "+x); x * 2}
println(y(0))
println(y(1))
println(y(2))
println(y(0))
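On Scala 2.8 and later, filter on a Range is strict, so the snippet above would not show the repeated evaluation there. As a rough sketch, the same on-demand behaviour can be reproduced on newer versions with .view. The object name LazyViewDemo and the counter are only for illustration.

```scala
object LazyViewDemo {
  // Returns how many times the predicate ran for (lazy view, strict filter)
  // when each filtered collection is traversed twice.
  def countEvaluations(): (Int, Int) = {
    var calls = 0

    // Non-strict: nothing is evaluated until the view is traversed,
    // and every traversal re-runs the predicate.
    val lazyEvens = (1 to 10).view.filter { x => calls += 1; x % 2 == 0 }
    lazyEvens.sum
    lazyEvens.sum
    val lazyCalls = calls

    // Strict: the predicate runs once per element when filter is called.
    calls = 0
    val strictEvens = (1 to 10).filter { x => calls += 1; x % 2 == 0 }
    strictEvens.sum
    strictEvens.sum
    val strictCalls = calls

    (lazyCalls, strictCalls)
  }

  def main(args: Array[String]): Unit =
    println(countEvaluations())
}
```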

The result, anyway, is that your parallel stuff is running serially. Add a .force to the range, and it will be fine.
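Separately from strictness, note that calling get right after submit blocks until that one task finishes, so tasks are still handled one at a time, and a task that only computes a modulo is far cheaper than the cost of scheduling it. One possible arrangement, sketched here for a newer Scala (2.8+) rather than 2.7, is to split the range into chunks, submit every chunk first, and only then collect the results. The names ChunkedPool, threads and chunkSize are made up for this example, and the default values are arbitrary rather than tuned.

```scala
import java.util.concurrent.{Callable, Executors, Future => JFuture}

object ChunkedPool {
  def eval(n: Int): Boolean = (n % 3 == 0) || (n % 5 == 0)

  // Splits 1 until max into chunks, submits one task per chunk, and only
  // afterwards waits on the futures, so worker threads can run concurrently.
  def runPool(max: Int, threads: Int = 4, chunkSize: Int = 1000): Int = {
    val pool = Executors.newFixedThreadPool(threads)
    try {
      // toList forces all submissions to happen before any get is called.
      val futures: List[JFuture[Int]] =
        (1 until max).grouped(chunkSize).toList.map { chunk =>
          pool.submit(new Callable[Int] {
            def call(): Int = chunk.filter(eval).sum
          })
        }
      futures.map(_.get).sum
    } finally {
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit =
    println("Pool   : " + runPool(10000))
}
```

Even in this form, the work per element is so small that the single-threaded version may well remain faster for max = 10000; the point is only that the tasks are no longer forced to run one after another.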
