How can I make this (probably simple) Producer actor example work?
I am trying to write a simple producer class in order to learn actors. The producer starts with a directory, represented by a File object, and sends messages to other actors so that they process the files in it. Initially I was reading the contents of the files, but for simplicity's sake I am now just collecting their paths. This has no real-world value, but it has practical value to me, as I think it will help me understand actors better. Here's what I have so far:
import java.io._
import java.util.concurrent._
import scala.actors.Actor
import scala.io.Source
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicLong
case class FileSystemObject(path:File)
case class FileContent(content:String)
case object Stop
case object Processed
class ResultAcumulator extends Actor {
  var results:List[String] = Nil
  var finished = false
  def act() = {
    loop {
      react {
        case FileContent(content) => {
          results ::= content
        }
        case Stop => {
          finished = true;
          exit
        }
      }
    }
  }
}
class FileSystemReader(accumulator:Actor) extends Actor {
  def act() = {
    loop {
      react {
        case FileSystemObject(path) => {
          if(path.isFile) {
            accumulator ! FileContent(path.toString)
            sender ! Processed
          }
        }
        case Stop => exit
      }
    }
  }
}
class FileSystemProducer(start:File,acumulator:Actor,reader:Actor) extends Actor {
  var totalFilesProcessed = 0
  def act() = {
    val files = start.listFiles
    files.foreach{ f =>
      (reader ! FileSystemObject(f))
    }
    loop {
      react {
        case Processed => {
          totalFilesProcessed += 1
          if(totalFilesProcessed == files.length) {
            reader ! Stop
            acumulator ! Stop
            Xo.decrementLatch
          }
        }
      }
    }
  }
}
object Xo {
  var latch = new CountDownLatch(1)
  def decrementLatch = latch.countDown
  def main(args : Array[String]) = {
    val acumulator = new ResultAcumulator
    val fsReader = new FileSystemReader(acumulator)
    val producer = new FileSystemProducer(new File("d:/rails/a"),acumulator,fsReader)
    acumulator.start
    fsReader.start
    producer.start
    latch.await
    acumulator.results.foreach(println)
  }
}
In its current state, it runs forever and I see no output. One more thing: before the program exits, I would like it to list the "processed" results, which is why I searched around a little and ended up using the CountDownLatch class. I would like to keep this implemented with loop/react instead of while/receive. I am pretty sure the problem is caused by the fact that I have these lines:
files.foreach{ f =>
  (reader ! FileSystemObject(f))
}
and that I have the react loop a little lower, but I have no clue how to go about fixing it.
I guess that the relevant part is
case FileSystemObject(path) => {
  if(path.isFile) {
    accumulator ! FileContent(path.toString)
    sender ! Processed
  }
}
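Here, things that are not "normal files", e.g. directories, are not sent to the accumulator, and no Processed reply is sent back for them either.
So if there are subdirectories in your "d:/rails/a", the producer receives fewer Processed messages than files.length, the test totalFilesProcessed == files.length
never becomes true, and the latch is never counted down.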
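To make the mismatch concrete, here is a minimal sketch that needs no actors at all. It builds a throwaway directory and compares the number of entries the producer waits for with the number of entries the reader would acknowledge. The object name ReplyCount and the helper regularFiles are made up for this illustration and are not part of your code.

```scala
import java.io.File
import java.nio.file.Files

object ReplyCount {
  // Hypothetical helper: the entries for which FileSystemReader replies Processed.
  // File.listFiles returns null when the path is not a readable directory,
  // so the result is wrapped in Option before use.
  def regularFiles(dir: File): List[File] =
    Option(dir.listFiles).map(_.toList).getOrElse(Nil).filter(_.isFile)

  def main(args: Array[String]): Unit = {
    // Throwaway directory with two regular files and one subdirectory.
    val root = Files.createTempDirectory("producer-demo").toFile
    new File(root, "a.txt").createNewFile()
    new File(root, "b.txt").createNewFile()
    new File(root, "sub").mkdir()

    val entries = root.listFiles.length      // what the producer waits for
    val replies = regularFiles(root).length  // what the reader acknowledges
    println(s"entries: $entries, Processed replies: $replies")
    // prints "entries: 3, Processed replies: 2"
  }
}
```

One possible fix, then, is to build files in FileSystemProducer from something like regularFiles(start) instead of start.listFiles, so that both sides of totalFilesProcessed == files.length count the same thing. Another is to move sender ! Processed out of the if(path.isFile) block so the reader acknowledges every FileSystemObject. Either way, you probably also want to handle an empty list up front, since in that case no Processed message ever arrives. Sending the messages before entering loop/react is not a problem in itself: the replies simply wait in the producer's mailbox until react picks them up.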