开发者

How can I use Pika to send and receive RabbitMQ messages?

I'm having some issue getting Pika to work with routing keys or exchanges in a way that's consistent with it AMQP or RabbitMQ documentation. I understand that the RabbitMQ documentation uses an older version of Pika, so I have disregarded their example code.

What I'm trying to do is define a queue, "order" and have two consumers, one that handle th开发者_运维问答e exchange or routing_key "production" and one that handles "test". From looking at that RabbitMQ documentation that should be easy enough to do by using either a direct exchange and routing keys or by using a topic exchange.

Pika however doesn't appear to know what to do with the exchanges and routing keys. Using the RabbitMQ management tool to inspect the queues, it's pretty obvious that Pika either didn't queue the message correctly or that RabbitMQ just threw it away.

On the consumer side it isn't really clear how I should bind a consumer to an exchange or handle routing keys and the documentation isn't really helping.

If I drop all ideas or exchanges and routing keys, messages queue up nicely and are easily handled by my consumer.

Any pointers or example code people have would be nice.


As it turns out, my understanding of AMQP was incomplete.

The idea is as following:

Client:

The client after getting the connection should not care about anything else but the name of the exchange and the routing key. That is we don't know which queue this will end up in.

channel.basic_publish(exchange='order',
                      routing_key="order.test.customer",
                      body=pickle.dumps(data),
                      properties=pika.BasicProperties(
                          content_type="text/plain",
                          delivery_mode=2))

Consumer

When the channel is open, we declare the exchange and queue

channel.exchange_declare(exchange='order', 
                         type="topic", 
                         durable=True, 
                         auto_delete=False)

channel.queue_declare(queue="test", 
                      durable=True, 
                      exclusive=False, 
                      auto_delete=False, 
                      callback=on_queue_declared)

When the queue is ready, in the "on_queue_declared" callback is a good place, we can bind the queue to the exchange, using our desired routing key.

channel.queue_bind(queue='test', 
                   exchange='order', 
                   routing_key='order.test.customer')

#handle_delivery is the callback that will actually pickup and handle messages
#from the "test" queue
channel.basic_consume(handle_delivery, queue='test') 

Messages send to the "order" exchange with the routing key "order.test.customer" will now be routed to the "test" queue, where the consumer can pick it up.


While Simon's answer seems right in general, you might need to swap the parameters for consuming

channel.basic_consume(queue='test', on_message_callback=handle_delivery) 

Basic setup is sth like

credentials = pika.PlainCredentials("some_user", "some_password")
parameters = pika.ConnectionParameters(
    "some_host.domain.tld", 5672, "some_vhost", credentials
)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

To start consuming:

channel.start_consuming()
0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