How to get SimpleRpcClient.Call() to be a blocking call to achieve synchronous communication with RabbitMQ?
In the .NET version (2.4.1) of RabbitMQ the RabbitMQ.Client.MessagePatterns.SimpleRpcClient has a Call() method with these signatures:
public virtual object[] Call(params object[] args);
public virtual byte[] Call(byte[] body);
public virtual byte[] Call(IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties);
The problem:
With various attempts, the method still continues to not block where I expect it to, so it's unable ever handle the response.
The Question:
Am I missing something obvious in the setup of the SimpleRpcClient, or earlier with the IModel, IConnect开发者_开发百科ion, or even PublicationAddress?
More Info:
I've also tried various paramater configurations of the QueueDeclare() method too with no luck.
string QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary arguments);
Some more reference code of my setup of these:
IConnection conn = new ConnectionFactory{Address = "127.0.0.1"}.CreateConnection());
using (IModel ch = conn.CreateModel())
{
var client = new SimpleRpcClient(ch, queueName);
var queueName = ch.QueueDeclare("t.qid", true, true, true, null);
ch.QueueBind(queueName, "exch", "", null);
//HERE: does not block?
var replyMessageBytes = client.Call(prop, msgToSend, out replyProp);
}
Looking elsewhere:
Or is it likely there's an issue in my "server side" code? With and without the use of BasicAck() it appears the client has already continued execution.
--SHORT ANSWER--
Bit of the "You're doing it wrong"...
Check on IBasicProperties, and you should be using SimpleRpcServer with HandleSimpleCall()
If you stumble upon this question, you've either taken the wrong approach as I have, and possibly if made a similar mistake of manipulating IBasicProperties incorectly thereby hurting the ability of SimpleRpcServer to function correctly.
--LONG ANSWER--
Working Sample for .NET: Find my working example up on BitBucket here:
https://bitbucket.org/NickJosevski/synchronous-rabbitmq-sample-.net
Or here's a quick sample...
Client side:
IConnection conn = new ConnectionFactory{Address = "127.0.0.1"}.CreateConnection();
using (IModel ch = conn.CreateModel())
{
ch.ExchangeDeclare(Helper.ExchangeName, "direct");
var queueName = ch.EnsureQueue();
var client = new SimpleRpcClient(ch, queueName);
var msgToSend = new Message(/*data*/).Serialize();
IBasicProperties replyProp;
var reply = client.Call(new BasicProperties(), msgToSend, out replyProp);
}
Server side:
IConnection conn = new ConnectionFactory{Address = "127.0.0.1"}.CreateConnection();
using (IModel ch = conn.CreateModel())
{
ch.ExchangeDeclare(Helper.ExchangeName, "direct");
var queuename = ch.EnsureQueue();
var subscription = new Subscription(ch, queuename);
new MySimpleRpcServerSubclass(subscription).MainLoop();
}
internal class MySimpleRpcServerSubclass : SimpleRpcServer
{
public MySimpleRpcServerSubclass(Subscription subscription)
: base(subscription) { }
public override byte[] HandleSimpleCall(
bool isRedelivered, IBasicProperties requestProperties,
byte[] body, out IBasicProperties replyProperties)
{
replyProperties = requestProperties;
replyProperties.MessageId = Guid.NewGuid().ToString();
var m = Message.Deserialize(body);
var r =
new Response
{
Message = String.Format("Got {0} with {1}", m.Name, m.Body)
};
return r.Serialize();
}
}
Shared:
//helper:
public static string EnsureQueue(this IModel ch)
{
var queueName = ch.QueueDeclare(QueueId, false, false, false, null);
ch.QueueBind(queueName, ExchangeName, "", null);
return queueName;
}
//NOTE:
not all extension methods are explained here, such as *.Serialize()*
as they're not relevant and just make for a cleaner example.
精彩评论