RabbitMQ and Serialization weird error
I have two apps, app1.cs and app2.cs (code below). In addition, I have a DLL that I built from refer.cs (code below). When I compile and run app1.cs (which sends a Measurement object), I get the following exception:
Unhandled Exception: RabbitMQ.Client.Exceptions.OperationInterruptedException
I can't see how the connection is being interrupted. Do you see what is causing the problem?
Regards, Demi
//refer.cs from which refer.dll is created
using System;
using System.IO;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
namespace refer
{
//start alternate serialization
/// <summary>
/// Converts <see cref="Measurement"/> objects to and from byte arrays using
/// <see cref="BinaryFormatter"/>.
/// </summary>
/// <remarks>
/// SECURITY NOTE(review): BinaryFormatter must not be used to deserialize data
/// from an untrusted source. If the bytes passed to <see cref="AltDeSerialize"/>
/// can come from parties you do not control, switch to a safer format.
/// </remarks>
public static class AltSerialization
{
    /// <summary>Serializes a measurement into a byte array.</summary>
    /// <param name="m">The measurement to serialize; must not be null.</param>
    /// <returns>An array holding exactly the serialized bytes.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="m"/> is null.</exception>
    public static byte[] AltSerialize(Measurement m)
    {
        if (m == null)
        {
            throw new ArgumentNullException("m");
        }

        using (var ms = new MemoryStream())
        {
            var bf = new BinaryFormatter();
            bf.AssemblyFormat = System.Runtime.Serialization.Formatters.FormatterAssemblyStyle.Simple;
            bf.Serialize(ms, m);

            // ToArray() copies only the bytes actually written. GetBuffer()
            // would return the whole internal buffer, including unused
            // trailing capacity, which would be sent along as garbage.
            return ms.ToArray();
        }
    }

    /// <summary>Rebuilds a measurement from bytes produced by <see cref="AltSerialize"/>.</summary>
    /// <param name="seriM">The serialized measurement; must not be null.</param>
    /// <returns>The deserialized measurement.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="seriM"/> is null.</exception>
    public static Measurement AltDeSerialize(byte[] seriM)
    {
        if (seriM == null)
        {
            throw new ArgumentNullException("seriM");
        }

        using (var stream = new MemoryStream(seriM))
        {
            BinaryFormatter bf = new BinaryFormatter();
            bf.AssemblyFormat = System.Runtime.Serialization.Formatters.FormatterAssemblyStyle.Simple;
            return (Measurement)bf.Deserialize(stream);
        }
    }
}
//end alternate serialization
/// <summary>
/// A single measurement: an identifier, a timestamp and a value.
/// Serialization is customized through <see cref="ISerializable"/>; only the
/// time and the value are written, the id is not.
/// </summary>
[Serializable]
public class Measurement : ISerializable
{
    // Names under which the fields are stored in SerializationInfo.
    // The same name must be used for writing and for reading back.
    private const string TimeKey = "MeasurementTime";
    private const string ValueKey = "MeasurementValue";

    [NonSerialized] public int id;
    public int time; // timestamp
    public double value;

    /// <summary>Creates a measurement with id 1, time 12 and value 0.01.</summary>
    public Measurement() : this(1, 12, 0.01)
    {
    }

    /// <summary>Creates a measurement from explicit field values.</summary>
    public Measurement(int _id, int _time, double _value)
    {
        this.id = _id;
        this.time = _time;
        this.value = _value;
    }

    /// <summary>
    /// Deserialization constructor: restores time and value from
    /// <paramref name="info"/>. The id is not restored here.
    /// </summary>
    public Measurement(SerializationInfo info, StreamingContext ctxt)
    {
        Console.WriteLine("DeSerialization construtor called.");

        object storedTime = info.GetValue(TimeKey, typeof(int));
        this.time = (int)storedTime;

        object storedValue = info.GetValue(ValueKey, typeof(double));
        this.value = (double)storedValue;
    }

