Reading from multiple queues, RabbitMQ
I am new to RabbitMQ. I want to be able to handle reading messages without blocking when there are multiple queues (to read from). Any inputs on how I can do that?
//Edit 1
/// <summary>
/// RabbitMQ-backed <see cref="IMessageBus"/>: publishes measurements through the
/// amq.direct exchange and reads them back from the single queue most recently
/// passed to <see cref="subscribeToQueue"/>.
/// </summary>
public class Rabbit : IMessageBus, IDisposable
{
    // Exchange that every queue is bound to and every message is published through.
    private const string ExchangeName = "amq.direct";

    ConnectionFactory factory = null;
    IConnection connection = null;
    IModel channel = null;
    Subscription sub = null;

    /// <summary>
    /// Serializes a measurement and publishes it to the queue named after its id.
    /// </summary>
    /// <param name="m1">Measurement to send; <c>m1.id</c> is used as the queue name.</param>
    public void writeMessage( Measurement m1 ) {
        byte[] body = Measurement.AltSerialize( m1 );
        // Fixed: the placeholder used to be {1} with only one argument, which makes
        // Console.WriteLine throw FormatException before anything is published.
        Console.WriteLine("Sending message to queue {0} via the amq.direct exchange.", m1.id);
        string finalQueue = publishToQueue( m1.id );
        // The queue is bound with its own name as binding key, so that name is the routing key.
        channel.BasicPublish(ExchangeName, finalQueue, null, body);
        Console.WriteLine("Done. Wrote the message to queue {0}.\n", m1.id);
    }

    /// <summary>
    /// Declares the queue and binds it to amq.direct using the queue name as binding key.
    /// </summary>
    /// <param name="firstQueueName">Name of the queue to declare.</param>
    /// <returns>The queue name returned by the declare call.</returns>
    public string publishToQueue(string firstQueueName) {
        Console.WriteLine("Creating a queue and binding it to amq.direct");
        string queueName = channel.QueueDeclare(firstQueueName, true, false, false, null);
        channel.QueueBind(queueName, ExchangeName, queueName, null);
        Console.WriteLine("Done. Created queue {0} and bound it to amq.direct.\n", queueName);
        return queueName;
    }

    /// <summary>
    /// Reads one message from the current subscription and acknowledges it.
    /// </summary>
    /// <returns>
    /// The deserialized measurement, or a default-constructed <c>Measurement</c> when the
    /// subscription yields no delivery.
    /// </returns>
    /// <exception cref="InvalidOperationException">
    /// <see cref="subscribeToQueue"/> has not been called yet.
    /// </exception>
    public Measurement readMessage() {
        if (sub == null) {
            throw new InvalidOperationException("subscribeToQueue must be called before readMessage.");
        }

        Console.WriteLine("Receiving message...");
        Measurement m = new Measurement();

        // Next() is called directly instead of using foreach + break.
        // NOTE(review): the Subscription appears to act as its own enumerator, so leaving a
        // foreach early may dispose (close) it - confirm against the client version in use.
        BasicDeliverEventArgs ev = sub.Next();
        if (ev != null) {
            m = Measurement.AltDeSerialize(ev.Body);
            // Fixed: the original broke out of the loop before reaching Ack(), so the
            // delivery was never acknowledged.
            sub.Ack();
        }

        Console.WriteLine("Done.\n");
        return m;
    }

    /// <summary>Replaces the current subscription with one on the given queue.</summary>
    /// <param name="queueName">Queue to consume from.</param>
    public void subscribeToQueue(string queueName )
    {
        sub = new Subscription(channel, queueName);
    }

    // Shared by all instances; MsgSys reads and writes this static field.
    public static string MsgSysName;

    /// <summary>Name of the messaging system, stored in the static <see cref="MsgSysName"/>.</summary>
    public string MsgSys
    {
        get
        {
            return MsgSysName;
        }
        set
        {
            MsgSysName = value;
        }
    }

    /// <summary>Connects to the broker on localhost and opens one channel.</summary>
    /// <param name="_msgSys">Value stored in <see cref="MsgSys"/>.</param>
    public Rabbit(string _msgSys) //Constructor
    {
        factory = new ConnectionFactory();
        factory.HostName = "localhost";
        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        System.Console.WriteLine("\nMsgSys: RabbitMQ");
        MsgSys = _msgSys;
    }

