Some Scala actors go into a waiting state when 8-10 actors run simultaneously
In my model there are about 8-9 Scala actors. Each actor has its own queue on the RabbitMQ server.
In the act method of each actor, it continuously listens to the queue like this:
def act {
  this ! 1
  loop {
    react {
      case 1 => processMessage(QManager.getMessage); this ! 1
    }
  }
}
This is the getMessage method of my RabbitMQ QManager:
def getMessage: MyObject = {
  getConnection
  val durable = true
  channel.exchangeDeclare(EXCHANGE, "direct", durable)
  channel.queueDeclare(QUEUE, durable, false, false, null)
  channel queueBind (QUEUE, EXCHANGE, _ROUTING_KEY)
  consumer = new QueueingConsumer(channel)
  channel basicConsume (QUEUE, false, consumer)
  var obj = new MyObject
  try {
    val delivery = consumer.nextDelivery
    val msg = new java.io.ObjectInputStream(
      new java.io.ByteArrayInputStream(delivery.getBody)).readObject()
    obj = msg.asInstanceOf[MyObject]
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false)
  } catch {
    case e: Exception => logger.error("error in Get Message", e); endConnection
  }
  endConnection
  obj
}
Each of the 9 actors has its own object type and its own QManager.
In getMessage I am using the RabbitMQ QueueingConsumer:
val delivery = consumer.nextDelivery
The nextDelivery method returns an object when it finds one in the queue; until then, it puts the actor in a waiting state.
When I start all 8 actors, only 4 of them work fine; the others are never started. I have tested every actor independently, and each one works fine when started alone.
The problem occurs when I start more than 4 actors.
Is there any problem with the threading of Scala actors?
Disclaimer: I am the PO of Akka
As Rex says, you're busy-waiting, hogging threads, on a shared pool of threads.
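To make the thread-hogging point concrete, here is a minimal sketch that reproduces the effect with a plain JDK executor. It is only a stand-in for the actor scheduler, not the scala.actors implementation: the pool size of 4 is an assumption chosen to match the 4 working actors in the question, and the latch is a hypothetical substitute for a blocking call such as consumer.nextDelivery.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object PoolStarvationDemo {
  // Submits `tasks` blocking jobs to a fixed pool of `poolSize` threads and
  // returns how many of them managed to start while the others were blocked.
  def startedWhileBlocked(poolSize: Int, tasks: Int): Int = {
    val pool = Executors.newFixedThreadPool(poolSize)
    val started = new AtomicInteger(0)
    val release = new CountDownLatch(1) // stands in for "a message arrived"
    val allRunning = new CountDownLatch(math.min(poolSize, tasks))

    for (_ <- 1 to tasks) {
      pool.execute(new Runnable {
        def run(): Unit = {
          started.incrementAndGet()
          allRunning.countDown()
          release.await() // blocks the pool thread, like a blocking queue read
        }
      })
    }

    allRunning.await(5, TimeUnit.SECONDS) // wait until the pool is saturated
    Thread.sleep(200)                     // give any extra task a chance to start
    val count = started.get()

    release.countDown() // unblock everything so the pool can shut down
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
    count
  }

  def main(args: Array[String]): Unit = {
    println(startedWhileBlocked(poolSize = 4, tasks = 8)) // prints 4
    println(startedWhileBlocked(poolSize = 8, tasks = 8)) // prints 8
  }
}
```

With 8 blocking tasks on 4 threads, only 4 tasks ever start; the rest wait in the executor's queue until a thread is freed, which mirrors the symptom in the question.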
I don't know if you have the option to test Akka, but we have support for AMQP consumers (and producers) as actors: Akka-AMQP
Producing AMQP messages:
val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic)
val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters), producerId = Some("my_producer")))
producer ! Message("Some simple string data".getBytes, "some.routing.key")
Consuming AMQP messages:
val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic)
val myConsumer = AMQP.newConsumer(connection, ConsumerParameters("some.routing.key", actorOf(new Actor {
  def receive = {
    case Delivery(payload, _, _, _, _, _) => log.info("Received delivery: %s", new String(payload))
  }
}), None, Some(exchangeParameters)))
Another option is to use Akka-Camel to consume and produce AMQP messages with actors.
All your actors are running all the time; they never take a break. Since actors share a common pool of threads, this means that the lucky winner actors run all the time and the unlucky losers never get any time at all. If you want to have an entity that takes an entire thread for itself all the time, it's generally better to use a Java Thread, or at least to use receive instead of react. You could also increase the size of the actor pool to match the number of actors, but generally, if you have a very large number of actors that all run all the time, you should think more carefully about how you're structuring your program.
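As a rough illustration of the dedicated-thread suggestion, the sketch below gives each consumer its own thread, so one blocked consumer cannot starve another. Everything in it is hypothetical scaffolding: DedicatedThreadConsumers, start and demo are made-up names, and a LinkedBlockingQueue stands in for the RabbitMQ queue so that the example runs without a broker. In the real program the blocking call would be the consumer.nextDelivery from the question.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object DedicatedThreadConsumers {
  // Starts one daemon thread per queue; each thread blocks on its own queue
  // (a stand-in for consumer.nextDelivery) without occupying a shared pool.
  def start(queues: Seq[LinkedBlockingQueue[String]])(handle: (Int, String) => Unit): Seq[Thread] =
    queues.zipWithIndex.map { case (queue, id) =>
      val t = new Thread(new Runnable {
        def run(): Unit =
          try {
            while (true) handle(id, queue.take()) // blocks only this thread
          } catch {
            case _: InterruptedException => () // interrupt is the stop signal
          }
      }, "consumer-" + id)
      t.setDaemon(true)
      t.start()
      t
    }

  // Sends one message to each of `n` queues and returns how many were handled.
  def demo(n: Int): Int = {
    val queues = Seq.fill(n)(new LinkedBlockingQueue[String]())
    val handled = new AtomicInteger(0)
    val done = new CountDownLatch(n)
    val threads = start(queues) { (_, _) =>
      handled.incrementAndGet()
      done.countDown()
    }
    queues.foreach(_.put("hello"))
    done.await(5, TimeUnit.SECONDS) // wait until every consumer has handled its message
    threads.foreach(_.interrupt())  // stop the consumers
    handled.get()
  }

  def main(args: Array[String]): Unit =
    println(demo(9)) // prints 9
}
```

All 9 consumers receive their message here because none of them competes for a pooled thread. The trade-off is one OS thread per consumer, which is reasonable for 8 to 10 consumers but does not scale to very large numbers.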