    /// <summary>Writes time and value into <paramref name="info"/> as name-value pairs.</summary>
    public void GetObjectData(SerializationInfo info, StreamingContext ctxt)
    {
        info.AddValue(TimeKey, this.time);
        info.AddValue(ValueKey, this.value);
    }
}
}
//MB1.cs
using System;
using System.IO;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using UtilityMeasurement;
/// <summary>
/// Common contract implemented by the message-bus back ends in this file
/// (Rabbit and Zmq).
/// </summary>
public interface IMessageBus
{
    /// <summary>Name of the messaging system; the implementations set it from their constructor argument.</summary>
    string MsgSys { get; set; }

    /// <summary>Registers a queue as a target for <see cref="write"/>.</summary>
    void publish(string queue);

    /// <summary>Registers a queue as a source for <see cref="read"/>.</summary>
    void subscribe(string queue);

    /// <summary>Sends a measurement over the bus.</summary>
    void write(Measurement m1);

    /// <summary>Receives a measurement from the bus.</summary>
    Measurement read();
}
/// <summary>
/// RabbitMQ implementation of <see cref="IMessageBus"/>.
/// One connection and one channel are opened lazily on first use and then
/// shared by all operations of this instance.
/// </summary>
public class Rabbit : IMessageBus, IDisposable
{
    // Queue names registered through publish() and subscribe().
    private readonly List<string> publishQ = new List<string>();
    private readonly List<string> subscribeQ = new List<string>();

    // Queues the shared consumer is already attached to, so that repeated
    // read() calls do not register additional consumers.
    private readonly List<string> consumingQ = new List<string>();

    // Kept as fields: the factory was previously a constructor local and the
    // channel was never declared, although every method needs them.
    private readonly ConnectionFactory factory;
    private IConnection connection;
    private IModel channel;
    private QueueingBasicConsumer consumer;

    public static string MsgSysName;

    /// <summary>Name of the messaging system; backed by the static <see cref="MsgSysName"/> field.</summary>
    public string MsgSys
    {
        get
        {
            return MsgSysName;
        }
        set
        {
            MsgSysName = value;
        }
    }

    /// <summary>Creates the bus for a broker on "localhost". No connection is opened yet.</summary>
    /// <param name="_msgSys">Name stored in <see cref="MsgSys"/>.</param>
    public Rabbit(string _msgSys) //Constructor
    {
        factory = new ConnectionFactory();
        factory.HostName = "localhost";
        System.Console.WriteLine("\nMsgSys: RabbitMQ");
        MsgSys = _msgSys;
    }

    // Opens the connection and the channel on first use and returns the channel.
    private IModel EnsureChannel()
    {
        if (connection == null)
        {
            connection = factory.CreateConnection();
        }
        if (channel == null)
        {
            channel = connection.CreateModel();
        }
        return channel;
    }

    /// <summary>Serializes the measurement and publishes it to every queue registered with <see cref="publish"/>.</summary>
    /// <param name="m1">The measurement to send.</param>
    public void write(Measurement m1)
    {
        // The serialization helpers live on AltSerialization, not on Measurement.
        byte[] body = AltSerialization.AltSerialize(m1);

        // Reuse the shared channel instead of opening (and leaking) a new
        // connection and channel on every call.
        IModel model = EnsureChannel();
        foreach (string queue in publishQ)
        {
            model.BasicPublish("", queue, null, body);
            Console.WriteLine("\n [x] Sent to queue {0}.", queue);
        }
    }

    /// <summary>Declares a durable queue and adds it to the list of queues to publish to.</summary>
    /// <param name="queueName">Name of the queue.</param>
    public void publish(string queueName)
    {
        EnsureChannel().QueueDeclare(queueName, true, false, false, null); //durable=true
        publishQ.Add(queueName);
    }

