How to wait for messages on multiple queues using py-amqplib
I'm using py-amqplib to access RabbitMQ in Python. The application receives requests to listen on certain MQ topics from time to time.
The first time it receives such a request it creates an AMQP connection and a channel and starts a new thread to listen for messages:
connection = amqp.Connection(host = host, userid = "guest", password = "guest", virtual_host = "/", insist = False)
channel = connection.channel()
listener = AMQPListener(channel)
listener.start()
AMQPListener is very simple:
class AMQPListener(threading.Thread):
def __init__(self, channel):
threading.Thread.__init__(self)
self.__channel = channel
def run(self):
while T开发者_Python百科rue:
self.__channel.wait()
After creating the connection it subscribes to the topic of interest, like this:
channel.queue_declare(queue = queueName, exclusive = False)
channel.exchange_declare(exchange = MQ_EXCHANGE_NAME, type = "direct", durable = False, auto_delete = True)
channel.queue_bind(queue = queueName, exchange = MQ_EXCHANGE_NAME, routing_key = destination)
def receive_callback(msg):
self.queue.put(msg.body)
channel.basic_consume(queue = queueName, no_ack = True, callback = receive_callback)
The first time this all works fine. However, it fails on a subsequent request to subscribe to another topic. On subsequent requests I re-use the AMQP connection and AMQPListener thread (since I don't want to start a new thread for each topic) and when I call the code block above the channel.queue_declare() method call never returns. I've also tried creating a new channel at that point and the connection.channel() call never returns, either.
The only way I've been able to get it to work is to create a new connection, channel and listener thread per topic (ie. routing_key), but this is really not ideal. I suspect it's the wait() method that's somehow blocking the entire connection, but I'm not sure what to do about it. Surely I should be able to receive messages with several routing keys (or even on several channels) using a single listener thread?
A related question is: how do I stop the listener thread when that topic is no longer of interest? The channel.wait() call appears to block forever if there are no messages. The only way I can think of is to send a dummy message to the queue that would "poison" it, ie. be interpreted by the listener as a signal to stop.
If you want more than one comsumer per channel just attach another one using basic_consume() and use channel.wait() after. It will listen to all queues attached via basic_consume(). Make sure you define different consumer tags for each basic_consume().
Use channel.basic_cancel(consumer_tag) if you want to cancel a specific consumer on a queue (cancelling listen to a specific topic).
精彩评论