BlockingCollection - high synchronization problem
What is the best way to get messages from many threads onto a queue and have a separate thread process the items in that queue one at a time?
I frequently use this pattern when trying to decouple activities from many threads.
I am using a BlockingCollection for this, as shown in the code extract below:
    // start this task in a static constructor
    Task.Factory.StartNew(() => ProcessMultiUseQueueEntries(), TaskCreationOptions.LongRunning);

    private static BlockingCollection<Tuple<XClientMsgExt, BOInfo, string, BOStatus>> _q = new BlockingCollection<Tuple<XClientMsgExt, BOInfo, string, BOStatus>>();

    /// <summary>
    /// queued - Simple mechanism that will log the fact that this user is sending an xMsg (FROM a user)
    /// </summary>
    public static void LogXMsgFromUser(XClientMsgExt xMsg)
    {
        _q.Add(new Tuple<XClientMsgExt, BOInfo, string, BOStatus>(xMsg, null, "", BOStatus.Ignore));
    }

    /// <summary>
    /// queued - Simple mechanism that will log the data being executed by this user
    /// </summary>
    public static void LogBOToUser(BOInfo boInfo)
    {
        _q.Add(new Tuple<XClientMsgExt, BOInfo, string, BOStatus>(null, boInfo, "", BOStatus.Ignore));
    }

    /// <summary>
    /// queued - Simple mechanism that will log the status of the BO being executed by this user (causes the red square to flash)
    /// </summary>
    public static void LogBOStatus(string UserID, BOStatus status)
    {
        _q.Add(new Tuple<XClientMsgExt, BOInfo, string, BOStatus>(null, null, UserID, status));
    }

    /// <summary>
    /// An endless thread that will keep checking the Queue for new entrants.
    /// NOTE - no error handling since this can't fail... :) lol etc
    /// </summary>
    private static void ProcessMultiUseQueueEntries()
    {
        while (true) // eternal loop
        {
            Tuple<XClientMsgExt, BOInfo, string, BOStatus> tuple = _q.Take();
            // Do stuff
        }
    }
This works fine - so I thought - until the Performance Wizard in VS2010 started to highlight the _q.Take() row as the highest contention line in my code!
Note that I have also tried a standard ConcurrentQueue combined with a ManualResetEvent: each time I insert an item into the queue, I signal the event so that the worker thread can examine and process the queue. This had the same net effect, except that the .WaitOne() call was highlighted instead.
Are there other ways of solving this common pattern of having many threads add objects to a concurrent queue while a single thread ploughs its way through the items one at a time, in its own time?
Thanks!!
Highest contention line? Yes, because it is a blocking collection! That call will block (e.g., it might be waiting on a WaitHandle) until another element is added to the collection.
Are you sure this is a problem? It sounds like exactly what I'd expect.
In case it isn't clear what I mean, consider this code:
    var blocker = new BlockingCollection<int>();
    int nextItem = blocker.Take();
How long would you expect that Take call above to run? I'd expect it to wait forever because nothing's being added to blocker. Thus if I profiled the "performance" of the above code, I'd see Take right up at the top of the list of long-running methods. But this would not be indicative of a problem; again: you want that call to block.
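To make that concrete, here is a minimal sketch (not your code) that times how long Take sits waiting while a producer stays idle. The class name BlockingTakeDemo, the method MeasureTakeWait and the producerDelayMs parameter are all made up for illustration; only BlockingCollection, Task, Thread and Stopwatch are real framework types.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

static class BlockingTakeDemo
{
    // Measures how long Take() blocks when the only item arrives after
    // producerDelayMs milliseconds (an illustrative parameter).
    public static TimeSpan MeasureTakeWait(int producerDelayMs)
    {
        using (var blocker = new BlockingCollection<int>())
        {
            // Hypothetical producer that stays idle for a while, then adds one item.
            Task producer = Task.Factory.StartNew(() =>
            {
                Thread.Sleep(producerDelayMs);
                blocker.Add(1);
            });

            Stopwatch watch = Stopwatch.StartNew();
            blocker.Take();   // blocks here until the producer adds the item
            watch.Stop();

            producer.Wait();
            return watch.Elapsed;
        }
    }

    static void Main()
    {
        TimeSpan waited = MeasureTakeWait(500);
        Console.WriteLine("Take() waited about {0} ms", (long)waited.TotalMilliseconds);
    }
}
```

The time reported is roughly the time the producer was idle. The consumer is parked during that interval rather than doing work, which is why a profiler that counts blocked time will rank the Take line so highly.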
On a completely separate note, might I recommend replacing Tuple<XClientMsgExt, BOInfo, string, BOStatus> with a type whose properties have descriptive names? (This suggestion has nothing to do with your question, of course; it's just a piece of general advice.)
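Something along these lines is what I have in mind. Treat it as a rough sketch: LogEntry, its property names, NamedEntryDemo and TakeNext are names I invented, the empty XClientMsgExt and BOInfo classes are stand-ins so the sample compiles on its own, and BOStatus.Running is a made-up member (only Ignore appears in your code).

```csharp
using System;
using System.Collections.Concurrent;

// Empty stand-ins for the question's own types, so the sketch compiles on its own.
class XClientMsgExt { }
class BOInfo { }
enum BOStatus { Ignore, Running } // only Ignore appears in the question; Running is made up

// Hypothetical replacement for Tuple<XClientMsgExt, BOInfo, string, BOStatus>.
sealed class LogEntry
{
    public XClientMsgExt Message { get; private set; }
    public BOInfo Info { get; private set; }
    public string UserId { get; private set; }
    public BOStatus Status { get; private set; }

    public LogEntry(XClientMsgExt message, BOInfo info, string userId, BOStatus status)
    {
        Message = message;
        Info = info;
        UserId = userId;
        Status = status;
    }
}

static class NamedEntryDemo
{
    private static readonly BlockingCollection<LogEntry> _q = new BlockingCollection<LogEntry>();

    // Same shape as the question's LogBOStatus, but enqueues a named type.
    public static void LogBOStatus(string userId, BOStatus status)
    {
        _q.Add(new LogEntry(null, null, userId, status));
    }

    // Blocks until an entry is available, like the consumer loop in the question.
    public static LogEntry TakeNext()
    {
        return _q.Take();
    }

    static void Main()
    {
        LogBOStatus("user-1", BOStatus.Running);
        LogEntry entry = TakeNext();
        // The consumer reads entry.UserId and entry.Status instead of Item3 and Item4.
        Console.WriteLine("{0}: {1}", entry.UserId, entry.Status);
    }
}
```

The queue behaves exactly as before; the only difference is that the consuming code no longer has to remember what Item1 through Item4 mean.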
That _q.Take() is the highest contention line is meaningless in itself. There's going to be contention if many threads are waiting for items. The big question is whether that contention is costing you in terms of performance. A few questions you need to find the answers to:
- Are you able to process items fast enough to prevent the queue from growing without bound?
- Is the contention costing you in terms of CPU usage?
- If you stop adding things to the collection, does the Performance Wizard still report contention?
- If you stop adding things to the collection, is there high CPU usage?
If you're able to keep the queue from growing and CPU resources aren't being spent on taking items, then there's no problem, except perhaps that you have more threads than necessary reading from the collection.
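One way to check the first of those questions is to watch the backlog while the consumer runs. The following is only a sketch under my own assumptions: QueueHealthDemo, Run and peakBacklog are invented names, the capacity of 1000 is arbitrary, and the items are plain ints rather than your message type. It uses a bounded BlockingCollection so that Add blocks when the consumer falls behind, and GetConsumingEnumerable so the single consumer can exit cleanly after CompleteAdding.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

static class QueueHealthDemo
{
    // Runs producerCount producers that each add itemsPerProducer items, with a
    // single consumer draining the queue. Returns the number of items processed.
    public static int Run(int producerCount, int itemsPerProducer, out int peakBacklog)
    {
        // Bounded capacity (1000 is an arbitrary choice) makes Add() block when the
        // consumer falls behind, so the queue cannot grow without bound.
        using (var queue = new BlockingCollection<int>(1000))
        {
            int processed = 0;
            int peak = 0;

            Task consumer = Task.Factory.StartNew(() =>
            {
                // GetConsumingEnumerable() blocks while the queue is empty and ends
                // once CompleteAdding() has been called and the queue is drained.
                foreach (int item in queue.GetConsumingEnumerable())
                {
                    peak = Math.Max(peak, queue.Count); // backlog left after taking this item
                    processed++;                        // only this one thread writes these
                }
            }, TaskCreationOptions.LongRunning);

            var producers = new Task[producerCount];
            for (int p = 0; p < producerCount; p++)
            {
                producers[p] = Task.Factory.StartNew(() =>
                {
                    for (int i = 0; i < itemsPerProducer; i++)
                    {
                        queue.Add(i);
                    }
                });
            }

            Task.WaitAll(producers);
            queue.CompleteAdding(); // lets the consumer's foreach finish
            consumer.Wait();

            peakBacklog = peak;
            return processed;
        }
    }

    static void Main()
    {
        int peakBacklog;
        int processed = Run(4, 5000, out peakBacklog);
        Console.WriteLine("Processed {0} items, peak backlog {1}", processed, peakBacklog);
    }
}
```

If the peak backlog stays small, the consumer is keeping up and the time reported against Take is just idle waiting. If it keeps hitting the capacity, the consumer is the bottleneck and the producers are the ones being held back.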
Have you considered using an actual MSMQ queue? It may seem like overkill, but it does provide what you need. Nothing says your application can't both write to the queue and have a dedicated thread to read and process the messages.