    /// <summary>
    /// Blocks until a message arrives on one of the subscribed queues and
    /// returns it as a measurement. Messages are consumed with automatic
    /// acknowledgement.
    /// </summary>
    /// <returns>The deserialized measurement.</returns>
    public Measurement read()
    {
        IModel model = EnsureChannel();
        if (consumer == null)
        {
            consumer = new QueueingBasicConsumer(model);
        }

        foreach (string queue in subscribeQ)
        {
            // Attach the consumer once per queue. Registering a fresh consumer
            // on every call would leave the old ones receiving (and, with
            // auto-ack, dropping) messages.
            if (!consumingQ.Contains(queue))
            {
                model.BasicConsume(queue, true, consumer);
                consumingQ.Add(queue);
            }
        }

        System.Console.WriteLine(" [*] Waiting for messages." +
            "To exit press CTRL+C");
        BasicDeliverEventArgs ea =
            (BasicDeliverEventArgs)consumer.Queue.Dequeue();
        return AltSerialization.AltDeSerialize(ea.Body);
    }

    /// <summary>Declares a durable queue and adds it to the list of queues to read from.</summary>
    /// <param name="queueName">Name of the queue.</param>
    public void subscribe(string queueName)
    {
        EnsureChannel().QueueDeclare(queueName, true, false, false, null);
        subscribeQ.Add(queueName);
    }

    /// <summary>Closes the channel and the connection if they were opened.</summary>
    public void Dispose()
    {
        if (channel != null)
        {
            if (channel.IsOpen)
            {
                channel.Close();
            }
            channel = null;
        }
        if (connection != null)
        {
            if (connection.IsOpen)
            {
                connection.Close();
            }
            connection = null;
        }
        consumer = null;
        consumingQ.Clear();
    }
}
/// <summary>
/// Placeholder ZeroMQ implementation of <see cref="IMessageBus"/>.
/// All bus operations are empty stubs; <see cref="read"/> returns null.
/// </summary>
public class Zmq : IMessageBus
{
    public static string MsgSysName;

    /// <summary>Stores the name and prints "ZMQ" to the console.</summary>
    public Zmq(string _msgSys) //Constructor
    {
        System.Console.WriteLine("ZMQ");
        this.MsgSys = _msgSys;
    }

    /// <summary>Name of the messaging system; backed by the static <see cref="MsgSysName"/> field.</summary>
    public string MsgSys
    {
        get { return MsgSysName; }
        set { MsgSysName = value; }
    }

    /// <summary>Stub: does nothing.</summary>
    public void publish(string queue)
    {
    }

    /// <summary>Stub: does nothing.</summary>
    public void subscribe(string queue)
    {
    }

    /// <summary>Stub: does nothing.</summary>
    public void write(Measurement m1)
    {
    }

    /// <summary>Stub: always returns null.</summary>
    public Measurement read()
    {
        return null;
    }
}
/// <summary>Creates the <see cref="IMessageBus"/> implementation that matches a messaging-system name.</summary>
public class MessageBusFactory
{
    /// <summary>Returns a new bus for "Zmq" or "Rabbit" (exact, case-sensitive match).</summary>
    /// <param name="MsgSysName">Name of the messaging system.</param>
    /// <exception cref="ArgumentException">The name is not one of the supported values.</exception>
    public static IMessageBus GetMessageBus(string MsgSysName)
    {
        if (MsgSysName == "Zmq")
        {
            return new Zmq(MsgSysName);
        }

        if (MsgSysName == "Rabbit")
        {
            return new Rabbit(MsgSysName);
        }

        throw new ArgumentException("Messaging type " +
            MsgSysName + " not supported." );
    }
}
/// <summary>Sender program: builds a test measurement and publishes it to two queues entered by the user.</summary>
public class MainClass
{
    // Reads one line from the console. Console.ReadLine() returns null at end
    // of input; fail with a clear message instead of a NullReferenceException.
    private static string ReadRequiredLine()
    {
        string line = System.Console.ReadLine();
        if (line == null)
        {
            throw new InvalidOperationException("Unexpected end of input.");
        }
        return line;
    }

