开发者

scala actors: long running io operations

I have quite some trouble with actors which contain long running operations, in my case persistent socket connections. Here is some test code which runs fine if I create less than four Server instances, but if I create more instances, I always end up with only three or sometimes four concurrent socket connections, because the other ones time out. I wonder why that is and whether there is something obviously wrong with my code.

package test

import actors.Actor
import actors.Actor._
import java.io.{PrintStream, DataOutputStream, DataInputStream}
import java.net.{Socket, InetAddress}
import java.text.{SimpleDateFormat}
import java.util.{Calendar}

case class SInput(input: String)
case class SOutput(output: String)
case class SClose
case class SRepeat

import scala.xml._

class Config(xml: Node) {
  var nick: String = (xml \ "nick").text
  var realName: String = (xml \ "realName").text
  var server: String = (xml \ "ip").text
  var port: Int = (xml \ "port").text.toInt
  var identPass: String = (xml \ "identPass").text
  var joinChannels: List[String] = List.fromString((xml \ "join").text.trim, ' ')
}

object ServerStarter {
  def main(args: Array[String]): Unit = {
    var servers = List[Server]()

    val a = actor {
      loop {
        receive {
          case config: Config =>
            actor {
              val server = new Server(config)
              servers = server :: servers
              server.start
            }
        }
      }
    }

    val xml = XML.loadFile("config.xml")
    (xml \ "server").elements.foreach(config => a ! new Config(config))
  }
}


class Server(config: Config) extends Actor {
  private var auth = false
  private val socket = new Socket(InetAddress.getByName(config.server), config.port)
  private val out = new PrintStream(new DataOutputStream(socket.getOutputStream()))
  private val in = new DataInputStream(socket.getInputStream())

  def act = {
    val _self = this
    _self ! SRepeat

    while (true) {
      receive {
        case SRepeat =>
          try {
            val input = in.readLine
            if (input != null) {
              actor {_self ! SInput(input)}
            } else {
              actor {_self ! SClose}
            }
          } catch {
            case e: Exception =>
              println(e)
              actor {_self ! SClose}
          }

        case SClose =>
          println(getDate + " closing: " + config.server + " mail: " + mailboxSize)
          try {
            socket.close
            in.close
            out.close
          } catch {
            case e: Exception =>
              println(e)
          }

        case SInput(input: String) =>
          println(getDate + " " + config.server + " IN => " + input + " mail: " + mailboxSize)
          actor {onServerInput(_self, input)}
          _self ! SRepeat

        case SOutput(output: String) =>
          println(getDate + " " + config.server + " OUT => " + output + " mail: " + mailboxSize)
          actor {
            out.println(output)
            out.flush()
          }

        case x =>
          println("unmatched: " + x + " mail: " + mailboxSize)
      }
    }
  }

  private def getDate = {
    new SimpleDateFormat("hh:mm:ss").format(Calendar.getInstance().getTime());
  }

  def onServerInput(a: Actor, input: String) = {
    if (!auth) {
      authenticate(a)
    }
    else if (input.contains("MOTD")) {
      identify(a)
      join开发者_如何学运维(a)
    }
    else if (input.contains("PING")) {
      pong(a, input)
    } else {
    }
  }

  def authenticate(a: Actor) = {
    a ! SOutput("NICK " + config.nick)
    a ! SOutput("USER " + config.nick + " 0 0 : " + config.realName)
    auth = true
  }

  def pong(a: Actor, input: String) = {
    a ! SOutput("PONG " + input.split(":").last)
  }

  def identify(a: Actor) = {
    if (config.identPass != "") {
      a ! SOutput("nickserv :identify " + config.nick + " " + config.identPass)
    }
  }

  def join(a: Actor) = {
    config.joinChannels.foreach(channel => a ! SOutput("JOIN " + channel))
  }
}

btw. I am using scala 2.7.6 final.


There are strange things here. For example:

actor { 
  val server = new Server(config) 
  servers = server :: servers 
  server.start 
}

Or also:

actor {_self ! SClose}  

The method actor is an Actor factory. In the first case, for example, you are creating an actor which will create another actor (because Server is an Actor), and start it.

Let me repeat that: everything between actor { and } is an actor. Inside that actor, you are doing new Server, which creates another actor. And that is inside a receive, which, of course, is part of an actor. So, inside an actor, you are creating an actor which will create an actor.

And on the second example, you are creating an actor just to send a message to oneself. That makes no sense to me, but I'm not all that experienced with actors.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