How can I manually handle any subscribed to message type in NServiceBus?
I'm trying to build a layer over NServiceBus to make it simpler for other developers to use.
I'm trying to do without the config file and managed to get the publisher to work:
public class NServiceBusPublisher
{
private IBus _Bus { get; set; }
public NServiceBusPublisher(string argInputQueue, string argErrorQueue)
{
Configure configure = NServiceBus.Configure.With().DefaultBuilder();
var transport = configure.Configurer.ConfigureComponent<MsmqTransport>(ComponentCallModelEnum.Singleton);
transport.ConfigureProperty(t => t.InputQueue, argInputQueue);
transport.ConfigureProperty(t => t.ErrorQueue, argErrorQueue);
transport.ConfigureProperty(t => t.NumberOfWorkerThreads, 1);
transport.ConfigureProperty(t => t.MaxRetries, 5);
_Bus =
configure
.XmlSerializer()
.MsmqTransport()
.IsTransactional(true)
.PurgeOnStartup(false)
.MsmqSubscriptionStorage()
.UnicastBus()
.ImpersonateSender(false)
.CreateBus()
.Start();
}
public void Publish(NServiceBus.IMessage argMessage)
{
_Bus.Publish(argMessage);
}
}
I also want to have an NServiceBus Subscriber and make it possible for developers to subscribe to any number of message types as long as the message inherits from NServiceBus.IMessage:
public class NServiceBusSubscriber
{
private IBus _Bus { get; set; }
public NServiceBusSubscriber(string argInputQueue, string argOutputQueue, string argErrorQueue, string messagesAssembly)
{
Configure configure = NServiceBus.Configure.With().DefaultBuilder();
var transport = configure.Configurer.ConfigureComponent<MsmqTransport>(ComponentCallModelEnum.Singleton);
transport.ConfigureProperty(t => t.InputQueue, argInputQueue);
transport.ConfigureProperty(t => t.ErrorQueue, argErrorQueue);
transport.ConfigureProperty(t => t.NumberOfWorkerThreads, 1);
transport.ConfigureProperty(t => t.MaxRetries, 5);
var ucb = configure.Configurer.ConfigureComponent<NServiceBus.Unicast.UnicastBus>(ComponentCallModelEnum.Singleton);
ucb.ConfigureProperty(u => u.MessageOwners, new Dictionary<string,string>()
{
{messagesAssembly, argOutputQueue}
});
_Bus =
configure
.XmlSerializer()
.MsmqTransport()
.IsTransactional(true)
.PurgeOnStartup(false)
.MsmqSubscriptionStorage()
.UnicastBus()
.ImpersonateSender(false)
.DoNotAutoSubscribe()
.CreateBus()
.Start();
}
public void Subscribe<T>() where T : NServiceBus.IMessage
{
_Bus.Subscribe<T>();
}
}
The problem is that I couldn't find any way to attach an event handler to a particular message type.
Could you please help me figure this out?
It's been a while since the question was asked, so I am not sure whether the problem has been solved, but here's one way you can do it using Bus.Subscribe (although, as other respondents have said, this is not the prescribed way of doing it in NServiceBus).
Subscribe to the message type using this Subscribe overload:
void Subscribe(Type messageType, Predicate<IMessage> condition);
Then you can handle the message in the delegate
private bool Handle(NServiceBus.IMessage nsbMsg)
{
// You get the message instance here and can handle it.
return true;
}
So, your code would then be
class MySubscriber
{
public IBus Bus {get; set;}
public void Subscribe()
{
Bus.Subscribe(typeof(MyMessage), Handle);
}
public bool Handle(NServiceBus.IMessage nsbMsg)
{
var msg = nsbMsg as MyMessage;
//your code
return true;
}
}
However, please note that by doing this you have to manage the lifetime of the handler yourself, which would otherwise have been managed for you by NServiceBus using the IoC framework of your choice.
You will also have to pass the reference to IBus explicitly, whereas it would be injected for you automatically if you were just implementing the IHandleMessages<T> interface.
An architectural point here is that NSB is a full-fledged 'ESB'; it's not just a messaging layer. Adding another layer over your ESB is IMHO an abstraction too many.
I think you are missing the concept behind NServiceBus.
Based on the code you show, I get the impression that you envision some services that publish messages and others that process those messages. In my experience most processes do both: they subscribe to events or process incoming commands and, as a result, publish new events and send new commands. In your setup you would need to have publisher and subscriber instances for each of these message types.
NServiceBus is built for the situation I describe. You configure and start one bus instance, and that instance orchestrates the complete application.
If you want to make it easier for developers to use NServiceBus, I would concentrate on the configuration part only. In our company I have created a ServicebusConfigurator class that configures NServiceBus according to our company standards, and extracted it into a framework together with a simple extension method for the .NET Core generic host. The only code our developers need to write to create a Windows Service that hosts an NServiceBus endpoint is something like this:
internal static class Program
{
private static int Main(string[] args)
{
return (int)Host.CreateDefaultBuilder(args) //.NET Core generic host
.WithNServiceBus() //configure NServiceBus according to our standards and start it.
.UseTopshelf<Worker>() // use Worker as the actual service doing the work.
.EnableNsbInstallersDuringInstall() // Execute any NServiceBus transport specific installation code during install of the service.
.Run(); // Run the thing.
}
}
Since you are not auto-subscribing, the first thing you will need to do is subscribe to the message type via Bus.Subscribe(). Others could do this at the IWantToRunAtStartup extension point (implement the interface in a class somewhere). From there, each subscriber will implement the IHandleMessages<T> interface. Implementing this interface wires you to a message, where "T" is the message type.
When NSB starts up it will scan the local bin dir and find all your interface implementations and wire them up on your behalf internally. From there it will dispatch to the correct handler when a message of that type arrives.
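As a rough sketch of the above, assuming an NServiceBus 2.x-era API: OrderPlaced and both class names are made up for illustration, and IWantToRunAtStartup is only picked up when the endpoint runs inside the NServiceBus generic host. With a self-hosted bus like the one in the question, you would call Subscribe yourself after Start() instead.

```csharp
using System;
using NServiceBus;

// Hypothetical message type, used only for this illustration.
public class OrderPlaced : IMessage
{
    public int OrderId { get; set; }
}

// Found by NServiceBus when it scans the assemblies at startup;
// the bus dispatches every incoming OrderPlaced message to Handle.
public class OrderPlacedHandler : IHandleMessages<OrderPlaced>
{
    // Set by the container, so the handler never has to locate the bus itself.
    public IBus Bus { get; set; }

    public void Handle(OrderPlaced message)
    {
        Console.WriteLine("Order placed: " + message.OrderId);
    }
}

// Assumption: the endpoint is hosted in the NServiceBus generic host,
// which calls Run() once the bus has started.
public class SubscribeAtStartup : IWantToRunAtStartup
{
    public IBus Bus { get; set; }

    public void Run()
    {
        // Needed because auto-subscription was switched off with DoNotAutoSubscribe().
        Bus.Subscribe<OrderPlaced>();
    }

    public void Stop()
    {
        // Nothing to clean up in this sketch.
    }
}
```

Note that the handler needs no registration call of its own; implementing the interface is what wires it to the message type.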
NServiceBus automatically handles the subscription of messages. When you invoke Configure.With()....Start(), NServiceBus will scan the assemblies to determine which types implement IHandleMessages<SomeMessage>, and it will send a subscription request to the publisher for each handled message type.
When you add "DoNotAutoSubscribe", you've got to manually get all messages being handled and do a Bus.Subscribe() for each of them.
Beyond that, NServiceBus will automatically handle the routing of an incoming message to the appropriate handler. In your subscriber code above, are you receiving an error message or are the messages disappearing from the queue?
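For the DoNotAutoSubscribe case mentioned above, one possible way to "get all messages being handled" is to find them by reflection. This is only a sketch: ManualSubscriptions and SubscribeToHandledMessages are hypothetical names, and it assumes the IBus.Subscribe(Type) overload of the NServiceBus 2.x-era API.

```csharp
using System;
using System.Linq;
using System.Reflection;
using NServiceBus;

// Hypothetical helper, not part of NServiceBus.
public static class ManualSubscriptions
{
    // Subscribes to every message type that has an IHandleMessages<T>
    // implementation in the given assembly.
    public static void SubscribeToHandledMessages(IBus bus, Assembly handlersAssembly)
    {
        var messageTypes = handlersAssembly.GetTypes()
            .Where(t => t.IsClass && !t.IsAbstract)
            .SelectMany(t => t.GetInterfaces())
            .Where(i => i.IsGenericType &&
                        i.GetGenericTypeDefinition() == typeof(IHandleMessages<>))
            .Select(i => i.GetGenericArguments()[0]) // the T in IHandleMessages<T>
            .Distinct();

        foreach (Type messageType in messageTypes)
        {
            bus.Subscribe(messageType);
        }
    }
}
```

It would be called once, after the bus has been started, for example ManualSubscriptions.SubscribeToHandledMessages(bus, typeof(SomeHandler).Assembly), where SomeHandler stands for any handler class in your own assembly.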