    /// <summary>
    /// Asks for the messaging system and two queue names, then writes a test
    /// measurement to both queues.
    /// </summary>
    public static void Main()
    {
        //Asks for the message system
        System.Console.WriteLine("\nEnter name of messageing system: ");
        System.Console.WriteLine("Usage: [Rabbit] [Zmq]");
        string MsgSysName = ReadRequiredLine();

        //Create a new Measurement message
        Measurement m1 = new Measurement(2, 2345, 23.456);

        //Instantiate the bus of the chosen messaging system (Rabbit, Zmq, ...)
        IMessageBus obj1 = MessageBusFactory.GetMessageBus(MsgSysName);
        try
        {
            System.Console.WriteLine("\nA {0} object is now created.", MsgSysName);
            System.Console.WriteLine("With Test message:\n ID: {0}", m1.id);
            System.Console.WriteLine(" Time: {0}", m1.time);
            System.Console.WriteLine(" Value: {0}", m1.value);

            // Ask queue name and store it
            System.Console.WriteLine("Enter a queue name to publish the message to: ");
            string QueueName = ReadRequiredLine();
            obj1.publish( QueueName );

            System.Console.WriteLine("Enter another queue name: ");
            QueueName = ReadRequiredLine();
            obj1.publish( QueueName );

            // Write message to the queue
            obj1.write( m1 );
        }
        finally
        {
            // Release the bus if the implementation is disposable; otherwise
            // this is a no-op.
            IDisposable disposable = obj1 as IDisposable;
            if (disposable != null)
            {
                disposable.Dispose();
            }
        }
    }
}
//MB2.cs
using System;
using System.IO;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using UtilityMeasurement;
/// <summary>
/// Common contract implemented by the message-bus back ends in this file
/// (Rabbit and Zmq).
/// </summary>
public interface IMessageBus
{
    /// <summary>Name of the messaging system; the implementations set it from their constructor argument.</summary>
    string MsgSys { get; set; }

    /// <summary>Registers a queue as a target for <see cref="write"/>.</summary>
    void publish(string queue);

    /// <summary>Registers a queue as a source for <see cref="read"/>.</summary>
    void subscribe(string queue);

    /// <summary>Sends a measurement over the bus.</summary>
    void write(Measurement m1);

    /// <summary>Receives a measurement from the bus.</summary>
    Measurement read();
}
/// <summary>
/// RabbitMQ implementation of <see cref="IMessageBus"/>.
/// One connection and one channel are opened lazily on first use and then
/// shared by all operations of this instance.
/// </summary>
public class Rabbit : IMessageBus, IDisposable
{
    // Queue names registered through publish() and subscribe().
    private readonly List<string> publishQ = new List<string>();
    private readonly List<string> subscribeQ = new List<string>();

    // Queues the shared consumer is already attached to, so that repeated
    // read() calls do not register additional consumers.
    private readonly List<string> consumingQ = new List<string>();

    // Kept as fields: the factory was previously a constructor local and the
    // channel was never declared, although every method needs them.
    private readonly ConnectionFactory factory;
    private IConnection connection;
    private IModel channel;
    private QueueingBasicConsumer consumer;

    public static string MsgSysName;

    /// <summary>Name of the messaging system; backed by the static <see cref="MsgSysName"/> field.</summary>
    public string MsgSys
    {
        get
        {
            return MsgSysName;
        }
        set
        {
            MsgSysName = value;
        }
    }

    /// <summary>Creates the bus for a broker on "localhost". No connection is opened yet.</summary>
    /// <param name="_msgSys">Name stored in <see cref="MsgSys"/>.</param>
    public Rabbit(string _msgSys) //Constructor
    {
        factory = new ConnectionFactory();
        factory.HostName = "localhost";
        System.Console.WriteLine("\nMsgSys: RabbitMQ");
        MsgSys = _msgSys;
    }

