Dependency between operations in scala actors
I am trying to parallelise some code using Scala actors. This is my first real code with actors; I have some experience with Java multithreading and MPI in C, but here I am completely lost.
The workflow I want to implement is a circular pipeline and can be described as follows:
- Each worker actor has a reference to another one, thus forming a circle.
- There is a coordinator actor which can trigger a computation by sending a StartWork() message.
- When a worker receives a StartWork() message, it processes some data locally and sends a DoWork(...) message to its neighbour in the circle.
- The neighbour does some other processing and in turn sends a DoWork(...) message to its own neighbour.
- This continues until the initial worker receives a DoWork() message.
- The coordinator can send a GetResult() message to the initial worker and wait for a reply.
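The message path described in the list can be traced with a small single-threaded simulation. This is only a sketch under stated assumptions: Ring, the Int payload, and the bodies given to doStuff (returns 1) and doOtherStuff (doubles its input) are hypothetical, and one FIFO queue replaces the real actor mailboxes, so it shows where the messages go rather than real concurrency.

```scala
import scala.collection.mutable

// Messages of the question's protocol; DoWork carries the data and a hop counter.
sealed trait Msg
case object StartWork extends Msg
final case class DoWork(data: Int, remaining: Int) extends Msg

// Hypothetical single-threaded stand-in for the actor ring: one FIFO queue of
// (target worker, message) pairs replaces the per-actor mailboxes.
final class Ring(numWorkers: Int) {
  private val queue = mutable.Queue.empty[(Int, Msg)]
  var result: Option[(Int, Int)] = None // (worker that got the job back, final data)
  var delivered = 0                     // number of messages handled so far

  private def neighbor(i: Int): Int = (i + 1) % numWorkers
  private def doStuff(): Int = 1                // placeholder for the local work
  private def doOtherStuff(x: Int): Int = x * 2 // placeholder for a neighbour's work

  def send(to: Int, msg: Msg): Unit = queue.enqueue((to, msg))

  // Mirrors the body of the question's act() for worker `self`.
  private def handle(self: Int, msg: Msg): Unit = msg match {
    case StartWork               => send(neighbor(self), DoWork(doStuff(), numWorkers - 1))
    case DoWork(data, 0)         => result = Some((self, data)) // back where it started
    case DoWork(data, remaining) => send(neighbor(self), DoWork(doOtherStuff(data), remaining - 1))
  }

  def run(): Unit =
    while (queue.nonEmpty) {
      val (to, msg) = queue.dequeue()
      delivered += 1
      handle(to, msg)
    }
}

val ring = new Ring(4)
ring.send(0, StartWork) // the coordinator starts a job at worker 0
ring.run()
println(ring.result)    // Some((0,8)): worker 0 holds the result after one full turn
```

With four workers the job is handled by workers 1, 2 and 3 and then lands back at worker 0 with remaining == 0, which is exactly the moment the question wants GetResult to be answered.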
The point is that the coordinator should only receive a result once the data is ready. How can a worker wait until the job has come back to it before answering the GetResult() message?
To speed up the computation, any worker can receive a StartWork() message at any time.
Here is my first attempt at a pseudo-implementation of the worker:
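```scala
import scala.actors.Actor

// Foo, doStuff and doOtherStuff are placeholders for the real computation
class Worker( neighbor: Worker, numWorkers: Int ) extends Actor {
  var ready = Foo()
  def act() {
    loop {
      react {
        case StartWork() => {
          val someData = doStuff()
          neighbor ! DoWork( someData, numWorkers-1 )
        }
        case DoWork( resultData, remaining ) => if( remaining == 0 ) {
          ready = resultData
        } else {
          val someOtherData = doOtherStuff( resultData )
          neighbor ! DoWork( someOtherData, remaining-1 )
        }
        // replies immediately, whether or not the job has come back yet
        case GetResult() => reply( ready )
      }
    }
  }
}
```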
On the coordinator side:
```scala
worker ! StartWork()
val result = worker !? GetResult() // should wait
```
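To see why this version can hand back the initial Foo() instead of the result, the sketch below replays one plausible interleaving deterministically: GetResult is queued at worker 0 right behind StartWork, long before the DoWork chain returns. NaiveRing, Int in place of Foo, 0 in place of Foo(), and "start with 1, double at each hop" in place of doStuff/doOtherStuff are all hypothetical; real actor scheduling is nondeterministic, so this is one possible ordering, not a guarantee.

```scala
import scala.collection.mutable

sealed trait Msg
case object StartWork extends Msg
case object GetResult extends Msg
final case class DoWork(data: Int, remaining: Int) extends Msg

// Hypothetical FIFO replay of the question's naive worker: Int stands in for Foo,
// 0 for Foo(), and "start with 1, double at each hop" for doStuff/doOtherStuff.
final class NaiveRing(numWorkers: Int) {
  private val queue = mutable.Queue.empty[(Int, Msg)]
  val ready: Array[Int] = Array.fill(numWorkers)(0) // each worker's `ready`
  val replies = mutable.ListBuffer.empty[Int]       // what the coordinator got back

  def send(to: Int, msg: Msg): Unit = queue.enqueue((to, msg))

  private def handle(self: Int, msg: Msg): Unit = {
    val next = (self + 1) % numWorkers
    msg match {
      case StartWork          => send(next, DoWork(1, numWorkers - 1))
      case DoWork(data, 0)    => ready(self) = data
      case DoWork(data, left) => send(next, DoWork(data * 2, left - 1))
      case GetResult          => replies += ready(self) // answers at once, ready or not
    }
  }

  def run(): Unit =
    while (queue.nonEmpty) {
      val (to, msg) = queue.dequeue()
      handle(to, msg)
    }
}

val naive = new NaiveRing(4)
naive.send(0, StartWork) // worker ! StartWork()
naive.send(0, GetResult) // worker !? GetResult(), sent straight afterwards
naive.run()
println(naive.replies)   // ListBuffer(0): the initial value, not the job's result
println(naive.ready(0))  // 8: the real result only arrived later
```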
Firstly, you clearly need some identifier for what constitutes a single piece of work, so that GetResult can get the correct result. I guess the obvious solution is to have your actors keep a Map of the results and a Map of any waiting getters:
```scala
import scala.actors.{Actor, OutputChannel}

class Worker( neighbor: Worker, numWorkers: Int ) extends Actor {
  var res: Map[Long, Result] = Map.empty
  var gets: Map[Long, OutputChannel[Any]] = Map.empty
  def act() {
    loop {
      react {
        // ...
        case DoWork( id, resultData, remaining ) if remaining == 0 =>
          res += (id -> resultData)
          gets.get(id).foreach(_ ! res(id)) // reply to getters once the result is ready
          gets -= id                        // clear out the getter entry now?
        case GetResult(id) if res.isDefinedAt(id) => // result is ready
          reply( res(id) )
        case GetResult(id) => // no result ready yet: remember who asked
          gets += (id -> sender)
      }
    }
  }
}
```
Note: using an if guard in the case pattern can make message processing a bit clearer.
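To look at the bookkeeping and the guards in isolation, here is a minimal sketch that drops the actor library entirely. ResultBook, the Int results and the Any => Unit reply callbacks are hypothetical stand-ins; the callback plays the role of sender / OutputChannel[Any]. The handler is a PartialFunction like a react body, so when a guard fails, matching simply falls through to the next case.

```scala
import scala.collection.mutable

sealed trait Msg
final case class DoWork(id: Long, data: Int, remaining: Int) extends Msg
final case class GetResult(id: Long) extends Msg

// The answer's bookkeeping without the actor plumbing. A reply callback
// (Any => Unit) stands in for scala.actors' OutputChannel[Any] / sender.
final class ResultBook {
  private val res  = mutable.Map.empty[Long, Int]
  private val gets = mutable.Map.empty[Long, List[Any => Unit]]

  // Shaped like the react body; `replyTo` plays the role of `sender`.
  def handler(replyTo: Any => Unit): PartialFunction[Msg, Unit] = {
    case DoWork(id, data, remaining) if remaining == 0 =>
      res(id) = data
      gets.remove(id).foreach(_.foreach(getter => getter(data))) // answer early askers, then forget them
    case GetResult(id) if res.isDefinedAt(id) => // result is ready
      replyTo(res(id))
    case GetResult(id) => // guard above failed: no result yet, park the asker
      gets(id) = replyTo :: gets.getOrElse(id, Nil)
  }

  def waiting(id: Long): Int = gets.getOrElse(id, Nil).size
}

val book = new ResultBook
val received = mutable.ListBuffer.empty[Any]
val coordinator: Any => Unit = received += _
book.handler(coordinator)(GetResult(7L)) // asked too early: parked
book.handler(_ => ())(DoWork(7L, 8, 0))  // job 7 comes home with 8
println(received)                        // ListBuffer(8)
```

One design difference from the answer: this sketch keeps a list of getters per id, whereas in the answer's single-valued Map a second gets += (id -> sender) for the same id would overwrite the first getter. Because the guards are part of the partial function, handler(...).isDefinedAt(DoWork(id, data, 2)) is false here, which is the same test react uses to decide whether a message can be taken from the mailbox.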
One alternative would be this:
```scala
import scala.actors.Actor

class Worker( neighbor: Worker, numWorkers: Int ) extends Actor {
  var ready = Foo()
  def act() {
    loop {
      react {
        case StartWork() => {
          val someData = doStuff()
          neighbor ! DoWork( someData, numWorkers-1 )
        }
        case DoWork( resultData, remaining ) => if( remaining == 0 ) {
          ready = resultData
          react { // accept nothing but GetResult until the result has been handed over
            case GetResult() => reply( ready )
          }
        } else {
          val someOtherData = doOtherStuff( resultData )
          neighbor ! DoWork( someOtherData, remaining-1 )
        }
      }
    }
  }
}
```
After the work has finished, this worker will be stuck until it receives the GetResult message. On the other hand, the coordinator can send GetResult immediately, as the message will remain in the mailbox until the worker is ready to receive it.
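The early GetResult is not lost because react only takes a message its partial function is defined at and leaves the others queued. The hypothetical SelectiveWorker below imitates that rule without the actors library: step() delivers the first queued message the current behaviour accepts. To stay short it collapses the ring onto a single worker that posts the DoWork hops to its own mailbox; the Int results and the doubling arithmetic are placeholders.

```scala
import scala.collection.mutable

sealed trait Msg
case object StartWork extends Msg
case object GetResult extends Msg
final case class DoWork(data: Int, remaining: Int) extends Msg

// Hypothetical single worker with a mailbox and a swappable behaviour. Like react,
// step() takes the first queued message the behaviour is defined at and leaves the rest.
final class SelectiveWorker(numWorkers: Int) {
  val mailbox = mutable.ArrayBuffer.empty[Msg]
  val replies = mutable.ListBuffer.empty[Int]
  private var ready = 0

  // Outer loop: handles the work and ignores GetResult, which therefore stays queued.
  private lazy val working: PartialFunction[Msg, Unit] = {
    case StartWork          => mailbox += DoWork(1, numWorkers - 1) // ring collapsed onto itself
    case DoWork(data, 0)    => ready = data; behavior = waitingForGet
    case DoWork(data, left) => mailbox += DoWork(data * 2, left - 1)
  }
  // Like the nested react: only GetResult is accepted until the result is handed over.
  private lazy val waitingForGet: PartialFunction[Msg, Unit] = {
    case GetResult => replies += ready; behavior = working
  }
  private var behavior: PartialFunction[Msg, Unit] = working

  def send(msg: Msg): Unit = mailbox += msg

  def step(): Boolean = mailbox.indexWhere(m => behavior.isDefinedAt(m)) match {
    case -1 => false // nothing the current behaviour accepts: the worker just waits
    case i =>
      behavior(mailbox.remove(i))
      true
  }

  def runUntilIdle(): Unit = while (step()) ()
}

val w = new SelectiveWorker(4)
w.send(StartWork)
w.send(GetResult)  // sent immediately; it waits in the mailbox
w.runUntilIdle()
println(w.replies) // ListBuffer(8)
```

The same model also shows the drawback mentioned above: while the worker is waiting for GetResult, a newly arrived StartWork is not accepted, so step() returns false and the message stays in the mailbox until the result has been collected.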