    /// <summary>Releases the connection deterministically; preferred over waiting for the finalizer.</summary>
    public void Dispose()
    {
        ReleaseConnection();
        GC.SuppressFinalize(this);
    }

    ~Rabbit()
    {
        // A finalizer must never throw: an escaping exception terminates the process.
        // The connection is null if the constructor failed, and disposing it here is
        // best effort only.
        try
        {
            ReleaseConnection();
        }
        catch (Exception)
        {
            // Deliberately ignored - nothing sensible can be done during finalization.
        }
    }

    // Disposes the connection once; later calls only print the message.
    private void ReleaseConnection()
    {
        if (connection != null)
        {
            connection.Dispose();
            connection = null;
        }
        System.Console.WriteLine("\nDestroying RABBIT");
    }
}
//Edit 2
private List<Subscription> subscriptions = new List<Subscription>();
Subscription sub = null;
/// <summary>
/// Polls each subscription in turn and returns the first measurement that arrives.
/// </summary>
/// <returns>
/// The deserialized measurement, or null when no subscription delivered a message
/// within its polling window.
/// </returns>
public Measurement readMessage()
{
    // Upper bound, in milliseconds, on how long a single empty queue may hold up the scan.
    const int pollTimeoutMs = 250;

    foreach (Subscription element in subscriptions)
    {
        BasicDeliverEventArgs ev;
        // Fixed: enumerating the subscription waits for the next delivery, so an empty
        // first queue stalled the scan and the "No message" path below was unreachable.
        // The timed Next overload gives up after pollTimeoutMs instead.
        if (element.Next(pollTimeoutMs, out ev) && ev != null)
        {
            Measurement m = Measurement.AltDeSerialize( ev.Body );
            // Fixed: the delivery was never acknowledged before returning.
            element.Ack();
            return m;
        }
    }

    System.Console.WriteLine("No message in the queue(s) at this time.");
    // Fixed: report "nothing received" as null rather than a blank Measurement.
    return null;
}
/// <summary>
/// Opens a subscription on the given queue, remembers it as the current one and
/// appends it to the list scanned by readMessage.
/// </summary>
/// <param name="queueName">Queue to consume from.</param>
public void subscribeToQueue(string queueName)
{
    Subscription created = new Subscription(channel, queueName);
    sub = created;
    subscriptions.Add(created);
}
//Edit 3
//MessageHandler.cs
/// <summary>
/// Publishes measurements through the amq.direct exchange and polls any number of
/// subscribed queues for incoming measurements without stalling on an empty queue.
/// </summary>
public class MessageHandler
{
    // Exchange that every queue is bound to and every message is published through.
    private const string ExchangeName = "amq.direct";

    // Upper bound, in milliseconds, on how long one empty queue may hold up readMessage.
    private const int PollTimeoutMs = 250;

    ConnectionFactory factory = null;
    IConnection connection = null;
    IModel channel = null;
    private List<Subscription> subscriptions = new List<Subscription>();
    Subscription sub = null;

    /// <summary>
    /// Declares the queue named after the measurement id (if needed) and publishes the
    /// serialized measurement to it.
    /// </summary>
    /// <param name="m1">Measurement to send; <c>m1.id</c> is the queue name and routing key.</param>
    public void writeMessage ( Measurement m1 )
    {
        byte[] body = Measurement.AltSerialize( m1 );
        //declare a queue if it doesn't exist
        publishToQueue(m1.id);
        channel.BasicPublish(ExchangeName, m1.id, null, body);
        Console.WriteLine("\n [x] Sent to queue {0}.", m1.id);
    }

    /// <summary>
    /// Declares the queue and binds it to amq.direct with the queue name as binding key.
    /// </summary>
    /// <param name="queueName">Name of the queue to declare and bind.</param>
    public void publishToQueue(string queueName)
    {
        string finalQueueName = channel.QueueDeclare(queueName, true, false, false, null);
        // Fixed: the binding key used to be "", while writeMessage publishes with the
        // queue name as routing key, so the binding never matched the published messages.
        channel.QueueBind(finalQueueName, ExchangeName, finalQueueName, null);
    }

