开发者

RabbitMQ: setReturnListner handleBasicReturn nt getting called for undelivered messages

For one of the requirement we need to keep track of queue depth and successfully processed messages. The idea is to publish messages and get a list of successful and failed messages. To simulate the requirement I did the following

  1. Publish the messages with Mandatory and Immediate flag sent channel.basicPublish 'exchange' ,'rKey',true,false, props,"Hello World".bytes
  2. The consumer consumes even marked ( I have put numbers from 1..10 as marked value in header of each messages) and does not ACKS odd numbered messages.
  3. I have implemented setReturnListnere in the publisher to capture undelivered messages.

While am able to get the number of unack messages via Rabbmitmqctl list_queues messages_unacknowledged, somehow my handleBasicReturn method does not gets called. Am in missing something.

Code snippets:

Producer:

channel.setReturnListener(new ReturnListener() {
    public void handleBasicReturn(int replyCode, String replyText, String exchange,
    开发者_开发技巧                              String routingKey, AMQP.BasicProperties properties,
                                  byte[] body) 
            throws IOException {
        println "Debugging messages!!!!"
        println "The details of returned messages are ${replyText} from  ${exchange} with routingKey as ${routingKey} with properties"
    }
});

println " queuename is ${dec.queue} and consumerCount is ${dec.consumerCount} messageCount is ${dec.messageCount}"
(1..10).each {
    println "Sending file ${i}....."
    def headers = new HashMap<String,Object>()
    headers.put "operatiion","scp"
    headers.put "dest","joker.dk.mach.com"
    headers.put "id", i
    println headers

    BasicProperties props = new BasicProperties(null, null, headers, null, null, null, null, null,null, null, null, null,null, null)
                    channel.basicPublish 'exchange' ,'rKey',true,false, props,"Hello Worls".bytes                                
    i++                         
}
channel.close()

Consumer:

while (true) {
    def delivery = consumer.nextDelivery()
    def headers = delivery?.properties?.headers
    def id = headers.get("id")
    println "Received message:"
    println " ${id.toString()}"

    if( id % 2 == 0){
        channel.basicAck delivery.envelope.deliveryTag, false                
    }    
}


Read this explanation about how the immediate and mandatory flags impact message delivery in RabbitMQ.

In your case, since you have a consumer receiving messages, messages won't be returned even if the consumer never acks them.

D.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