
How can I get this (probably simple) Producer actor example to work?

I was trying to write a simple producer class, in order to learn Actors. I wanted to have a producer, which starts with some directory, represented by a File object, and then sends messages to other actors in order to process the files. Initially, I was reading the contents of the files, but, for simplicity's sake, now I'm just collecting their paths. Once again, this has no real world value, but it has practical value to me, as I think this will allow me to understand the actors better. Here's what I have so far:

import java.io._
import java.util.concurrent._
import scala.actors.Actor
import scala.io.Source
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicLong

case class FileSystemObject(path:File)
case class FileContent(content:String)

case object Stop
case object Processed

class ResultAcumulator extends Actor {

    var results:List[String] = Nil
    var finished = false

    def act() = {
        loop {
            react {
                case FileContent(content) => {
                    results ::= content
                }
                case Stop => {
                    finished = true;
                    exit
                }
            }
        }
    }
}

class FileSystemReader(accumulator:Actor) extends Actor {
    def act() = {
        loop {
            react {
                case FileSystemObject(path) => {
                    if(path.isFile) {
                        accumulator ! FileContent(path.toString)
                        sender ! Processed
                    }
                }
                case Stop => exit
            }
        }
    }
}

class FileSystemProducer(start:File,acumulator:Actor,reader:Actor) extends Actor {
    var totalFilesProcessed = 0

    def act() = {
        val files = start.listFiles
        files.foreach{ f =>
            (reader ! FileSystemObject(f))
        }
        loop {
            react {
                case Processed => {
                    totalFilesProcessed += 1
                    if(totalFilesProcessed == files.length) {
                        reader ! Stop
                        acumulator ! Stop
                        Xo.decrementLatch
                    }
                }
            }
        }

    }
}
object Xo {

     var latch = new CountDownLatch(1)

     def decrementLatch = latch.countDown

     def main(args : Array[String]) = {

         val acumulator = new ResultAcumulator
         val fsReader = new FileSystemReader(acumulator)
         val producer = new FileSystemProducer(new File("d:/rails/a"),acumulator,fsReader)

         acumulator.start
         fsReader.start
         producer.start

         latch.await

         acumulator.results.foreach(println)

     }
}

In its current state, it runs forever and I see no output. One more thing: before the program exits, I would like it to list the results that were processed. I searched around a little and found the CountDownLatch class. I would like to keep this implemented with loop/react instead of while/receive. I am pretty sure the problem is caused by the fact that I have these lines:

files.foreach{ f =>
    (reader ! FileSystemObject(f))
}

and that the react loop comes a little further down, but I have no clue how to go about fixing it.


I guess that the relevant part is

case FileSystemObject(path) => {
    if(path.isFile) {
        accumulator ! FileContent(path.toString)
        sender ! Processed
    }
}

Here, anything that is not a regular file (a directory, for example) is neither sent to the accumulator nor acknowledged with Processed. So if there are subdirectories in your "d:/rails/a", the test totalFilesProcessed == files.length can never succeed, the latch is never counted down, and the program hangs.
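
One way to fix this is to have the reader send Processed for every FileSystemObject it receives, whether or not it is a file. Another is to have the producer compare against the number of regular files rather than files.length. The sketch below illustrates the second option without any actor code, so it only shows the counting mismatch. The names CountDemo and expectedAcks are made up for this illustration, and the temporary directory stands in for "d:/rails/a".

```scala
import java.io.File
import java.nio.file.Files

object CountDemo {
  // Hypothetical helper: the number of Processed replies the producer can
  // expect, given that the reader only replies when path.isFile is true.
  def expectedAcks(dir: File): Int = {
    // listFiles returns null when dir does not exist or is not a directory
    val entries = Option(dir.listFiles).getOrElse(Array.empty[File])
    entries.count(_.isFile)
  }

  def main(args: Array[String]): Unit = {
    // Build a throwaway directory with two files and one subdirectory
    val root = Files.createTempDirectory("producer-demo").toFile
    new File(root, "a.txt").createNewFile()
    new File(root, "b.txt").createNewFile()
    new File(root, "sub").mkdir()

    val total = root.listFiles.length // what the original code compares against
    val acks  = expectedAcks(root)    // what the reader will actually acknowledge

    println(s"entries: $total, files that trigger Processed: $acks")
    // prints "entries: 3, files that trigger Processed: 2"
  }
}
```

In the producer, this would amount to comparing totalFilesProcessed with files.count(_.isFile) instead of files.length. A directory with no regular files would still need separate handling, since no Processed message would ever arrive in that case.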