    /// <summary>
    /// Polls each subscription in turn and returns the first measurement that arrives.
    /// </summary>
    /// <returns>
    /// The measurement with <c>id</c> set to the name of the queue it came from, or null
    /// when no subscription delivered a message within its polling window.
    /// </returns>
    public Measurement readMessage()
    {
        foreach(Subscription element in subscriptions)
        {
            if( element.QueueName == null)
            {
                continue;
            }

            BasicDeliverEventArgs ev;
            // Fixed: the parameterless Next() waits for a delivery, so an empty queue
            // stalled the scan. The timed overload gives up after PollTimeoutMs.
            if( !element.Next(PollTimeoutMs, out ev) || ev == null)
            {
                // Fixed: no Ack() here - nothing was received from this queue.
                continue;
            }

            Measurement m = Measurement.AltDeSerialize( ev.Body );
            m.id = element.QueueName;
            element.Ack();
            return m;
        }

        System.Console.WriteLine("No message in the queue(s) at this time.");
        return null;
    }

    /// <summary>Opens a subscription on the queue and adds it to the polled set.</summary>
    /// <param name="queueName">Queue to consume from.</param>
    public void subscribeToQueue(string queueName)
    {
        sub = new Subscription(channel, queueName);
        subscriptions.Add(sub);
    }

    // Shared by all instances; MsgSys reads and writes this static field.
    public static string MsgSysName;

    /// <summary>Name of the messaging system, stored in the static <see cref="MsgSysName"/>.</summary>
    public string MsgSys
    {
        get
        {
            return MsgSysName;
        }
        set
        {
            MsgSysName = value;
        }
    }

    /// <summary>Connects to the broker on localhost and opens one channel.</summary>
    /// <param name="_msgSys">Value stored in <see cref="MsgSys"/>.</param>
    public MessageHandler(string _msgSys) //Constructor
    {
        factory = new ConnectionFactory();
        factory.HostName = "localhost";
        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        System.Console.WriteLine("\nMsgSys: RabbitMQ");
        MsgSys = _msgSys;
    }

    /// <summary>Closes all subscriptions, then the channel, then the connection.</summary>
    public void disposeAll()
    {
        // Fixed: tear down innermost-first. The original disposed the connection before
        // the channel and subscriptions that depend on it.
        foreach(Subscription element in subscriptions)
        {
            element.Close();
        }
        subscriptions.Clear();
        sub = null;
        channel.Dispose();
        connection.Dispose();
        System.Console.WriteLine("\nDestroying RABBIT");
    }
}
//App1.cs
using System;
using System.IO;
using UtilityMeasurement;
using UtilityMessageBus;
/// <summary>
/// Publisher demo: creates two test measurements, declares the queues named by the
/// user and writes both measurements to the bus.
/// </summary>
public class MainClass
{
    public static void Main()
    {
        // Fixed: the constructor call was missing `new`, which does not compile.
        MessageHandler obj1 = new MessageHandler("Rabbit");
        try
        {
            // Fixed: MsgSysName is not in scope in this class; read it through the instance.
            System.Console.WriteLine("\nA {0} object is now created.", obj1.MsgSys);

            //Create new Measurement messages
            Measurement m1 = new Measurement("q1", 2345, 23.456);
            Measurement m2 = new Measurement("q2", 222, 33.33);
            PrintTestMessage(1, m1);
            PrintTestMessage(2, m2);

            // NOTE(review): writeMessage publishes to the queue named after the
            // measurement id, not to the queue name entered here - confirm this is intended.
            DeclareQueueFromPrompt(obj1);
            obj1.writeMessage( m1 );

            DeclareQueueFromPrompt(obj1);
            obj1.writeMessage( m2 );
        }
        finally
        {
            // Release broker resources even when publishing fails.
            obj1.disposeAll();
        }
    }

    // Prints the id, time and value of a test measurement under a numbered heading.
    private static void PrintTestMessage(int number, Measurement m)
    {
        System.Console.WriteLine("Test message {0}:\n ID: {1}", number, m.id);
        System.Console.WriteLine(" Time: {0}", m.time);
        System.Console.WriteLine(" Value: {0}", m.value);
    }