    // Opens the connection and the channel on first use and returns the channel.
    private IModel EnsureChannel()
    {
        if (connection == null)
        {
            connection = factory.CreateConnection();
        }
        if (channel == null)
        {
            channel = connection.CreateModel();
        }
        return channel;
    }

    /// <summary>Serializes the measurement and publishes it to every queue registered with <see cref="publish"/>.</summary>
    /// <param name="m1">The measurement to send.</param>
    public void write(Measurement m1)
    {
        // The serialization helpers live on AltSerialization, not on Measurement.
        byte[] body = AltSerialization.AltSerialize(m1);

        // Reuse the shared channel instead of opening (and leaking) a new
        // connection and channel on every call.
        IModel model = EnsureChannel();
        foreach (string queue in publishQ)
        {
            model.BasicPublish("", queue, null, body);
            Console.WriteLine("\n [x] Sent to queue {0}.", queue);
        }
    }

    /// <summary>Declares a durable queue and adds it to the list of queues to publish to.</summary>
    /// <param name="queueName">Name of the queue.</param>
    public void publish(string queueName)
    {
        EnsureChannel().QueueDeclare(queueName, true, false, false, null); //durable=true
        publishQ.Add(queueName);
    }

    /// <summary>
    /// Blocks until a message arrives on one of the subscribed queues and
    /// returns it as a measurement. Messages are consumed with automatic
    /// acknowledgement.
    /// </summary>
    /// <returns>The deserialized measurement.</returns>
    public Measurement read()
    {
        IModel model = EnsureChannel();
        if (consumer == null)
        {
            consumer = new QueueingBasicConsumer(model);
        }

        foreach (string queue in subscribeQ)
        {
            // Attach the consumer once per queue. Registering a fresh consumer
            // on every call would leave the old ones receiving (and, with
            // auto-ack, dropping) messages.
            if (!consumingQ.Contains(queue))
            {
                model.BasicConsume(queue, true, consumer);
                consumingQ.Add(queue);
            }
        }

        System.Console.WriteLine(" [*] Waiting for messages." +
            "To exit press CTRL+C");
        BasicDeliverEventArgs ea =
            (BasicDeliverEventArgs)consumer.Queue.Dequeue();
        return AltSerialization.AltDeSerialize(ea.Body);
    }

    /// <summary>Declares a durable queue and adds it to the list of queues to read from.</summary>
    /// <param name="queueName">Name of the queue.</param>
    public void subscribe(string queueName)
    {
        EnsureChannel().QueueDeclare(queueName, true, false, false, null);
        subscribeQ.Add(queueName);
    }

    /// <summary>Closes the channel and the connection if they were opened.</summary>
    public void Dispose()
    {
        if (channel != null)
        {
            if (channel.IsOpen)
            {
                channel.Close();
            }
            channel = null;
        }
        if (connection != null)
        {
            if (connection.IsOpen)
            {
                connection.Close();
            }
            connection = null;
        }
        consumer = null;
        consumingQ.Clear();
    }
}
/// <summary>
/// Placeholder ZeroMQ implementation of <see cref="IMessageBus"/>.
/// All bus operations are empty stubs; <see cref="read"/> returns null.
/// </summary>
public class Zmq : IMessageBus
{
    public static string MsgSysName;

    /// <summary>Stores the name and prints "ZMQ" to the console.</summary>
    public Zmq(string _msgSys) //Constructor
    {
        System.Console.WriteLine("ZMQ");
        this.MsgSys = _msgSys;
    }

    /// <summary>Name of the messaging system; backed by the static <see cref="MsgSysName"/> field.</summary>
    public string MsgSys
    {
        get { return MsgSysName; }
        set { MsgSysName = value; }
    }

    /// <summary>Stub: does nothing.</summary>
    public void publish(string queue)
    {
    }

