Converting Scala @suspendable Method into a Future
Suppose I have a sleep function:
def sleep(delay:Int) : Unit @suspendable = {
  ....
}
Is it possible to write a function future that creates an async version of the sleep function, which can then be awaited synchronously?
def future(targetFunc: (Int => Unit @suspendable)) : (Int => Future) = {
  ....
}
class Future {
  def await : Unit @suspendable = {
    ....
  }
}
You should be able to do something like this:
reset {
  val sleepAsync = future(sleep)
  val future1 = sleepAsync(2000)
  val future2 = sleepAsync(3000)
  future1.await
  future2.await
  /* finishes after a delay of 3000 */
}
The two calls to sleepAsync should appear to return straight away, and the two calls to Future#await should appear to block. Of course, they all really fall off the end of reset, and the code after it is responsible for calling the continuation after the delay.
Otherwise, is there an alternative way to run two @suspendable functions in parallel and wait for both of them to complete?
I have a compilable gist with a skeleton of what I want to do: https://gist.github.com/1191381
object Forks {
  import scala.util.continuations._
  case class Forker(forks: Vector[() => Unit @suspendable]) {
    def ~(block: => Unit @suspendable): Forker = Forker(forks :+ (() => block))
    def joinIf(pred: Int => Boolean): Unit @suspendable = shift { k: (Unit => Unit) =>
      val counter = new java.util.concurrent.atomic.AtomicInteger(forks.size)
      forks foreach { f =>
        reset {
          f()
          if (pred(counter.decrementAndGet)) k()
        }
      }
    }
    def joinAll() = joinIf(_ == 0)
    def joinAny() = joinIf(_ == forks.size - 1)
  }
  def fork(block: => Unit @suspendable): Forker = Forker(Vector(() => block))
}
Using fork(), we can now wait on many "suspendables". Use ~() to chain suspendables together. Use joinAll() to wait for all of the suspendables and joinAny() to wait for just one. Use joinIf() to customize the join strategy.
object Tests extends App {
  import java.util.{Timer, TimerTask}
  import scala.util.continuations._
  implicit val timer = new Timer
  def sleep(ms: Int)(implicit timer: Timer): Unit @suspendable = {
    shift { k: (Unit => Unit) =>
      timer.schedule(new TimerTask {
        def run = k()
      }, ms)
    }
  }
  import Forks._
  reset {
    fork {
      println("sleeping for 2000 ms")
      sleep(2000)
      println("slept for 2000 ms")
    } ~ {
      println("sleeping for 4000 ms")
      sleep(4000)
      println("slept for 4000 ms")
    } joinAll()
    println("and we are done")
  }
  println("outside reset")
  readLine
  timer.cancel
}
And this is the output. The program starts at time T:
sleeping for 2000 ms
sleeping for 4000 ms
outside reset <<<<<< T + 0 second
slept for 2000 ms <<<<<< T + 2 seconds
slept for 4000 ms <<<<<< T + 4 seconds
and we are done <<<<<< T + 4 seconds
I'm not sure that I completely understand the question, but here's a try:
import scala.util.continuations._
class Future(thread: Thread) {
  def await = thread.join
}
object Future {
  def sleep(delay: Long) = Thread.sleep(delay)
  def future[A,B](f: A => B) = (a: A) => shift { k: (Future => Unit) =>
    val thread = new Thread { override def run() { f(a) } }
    thread.start()
    k(new Future(thread))
  }
  def main(args:Array[String]) = reset {
    val sleepAsync = future(sleep)
    val future1 = sleepAsync(2000) // returns right away
    val future2 = sleepAsync(3000) // returns right away
    future1.await // returns after two seconds
    future2.await // returns after an additional one second
    // finished after a total delay of three seconds
  }
}
Here, a Future instance is nothing more than a handle on a Thread, so you can use its join method to block until it finishes.
The future function takes a function of type A => B, and returns a function which, when supplied with an A, will kick off a thread to run the "futured" function, and wrap it up in a Future, which is injected back into the continuation, thereby assigning it to val future1.
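For comparison, here is a minimal sketch of the same "kick off now, wait later" idea. It uses the standard library's scala.concurrent.Future (Scala 2.10 and later) rather than the continuations plugin, so it swaps in a different technique and does not provide a @suspendable await. The object name ParallelSleep and the shortened delays are assumptions made for illustration only.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelSleep {
  // Hypothetical helper mirroring the `future` function above:
  // turns A => B into A => Future[B]. The call to f runs on the global
  // execution context; `blocking` hints that f may block its thread.
  def future[A, B](f: A => B): A => Future[B] =
    (a: A) => Future(blocking(f(a)))

  def main(args: Array[String]): Unit = {
    val sleepAsync = future((ms: Long) => Thread.sleep(ms))
    val start = System.nanoTime()
    val future1 = sleepAsync(200L) // returns right away
    val future2 = sleepAsync(300L) // returns right away
    // Await.result blocks the calling thread until the future completes
    // (or the timeout expires), much like thread.join in the answer above.
    Await.result(future1, 5.seconds)
    Await.result(future2, 5.seconds)
    val elapsedMs = (System.nanoTime() - start) / 1000000
    println(s"elapsed: $elapsedMs ms")
  }
}
```

Because both sleeps are started before either is awaited, the elapsed time should be close to the longer delay rather than the sum of the two, although the exact figure depends on thread scheduling.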
Is this anywhere close to what you were going for?