Wait for a single RabbitMQ message with a timeout
I'd like to send a message to a RabbitMQ server and then wait for a reply message (on a "reply-to" queue). Of course, I don't want to wait forever in case the application processing these messages is down - there needs to be a timeout. It sounds like a very basic task, yet I can't find a way to do this. I've now run into this problem with both py-amqplib and the RabbitMQ .NET client.
The best solution I've got so far is to poll using basic_get with sleep in between, but this is pretty ugly:
import time

def _wait_for_message_with_timeout(channel, queue_name, timeout):
    slept = 0
    sleep_interval = 0.1
    while slept < timeout:
        # basic_get returns None when the queue is empty
        reply = channel.basic_get(queue_name)
        if reply is not None:
            return reply
        time.sleep(sleep_interval)
        slept += sleep_interval
    raise Exception('Timeout (%g seconds) expired while waiting for an MQ response.' % timeout)
Surely there is some better way?
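If polling has to stay, one small refinement is to track the deadline with a monotonic clock instead of summing the sleep intervals, because the summed value ignores the time spent inside basic_get itself. The following is only a sketch: ReplyTimeout, wait_for_message and FakeChannel are hypothetical names, and the channel is assumed to expose basic_get(queue_name) returning None when the queue is empty, as in the snippet above.

```python
import time


class ReplyTimeout(Exception):
    """Raised when no reply arrives before the deadline (hypothetical name)."""


def wait_for_message(channel, queue_name, timeout, poll_interval=0.1):
    """Poll channel.basic_get until a message arrives or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        reply = channel.basic_get(queue_name)
        if reply is not None:
            return reply
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise ReplyTimeout(
                'Timeout (%g seconds) expired while waiting for an MQ response.' % timeout)
        # Never sleep past the deadline.
        time.sleep(min(poll_interval, remaining))


class FakeChannel:
    """Stand-in for a real channel: returns None for the first `empty_polls` polls."""

    def __init__(self, empty_polls, message='pong'):
        self.empty_polls = empty_polls
        self.message = message

    def basic_get(self, queue_name):
        if self.empty_polls > 0:
            self.empty_polls -= 1
            return None
        return self.message
```

FakeChannel exists only so the loop can be exercised without a broker; with a real channel the call would look like wait_for_message(channel, 'reply-queue', timeout=5).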
Here's what I ended up doing in the .NET client:
protected byte[] WaitForMessageWithTimeout(string queueName, int timeoutMs)
{
    var consumer = new QueueingBasicConsumer(Channel);
    var tag = Channel.BasicConsume(queueName, true, null, consumer);
    try
    {
        object result;
        if (!consumer.Queue.Dequeue(timeoutMs, out result))
            throw new ApplicationException(string.Format("Timeout ({0} seconds) expired while waiting for an MQ response.", timeoutMs / 1000.0));
        return ((BasicDeliverEventArgs)result).Body;
    }
    finally
    {
        Channel.BasicCancel(tag);
    }
}
Unfortunately, I cannot do the same with py-amqplib, because its basic_consume method does not call the callback unless you call channel.wait(), and channel.wait() doesn't support timeouts! This silly limitation (which I keep running into) means that if you never receive another message, your thread is frozen forever.
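One generic way around a blocking call that has no timeout is to run it in a daemon thread and wait on a queue.Queue, which is roughly what the .NET snippet does with its consumer queue. This is a sketch under assumptions, not a tested py-amqplib recipe: wait_with_timeout is a hypothetical helper, the blocked worker thread is abandoned rather than cancelled, and the channel must not be used from other threads while the worker is still waiting.

```python
import queue
import threading


def wait_with_timeout(blocking_wait, timeout):
    """Run blocking_wait() in a daemon thread; return its result or raise TimeoutError."""
    results = queue.Queue(maxsize=1)

    def worker():
        try:
            results.put(('ok', blocking_wait()))
        except Exception as exc:
            # Forward errors from the worker to the calling thread.
            results.put(('error', exc))

    threading.Thread(target=worker, daemon=True).start()
    try:
        status, value = results.get(timeout=timeout)
    except queue.Empty:
        raise TimeoutError('no reply within %g seconds' % timeout) from None
    if status == 'error':
        raise value
    return value
```

With py-amqplib, blocking_wait could be a function that calls channel.wait() and returns whatever the basic_consume callback stored. That wiring is not shown here and has not been checked against the library.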
I just added timeout support for amqplib in carrot. This is a subclass of amqplib.client0_8.Connection:
http://github.com/ask/carrot/blob/master/carrot/backends/pyamqplib.py#L19-97
wait_multi is a version of channel.wait able to receive on an arbitrary number of channels.
I guess this could be merged upstream at some point.
There's an example here using qpid with a msg = q.get(timeout=1) that should do what you want. Sorry, I don't know what other AMQP client libraries implement timeouts (and in particular I don't know the two specific ones you mentioned).
This seems to break the whole idea of asynchronous processing, but if you must I think the right way to do it is to use an RpcClient.
Rabbit now allows you to add timeout events. Simply wrap your code in a try/catch and then throw exceptions in the TimedOut and Disconnected handlers:
try
{
    using (IModel channel = rabbitConnection.connection.CreateModel())
    {
        client = new SimpleRpcClient(channel, "", "", queue);
        client.TimeoutMilliseconds = 5000; // 5 sec. defaults to infinity
        client.TimedOut += RpcTimedOutHandler;
        client.Disconnected += RpcDisconnectedHandler;
        byte[] replyMessageBytes = client.Call(message);
        return replyMessageBytes;
    }
}
catch (Exception)
{
    // Handle timeout and disconnect here
}

private void RpcDisconnectedHandler(object sender, EventArgs e)
{
    throw new Exception("RPC disconnect exception occurred.");
}

private void RpcTimedOutHandler(object sender, EventArgs e)
{
    throw new Exception("RPC timeout exception occurred.");
}