开发者

Windows Services communicating via MSMQs - Do I need a service bus?

I have this problem where a system contains nodes (windows services) that push messages to be 开发者_如何学运维processed and others that pull messages and process them.

This has been designed in a way that the push nodes balance the load between queues by maintaining a round-robin list of queues and rotating queues after each send. Therefore message 1 will go to queue 1, message 2 to queue 2 etc. This part has been working great so far.

On the message pull end we designed it such that the messages are retrieved in a similar way - first from queue 1, then from queue 2 etc. In theory, each pull node sits on a different machine and in practice, so far, it only listened on a single queue. But a recent requirement made us have a pull node in a machine that listens to more than one queue: One that typically is extremely busy and filled with millions of messages and one that generally only contains a handful of messages.

The problem we are facing is that the way we architected originally the pull nodes goes from queue to queue until a message is found. If it times out (say after a sec) then it moves on to the next queue.

This doesnt work anymore cause Q1 (filled with millions of messages) will be delayed approximately a second per message since after each pull from Q1 we will ask Q2 for a message (and if it doesnt contain any we will wait for a second).

So it goes like this:

Q1 contains 10 messages and Q2 contains none

  • Pull node asks for a message from Q1
  • Q1 returns message immediately
  • Pull node asks for a message from Q1
  • ------------ Waiting for a second ------------- (Q2 is empty and request times out)
  • Pull node asks for a message from Q1
  • Q1 returns message immediately
  • Pull node asks for a message from Q1
  • ------------ Waiting for a second ------------- (Q2 is empty and request times out)

etc.

So this is clearly wrong.

I guess I am looking for the best architectural solution here. Message processing does not need to be as real-time as possible but needs to be robust and no message should ever be lost!

I would like to hear your views on this problem.

Thank in advance Yannis


Maybe you could use the ReceiveCompleted event in the MessageQueue class? No need to poll then.


I ended up creating a set of threads - one for each msmq that needs to be processed. In the constructor I initialize those threads:

Storages.ForEach(queue =>
        {
            Task task = Task.Factory.StartNew(() =>
            {
                LoggingManager.LogInfo("Starting a local thread to read in mime messages from queue " + queue.Name, this.GetType());
                while (true)
                {
                    WorkItem mime = queue.WaitAndRetrieve();
                    if (mime != null)
                    {
                        _Semaphore.WaitOne();
                        _LocalStorage.Enqueue(mime);

                        lock (_locker) Monitor.Pulse(_locker);

                        LoggingManager.LogDebug("Adding no. " + _LocalStorage.Count + " item in queue", this.GetType());
                    }
                }
            });
        });
  • The _LocalStorage is a thread-safe Queue implementation (ConcurrentQueue introduced in .NET 4.0)

  • The Semaphore is a counting semaphore to control inserts in the _LocalStorage. The _LocalStorage is basically a buffer of received messages but we dont want it to get too large while processing nodes are busy doing work. The effect could be that we retrieve ALL the msmq messages in that _LocalStorage but are busy processing only 5 of them or so. This is bad both in terms of resilience (if the program terminates unexpectedly we lose all these messages) and also in terms of performance as the memory consumption for holding all these items in memory will be huge. So we need to control how many items we hold in the _LocalStorage buffer queue.

  • We Pulse threads waiting for work (see below) that a new item was added to the queue by doing a simple Monitor.Pulse

The code that dequeues work items from the queue is as follows:

lock (_locker)
            if (_LocalStorage.Count == 0) 
                Monitor.Wait(_locker);

        WorkItem result;
        if (_LocalStorage.TryDequeue(out result))
        {
            _Semaphore.Release();
            return result;
        }

        return null;

I hope this can help someone to sort out a similar issue.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