NServiceBus Host that Subscribes to Its Own Published Messages
Used Version of NServiceBus: 2.0.0.1145
Question:
Is it possible to configure an NServiceBus host so that it consumes (subscribes to) its own published messages?
Answer:
It seems possible, but with the configuration below I get a "Transaction deadlocked" exception when NServiceBus tries to insert subscriptions into the subscription storage. It happens when you use DbSubscriptionStorage together with a NumberOfWorkerThreads value greater than 1.
Error:
Could not execute command:
INSERT INTO Subscription (SubscriberEndpoint, MessageType) VALUES (@p0, @p1)
System.Data.SqlClient.SqlException:
Transaction was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.
After that, NServiceBus tries to disconnect but fails because a transaction is still in progress, and it throws an unhandled exception.
How to reproduce:
Here is my App.Config:
<!-- Publishing Configuration -->
<MsmqTransportConfig InputQueue="test_publisher_output" ErrorQueue="test_error" NumberOfWorkerThreads="3" MaxRetries="5" />
<!-- Subscription Configuration -->
<UnicastBusConfig DistributorControlAddress="" DistributorDataAddress="" ForwardReceivedMessagesTo="">
  <MessageEndpointMappings>
    <add Messages="MessageAssembly" Endpoint="test_publisher_output" />
  </MessageEndpointMappings>
</UnicastBusConfig>
My Bus-Configuration:
var bus = Configure.With()
    .Log4Net()
    .StructureMapBuilder(container)
    .XmlSerializer()
    .MsmqTransport()
        .IsTransactional(true)
        .PurgeOnStartup(false)
    .DBSubcriptionStorage(subscriptionDbProperties, true)
    .Sagas()
    .NHibernateSagaPersister(sagaDbProperties, true)
    .UnicastBus()
        .ImpersonateSender(false)
        .LoadMessageHandlers(First<GridInterceptingMessageHandler>
            .Then<SagaMessageHandler>())
    .CreateBus()
    .Start();
And here are my dbProperties for both the subscription and the saga database:
connection.provider NHibernate.Connection.DriverConnectionProvider
connection.driver_class NHibernate.Driver.SqlClientDriver
dialect NHibernate.Dialect.MsSql2005Dialect
Everything works fine as long as I don't increase NumberOfWorkerThreads above 1. With any higher value, it throws the errors shown above.
I hope I haven't forgotten anything. Thanks in advance for your help.
If you want the same process to handle a published message it would be better to do a Bus.SendLocal() after Bus.Publish(). The SendLocal() method will place a message on the local queue and your internal handler will pick it up and process it. This will get rid of your deadlock yet keep the same semantics.
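A minimal sketch of that suggestion follows. The message type OrderAccepted, the OrderService class, and the handler are hypothetical names used only for illustration, and the IBus calls are written as I recall them from the NServiceBus 2.0 API (Publish and SendLocal both taking messages that implement IMessage), so check them against the version you reference. It cannot run without the NServiceBus assemblies.

```csharp
using System;
using NServiceBus;

// Hypothetical event; in the question it would live in "MessageAssembly".
public class OrderAccepted : IMessage
{
    public Guid OrderId { get; set; }
}

// Hypothetical component that raises the event.
public class OrderService
{
    private readonly IBus bus;

    // Assumes the container (StructureMap in the question) supplies the bus.
    public OrderService(IBus bus)
    {
        this.bus = bus;
    }

    public void Accept(Guid orderId)
    {
        var message = new OrderAccepted { OrderId = orderId };

        // Delivered to every external subscriber found in the subscription storage.
        bus.Publish(message);

        // Placed directly on this endpoint's own input queue,
        // so no self-subscription is involved.
        bus.SendLocal(message);
    }
}

// Handler hosted in the same endpoint; it receives the SendLocal() copy.
public class OrderAcceptedHandler : IHandleMessages<OrderAccepted>
{
    public void Handle(OrderAccepted message)
    {
        Console.WriteLine("Handling locally: " + message.OrderId);
    }
}
```

With this approach the endpoint presumably no longer needs the MessageEndpointMappings entry that points at its own queue. If that entry stays and the endpoint still subscribes to itself, the handler would likely receive each message twice.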
I would really consider a redesign of this component. If you want the stability NServiceBus gives you, and you've already broken the component down so that each part of the processing is in a separate message handler, put each message handler in a separate executable with a separate queue. If that isn't possible, then you don't really have the stability of NServiceBus, because you are tied to other pieces of code. In that case you should just call the required functions directly.
If you are only running them all off one queue for testing, then split them up when you test as well. There really is no reason to subscribe to your own messages: either split the handlers into separate endpoints if possible, or, if that is not possible, call the functions directly.
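As a rough illustration of the "separate endpoint" option, the subscriber's App.Config could look like the fragment below. The queue name test_subscriber_input is an assumption made up for this sketch. Everything else is copied from the configuration in the question.

```xml
<!-- Subscriber endpoint: its own input queue (hypothetical name) -->
<MsmqTransportConfig InputQueue="test_subscriber_input" ErrorQueue="test_error" NumberOfWorkerThreads="3" MaxRetries="5" />
<UnicastBusConfig DistributorControlAddress="" DistributorDataAddress="" ForwardReceivedMessagesTo="">
  <MessageEndpointMappings>
    <!-- Messages from MessageAssembly are owned by the publisher's queue -->
    <add Messages="MessageAssembly" Endpoint="test_publisher_output" />
  </MessageEndpointMappings>
</UnicastBusConfig>
```

The publisher would then keep test_publisher_output as its input queue and drop the mapping that points back at itself, so only the separate subscriber process registers a subscription.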