Efficient consumer thread with multiple producers
I am trying to make a producer/consumer thread situation more efficient by skipping expensive event operations if necessary with something like:
//cas(variable, compare, set) is atomic compare and swap
//queue is already lock free
running = false
// dd item to queue – producer thread(s)
if(cas(running, false, true))
{
// We effectively obtained a lock on signalling the event
add_to_queue()
signal_event()
}
else
{
// Most of the time if things are busy we should not be signalling the event
add_to_queue()
if(cas(running, false, true))
signal_event()
}
...
// Process queue, single consumer thread
reset_event()
while(1)
{
wait_fo开发者_运维技巧r_auto_reset_event() // Preferably IOCP
for(int i = 0; i < SpinCount; ++i)
process_queue()
cas(running, true, false)
if(queue_not_empty())
if(cas(running, false, true))
signal_event()
}
Obviously trying to get these things correct is a little tricky(!) so is the above pseudo code correct? A solution that signals the event more than is exactly needed is ok but not one that does so for every item.
This falls into the sub-category of "stop messing about and go back to work" known as "premature optimisation". :-)
If the "expensive" event operations are taking up a significant portion of time, your design is wrong, and rather than use a producer/consumer you should use a critical section/mutex and just do the work from the calling thread.
I suggest you profile your application if you are really concerned.
Updated:
Correct answer:
Producer
ProducerAddToQueue(pQueue,pItem){
EnterCriticalSection(pQueue->pCritSec)
if(IsQueueEmpty(pQueue)){
SignalEvent(pQueue->hEvent)
}
AddToQueue(pQueue, pItem)
LeaveCriticalSection(pQueue->pCritSec)
}
Consumer
nCheckQuitInterval = 100; // Every 100 ms consumer checks if it should quit.
ConsumerRun(pQueue)
{
while(!ShouldQuit())
{
Item* pCurrentItem = NULL;
EnterCriticalSection(pQueue-pCritSec);
if(IsQueueEmpty(pQueue))
{
ResetEvent(pQueue->hEvent)
}
else
{
pCurrentItem = RemoveFromQueue(pQueue);
}
LeaveCriticalSection(pQueue->pCritSec);
if(pCurrentItem){
ProcessItem(pCurrentItem);
pCurrentItem = NULL;
}
else
{
// Wait for items to be added.
WaitForSingleObject(pQueue->hEvent, nCheckQuitInterval);
}
}
}
Notes:
- The event is a manual-reset event.
- The operations protected by the critical section are quick. The event is only set or reset when the queue transitions to/from empty state. It has to be set/reset within the critical section to avoid a race condition.
- This means the critical section is only held for a short time. so contention will be rare.
- Critical sections don't block unless they are contended. So context switches will be rare.
Assumptions:
- This is a real problem not homework.
- Producers and consumers spend most of their time doing other stuff, i.e. getting the items ready for the queue, processing them after removing them from the queue.
- If they are spending most of the time doing the actual queue operations, you shouldn't be using a queue. I hope that is obvious.
Went thru a bunch of cases, can't see an issue. But it's kinda complicated. I thought maybe you would have an issue with queue_not_empty / add_to_queue racing. But looks like the post-dominating CAS in both paths covers this case.
CAS is expensive (not as expensive as signal). If you expect skipping the signal to be common, I would code the CAS as follows:
bool cas(variable, old_val, new_val) {
if (variable != old_val) return false
asm cmpxchg
}
Lock-free structures like this is the stuff that Jinx (the product I work on) is very good at testing. So you might want to use an eval license to test the lock-free queue and signal optimization logic.
Edit: maybe you can simplify this logic.
running = false
// add item to queue – producer thread(s)
add_to_queue()
if (cas(running, false, true)) {
signal_event()
}
// Process queue, single consumer thread
reset_event()
while(1)
{
wait_for_auto_reset_event() // Preferably IOCP
for(int i = 0; i < SpinCount; ++i)
process_queue()
cas(running, true, false) // this could just be a memory barriered store of false
if(queue_not_empty())
if(cas(running, false, true))
signal_event()
}
Now that the cas/signal are always next to each other they can be moved into a subroutine.
Why not just associate a bool
with the event? Use cas
to set it to true, and if the cas
succeeds then signal the event because the event must have been clear. The waiter can then just clear the flag before it waits
bool flag=false;
// producer
add_to_queue();
if(cas(flag,false,true))
{
signal_event();
}
// consumer
while(true)
{
while(queue_not_empty())
{
process_queue();
}
cas(flag,true,false); // clear the flag
if(queue_is_empty())
wait_for_auto_reset_event();
}
This way, you only wait if there are no elements on the queue, and you only signal the event once for each batch of items.
I believe, you want to achieve something like in this question:
WinForms Multithreading: Execute a GUI update only if the previous one has finished. It is specific on C# and Winforms, but the structure may well apply for you.
精彩评论