    // Asks for a queue name and declares that queue; does nothing when input has ended.
    private static void DeclareQueueFromPrompt(MessageHandler bus)
    {
        System.Console.WriteLine("\nName of queue to publish to: ");
        // ReadLine returns null at end of input; the original called ToString() on it.
        string queueName = System.Console.ReadLine();
        if (queueName != null)
        {
            bus.publishToQueue( queueName );
        }
    }
}
//App2.cs
using System;
using System.IO;
using UtilityMeasurement;
using UtilityMessageBus;
/// <summary>
/// Consumer demo: asks which messaging system to use, then twice asks for a queue
/// name, subscribes to it and prints one message if the bus returns one.
/// </summary>
public class MainClass
{
    public static void Main()
    {
        //Asks for the message system
        System.Console.WriteLine("\nEnter name of messageing system: ");
        System.Console.WriteLine("Usage: [Rabbit] [Zmq]");
        // ReadLine returns null at end of input; the original called ToString() on it.
        string MsgSysName = System.Console.ReadLine();
        if (MsgSysName == null)
        {
            return;
        }

        //Declare an IMessageBus instance:
        //Here, an object of the corresponding Message System
        // (ex. Rabbit, Zmq, etc) is instantiated
        IMessageBus obj1 = MessageBusFactory.GetMessageBus(MsgSysName);
        try
        {
            System.Console.WriteLine("\nA {0} object is now created.", MsgSysName);
            SubscribeAndPrint(obj1, "Queue name to subscribe to: ");
            SubscribeAndPrint(obj1, "Another queue name to subscribe to: ");
        }
        finally
        {
            // Release bus resources even when subscribing or reading fails.
            obj1.disposeAll();
        }
    }

