开发者

some Scala Actor goes into waiting state when 8-10 actors running simultaneously

In my model there are about 8-9 Scala Actors. Each actor has its own queue on RabbitMQ Server

in act method of each Actor .It is continuously listing to the queue like

def act {
    this ! 1
    loop {
      react {
        case 1 => processMessage(QManager.getMessage); this ! 1
      }
    }
  } 

I a rabbitMq QManager getMessage Method

def getMessage: MyObject = {
    getConnection
    val durable = true
    channel.exchangeDeclare(EXCHANGE, "direct", durable)
    channel.queueDeclare(QUEUE, durable, false, false, null)
    channel queueBind (QUEUE, EXCHANGE, _ROUTING_KEY)
    consumer = new QueueingConsumer(channel)
    channel basicConsume (QUEUE, false, consumer)

    var obj = new MyObject
    try {
      val delivery = consumer.nextDelivery
      val msg = new java.io.ObjectInputStream(
        new java.io.ByteArrayInputStream(delivery.getBody)).readObject()
      obj = msg.asInstanceOf[MyObject]
      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false)
    } catch {
      case e: Exception =>logger.error("error in Get Message", e);endConnection
    }
    endConnection
    obj
  }

All 9 Actors has its own object type and a own QManager

in a GetMessage I am using Rabbitmq QueueConsumer

 val delivery = consumer.nextDelivery

and the nextDelivery method returns a object 开发者_Python百科when itfounds in a queue this method puts actor in waiting state

when i start all 8 actors only 4 of them works fine other are not stated. I have test each and every actor running interdependently they works fine when started Alone

The problem occurs when i start more that 4 actors

is therer any Problem with threading of scala actors.


Disclaimer: I am the PO of Akka

As Rex says, you're busy-waiting, hogging threads, on a shared pool of threads.

I don't know if you have the option to test Akka, but we have support for AMQP consumers (and producers) as actors: Akka-AMQP

Producing AMQP messages:

    val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic)
    val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters), producerId = Some("my_producer"))
producer ! Message("Some simple sting data".getBytes, "some.routing.key")

Consuming AMQP messages:

val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic)
val myConsumer = AMQP.newConsumer(connection, ConsumerParameters("some.routing.key", actorOf(new Actor { def receive = {
  case Delivery(payload, _, _, _, _, _) => log.info("Received delivery: %s", new String(payload))
}}), None, Some(exchangeParameters)))

Another option is to use Akka-Camel to consume and produce AMQP messages with actors


All your actors are running all the time; they never take a break. Since actors are shared across a common pool of threads, this means that the lucky winner actors run all the time and the unlucky losers never get any time at all. If you want to have an entity that takes an entire thread for itself all the time, it's generally better to use a java Thread, or at least to use receive instead of react. You could also increase the size of the actor pool to match the number of actors, but generally if you have a very large number of actors all of which run all the time, you should think more carefully about how you're structuring your program.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