    /// <summary>Stub: does nothing.</summary>
    public void subscribe(string queue)
    {
    }

    /// <summary>Stub: does nothing.</summary>
    public void write(Measurement m1)
    {
    }

    /// <summary>Stub: always returns null.</summary>
    public Measurement read()
    {
        return null;
    }
}
/// <summary>Creates the <see cref="IMessageBus"/> implementation that matches a messaging-system name.</summary>
public class MessageBusFactory
{
    /// <summary>Returns a new bus for "Zmq" or "Rabbit" (exact, case-sensitive match).</summary>
    /// <param name="MsgSysName">Name of the messaging system.</param>
    /// <exception cref="ArgumentException">The name is not one of the supported values.</exception>
    public static IMessageBus GetMessageBus(string MsgSysName)
    {
        if (MsgSysName == "Zmq")
        {
            return new Zmq(MsgSysName);
        }

        if (MsgSysName == "Rabbit")
        {
            return new Rabbit(MsgSysName);
        }

        throw new ArgumentException("Messaging type " +
            MsgSysName + " not supported." );
    }
}
/// <summary>Receiver program: subscribes to a queue entered by the user and prints the first measurement received.</summary>
public class MainClass
{
    // Reads one line from the console. Console.ReadLine() returns null at end
    // of input; fail with a clear message instead of a NullReferenceException.
    private static string ReadRequiredLine()
    {
        string line = System.Console.ReadLine();
        if (line == null)
        {
            throw new InvalidOperationException("Unexpected end of input.");
        }
        return line;
    }

    /// <summary>
    /// Asks for the messaging system and a queue name, reads one measurement
    /// from that queue and prints it.
    /// </summary>
    public static void Main()
    {
        //Asks for the message system
        System.Console.WriteLine("\nEnter name of messageing system: ");
        System.Console.WriteLine("Usage: [Rabbit] [Zmq]");
        string MsgSysName = ReadRequiredLine();

        //Instantiate the bus of the chosen messaging system (Rabbit, Zmq, ...)
        IMessageBus obj1 = MessageBusFactory.GetMessageBus(MsgSysName);
        try
        {
            System.Console.WriteLine("\nA {0} object is now created.", MsgSysName);
            System.Console.WriteLine("Enter a queue to subscribe to: ");
            string QueueName = ReadRequiredLine();
            obj1.subscribe( QueueName );

            //Read message into m2
            Measurement m2 = obj1.read();

            // read() may return null (the Zmq stub does); do not dereference it.
            if (m2 == null)
            {
                System.Console.WriteLine("\nNo message received from queue {0}.", QueueName);
                return;
            }

            // The id is not part of the serialized data, so it is set locally.
            m2.id = 11;
            System.Console.WriteLine("\nMessage received from queue {0}:\n ID: {1}",QueueName, m2.id);
            System.Console.WriteLine(" Time: {0}", m2.time);
            System.Console.WriteLine(" Value: {0}", m2.value);
        }
        finally
        {
            // Release the bus if the implementation is disposable; otherwise
            // this is a no-op.
            IDisposable disposable = obj1 as IDisposable;
            if (disposable != null)
            {
                disposable.Dispose();
            }
        }
    }
}
I just created a vanilla C# VS2010 Console application project with the Refer.cs and App1.cs in the same project.
I made the following changes:
- Added RabbitMQ.Client.dll
- Removed the AssemblyVersion attributes
- Added string[] args to the Main method in App1.cs
Also, I changed:
factory.HostName = "localhost";
To this:
factory.HostName = "192.168.56.101";
Which is the IP address of my VirtualBox Ubuntu VM running rabbitmq-server. No exception was thrown, and the message was successfully received on the server.
Given the information provided, all signs point to a server configuration issue. My guess is that either your rabbitmq-server is not running at all, it's not running on localhost, or there is some kind of connectivity issue with port 5672.
精彩评论