    // Prompts for a queue name, subscribes to it, reads one message from the bus and
    // prints it when the bus returned one. Does nothing further when input has ended.
    private static void SubscribeAndPrint(IMessageBus bus, string prompt)
    {
        System.Console.WriteLine(prompt);
        string queueName = System.Console.ReadLine();
        if (queueName == null)
        {
            return;
        }
        bus.subscribeToQueue( queueName );

        //Read message into m
        Measurement m = bus.readMessage();
        if (m != null ) {
            System.Console.WriteLine("\nMessage received from queue {0}:\n ID: {1}", m.id, m.id);
            System.Console.WriteLine(" Time: {0}", m.time);
            System.Console.WriteLine(" Value: {0}", m.value);
        }
    }
}
two sources of info:
http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
You should really try to understand the examples first.
%Program Files%\RabbitMQ\DotNetClient\examples\src (basic examples)
get full working examples from their Mercurial repository (c# projects).
Useful operations to understand:
- Declare / Assert / Listen / Subscribe / Publish
Re: your question -- there's no reason why you can't have multiple listeners. Or you could subscribe to n routing paths with one listener on an "exchange".
** re: non-blocking **
A typical listener consumes messages one at a time. You can pull them off the queue, or they will automatically be placed close to the consumer in a 'windowed' fashion (defined through quality-of-service (QoS) parameters). The beauty of the approach is that a lot of hard work is done for you (re: reliability, guaranteed delivery, etc.).
A key feature of RabbitMQ is that if there is an error in processing, then the message is re-added back into the queue (a fault tolerance feature).
I would need to know more about your situation.
Often if you post to the list I mentioned above, you can get hold of someone on staff at RabbitMQ. They're very helpful.
Hope that helps a little. It's a lot to get your head around at first, but it is worth persisting with.
Q&A
see: http://www.rabbitmq.com/faq.html
Q. Can you subscribe to multiple queues using new Subscription(channel, queueName) ?
Yes. You either use a binding key e.g. abc.*.hij, or abc.#.hij, or you attach multiple bindings. The former assumes that you have designed your routing keys around some kind of principle that makes sense for you (see routing keys in the FAQ). For the latter, you need to bind to more than one queue.
Implementing n-bindings manually. see: http://hg.rabbitmq.com/rabbitmq-dotnet-client/file/default/projects/client/RabbitMQ.Client/src/client/messagepatterns/Subscription.cs
there's not much code behind this pattern, so you could roll your own subscription pattern if wildcards are not enough. you could inherit from this class and add another method for additional bindings... probably this will work or something close to this (untested).
The AMQP spec says that multiple manual bindings are possible: http://www.rabbitmq.com/amqp-0-9-1-reference.html#queue.bind
Q. And if so, how can I go through all the subscribed queues and return a message (null when no messages)?
With a subscriber you are notified when a message is available. Otherwise what you are describing is a pull interface where you pull the message down on request. If no messages available, you'll get a null as you'd like. btw: the Notify method is probably more convenient.
Q. Oh, and mind you that I have all these operations in different methods. I will edit my post to reflect the code.
Live code:
this version must use wild cards to subscribe to more than one routing key
n manual routing keys using subscription is left as an exercise for the reader. ;-) I think you were leaning towards a pull interface anyway. btw: pull interfaces are less efficient than notify ones.
using (Subscription sub = new Subscription(ch, QueueNme))
{
foreach (BasicDeliverEventArgs ev in sub)
{
Process(ev.Body);
...
Note: the foreach uses IEnumerable, and IEnumerable wraps the event that a new message has arrived through the "yield" statement. Effectively it is an infinite loop.
--- UPDATE
AMQP was designed with the idea of keeping the number of TCP connections as low as the number of applications, so that means you can have many channels per connection.
The code in this question (edit 3) tries to use two subscribers with one channel, whereas it should (I believe) be one subscriber per channel per thread to avoid locking issues. Suggestion: use a routing key "wildcard". It is possible to subscribe to more than one distinct queue name with the Java client, but the .NET client does not, to my knowledge, have this implemented in the Subscription helper class.
If you really do need two distinct queue names on the same subscription thread, then the following pull sequence is suggested for .net:
// Pull-style read: fetch at most one message from queueName on a short-lived channel.
// Fragment only - conn and queueName come from the surrounding code, and the
// `return 0` implies an enclosing method that returns an int.
using (IModel ch = conn.CreateModel()) { // btw: no reason to close the channel afterwards IMO
conn.AutoClose = true; // no reason to close the connection either. Here for completeness.
ch.QueueDeclare(queueName);
// Second argument false: presumably noAck = false, i.e. the message must be
// acknowledged explicitly - the BasicAck call below is consistent with that.
BasicGetResult result = ch.BasicGet(queueName, false);
// A null result is treated as "queue currently empty".
if (result == null) {
Console.WriteLine("No message available.");
} else {
// Acknowledge this single delivery (second argument false = not multiple, presumably).
ch.BasicAck(result.DeliveryTag, false);
Console.WriteLine("Message:");
}
return 0;
}
-- UPDATE 2:
from RabbitMQ list:
"assume that element.Next() is blocking on one of the subscriptions. You could retrieve deliveries from each subscription with a timeout to read past it. Alternatively you could set up a single queue to receive all measurements and retrieve messages from it with a single subscription." (Emile)
What that means is that when the first queue is empty, .Next() blocks waiting for the next message to appear. i.e. the subscriber has a wait-for-next-message built in.
-- UPDATE 3:
under .net, use the QueueingBasicConsumer for consumption from multiple queues.
Actually here's a thread about it to get a feel on usage:
Wait for a single RabbitMQ message with a timeout
-- UPDATE4:
some more info on the .QueueingBasicConsumer
There's example code here.
http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.QueueingBasicConsumer.html
example copied into the answer with a few modifications (see //<-----).
// One consumer fed by several queues: each BasicConsume call below registers the same
// consumer object on another queue, and deliveries are read from consumer.Queue.
// Fragment only - `...` stands for however the channel is obtained.
IModel channel = ...;
QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
// Second argument false: presumably noAck = false, so each delivery is acked below.
channel.BasicConsume(queueName, false, null, consumer); //<-----
channel.BasicConsume(queueName2, false, null, consumer); //<-----
// etc. channel.BasicConsume(queueNameN, false, null, consumer); //<-----
// At this point, messages will be being asynchronously delivered,
// and will be queueing up in consumer.Queue.
while (true) {
try {
// NOTE(review): Dequeue() presumably waits until a delivery is available - confirm;
// a timed variant would be needed for a non-blocking read.
BasicDeliverEventArgs e = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
// ... handle the delivery ...
// Acknowledge only after the delivery has been handled.
channel.BasicAck(e.DeliveryTag, false);
} catch (EndOfStreamException ex) {
// The consumer was cancelled, the model closed, or the
// connection went away.
break;
}
}
-- UPDATE 5 : a simple get that will act on any queue (a slower, but sometimes more convenient method).
// Single pull from one queue. Fragment only - ch and queueName come from the
// surrounding code.
ch.QueueDeclare(queueName);
// Second argument false: presumably noAck = false, so the message is acked explicitly below.
BasicGetResult result = ch.BasicGet(queueName, false);
// A null result is treated as "queue currently empty".
if (result == null) {
Console.WriteLine("No message available.");
} else {
ch.BasicAck(result.DeliveryTag, false);
Console.WriteLine("Message:");
// deserialize body and display extra info here.
}
The easiest way is to use the EventingBasicConsumer. I have an example on my site on how to use it. RabbitMQ EventingBasicConsumer
This Consumer class exposes a Received Event you can use, and therefore does NOT block. The rest of the code basically stays the same.
精彩评论