Efficient consumer thread with multiple producers

I am trying to make a producer/consumer thread situation more efficient by skipping expensive event operations when they aren't needed, with something like:

//cas(variable, compare, set) is atomic compare and swap
//queue is already lock free

running = false


// Add item to queue – producer thread(s)

if(cas(running, false, true))
{
  // We effectively obtained a lock on signalling the event
  add_to_queue()
  signal_event()
}
else
{
  // Most of the time if things are busy we should not be signalling the event
  add_to_queue()

  if(cas(running, false, true))
    signal_event()
}

...

// Process queue, single consumer thread

reset_event()

while(1)
{
  wait_for_auto_reset_event() // Preferably IOCP

  for(int i = 0; i < SpinCount; ++i)
    process_queue()

  cas(running, true, false)

  if(queue_not_empty())
    if(cas(running, false, true))
      signal_event()
}

Obviously trying to get these things correct is a little tricky(!), so is the above pseudocode correct? A solution that signals the event more often than strictly needed is OK, but not one that does so for every item.


This falls into the sub-category of "stop messing about and go back to work" known as "premature optimisation". :-)

If the "expensive" event operations are taking up a significant portion of time, your design is wrong, and rather than use a producer/consumer you should use a critical section/mutex and just do the work from the calling thread.

I suggest you profile your application if you are really concerned.

Updated:

Correct answer:

Producer

ProducerAddToQueue(pQueue,pItem){

    EnterCriticalSection(pQueue->pCritSec)
        if(IsQueueEmpty(pQueue)){
            SignalEvent(pQueue->hEvent)
        }

        AddToQueue(pQueue, pItem)
    LeaveCriticalSection(pQueue->pCritSec)
}

Consumer

nCheckQuitInterval = 100; // Every 100 ms the consumer checks if it should quit.

ConsumerRun(pQueue)
{
    while(!ShouldQuit())
    {
        Item* pCurrentItem = NULL;
        EnterCriticalSection(pQueue->pCritSec);
            if(IsQueueEmpty(pQueue))
            {
                ResetEvent(pQueue->hEvent)
            }
            else
            {
                pCurrentItem = RemoveFromQueue(pQueue);
            }
        LeaveCriticalSection(pQueue->pCritSec);

        if(pCurrentItem){
            ProcessItem(pCurrentItem);
            pCurrentItem = NULL;
        }
        else
        {
            // Wait for items to be added.
            WaitForSingleObject(pQueue->hEvent, nCheckQuitInterval);
        }

    }
}

Notes:

  • The event is a manual-reset event.
  • The operations protected by the critical section are quick. The event is only set or reset when the queue transitions to/from empty state. It has to be set/reset within the critical section to avoid a race condition.
  • This means the critical section is only held for a short time, so contention will be rare.
  • Critical sections don't block unless they are contended. So context switches will be rare.

Assumptions:

  • This is a real problem not homework.
  • Producers and consumers spend most of their time doing other stuff, i.e. getting the items ready for the queue, processing them after removing them from the queue.
  • If they are spending most of the time doing the actual queue operations, you shouldn't be using a queue. I hope that is obvious.
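
For concreteness, here is a minimal compilable sketch of the design above, assuming C++ with the Win32 API and a std::deque-backed queue; the Queue/Item types and ProcessItem are illustrative stand-ins, not part of the answer:

#include <windows.h>
#include <deque>

struct Item { int payload; };
void ProcessItem(Item* pItem);       // user-supplied work, defined elsewhere

struct Queue {
    CRITICAL_SECTION cs;             // InitializeCriticalSection(&cs) at startup
    HANDLE hEvent;                   // manual-reset: CreateEvent(NULL, TRUE, FALSE, NULL)
    std::deque<Item*> items;
};

void ProducerAddToQueue(Queue* q, Item* item)
{
    EnterCriticalSection(&q->cs);
    if (q->items.empty())
        SetEvent(q->hEvent);         // empty -> non-empty transition: wake the consumer
    q->items.push_back(item);
    LeaveCriticalSection(&q->cs);
}

void ConsumerRun(Queue* q, volatile bool* shouldQuit)
{
    const DWORD checkQuitIntervalMs = 100;   // re-check the quit flag every 100 ms
    while (!*shouldQuit) {
        Item* current = NULL;
        EnterCriticalSection(&q->cs);
        if (q->items.empty()) {
            ResetEvent(q->hEvent);           // non-empty -> empty transition: next wait blocks
        } else {
            current = q->items.front();
            q->items.pop_front();
        }
        LeaveCriticalSection(&q->cs);

        if (current)
            ProcessItem(current);            // processed outside the lock
        else
            WaitForSingleObject(q->hEvent, checkQuitIntervalMs);
    }
}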


Went through a bunch of cases and can't see an issue, but it's kinda complicated. I thought maybe you would have an issue with queue_not_empty / add_to_queue racing, but it looks like the post-dominating CAS in both paths covers this case.

CAS is expensive (not as expensive as signal). If you expect skipping the signal to be common, I would code the CAS as follows:

bool cas(variable, old_val, new_val) {
   if (variable != old_val) return false                // cheap plain read; skips the locked instruction in the common case
   return atomic_cmpxchg(variable, old_val, new_val)    // e.g. LOCK CMPXCHG on x86
}
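
For instance, with C++11 std::atomic the cheap pre-check is just a relaxed load in front of the locked compare-exchange. This is my own sketch, not part of the answer:

#include <atomic>

// Returns true only if this caller flipped the flag from old_val to new_val.
// The plain load filters out the common "already set" case without issuing
// a bus-locked instruction (LOCK CMPXCHG on x86).
bool cas(std::atomic<bool>& variable, bool old_val, bool new_val)
{
    if (variable.load(std::memory_order_relaxed) != old_val)
        return false;                                   // cheap early-out
    return variable.compare_exchange_strong(old_val, new_val);
}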

Lock-free structures like this are the stuff that Jinx (the product I work on) is very good at testing. So you might want to use an eval license to test the lock-free queue and signal optimization logic.


Edit: maybe you can simplify this logic.

running = false 

// add item to queue – producer thread(s) 
add_to_queue()
if (cas(running, false, true)) {
   signal_event()
}

// Process queue, single consumer thread 

reset_event() 

while(1) 
{ 
    wait_for_auto_reset_event() // Preferably IOCP 

   for(int i = 0; i < SpinCount; ++i)
       process_queue() 

   cas(running, true, false)  // this could just be a memory barriered store of false

   if(queue_not_empty()) 
      if(cas(running, false, true)) 
         signal_event() 
} 

Now that the cas/signal are always next to each other they can be moved into a subroutine.
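
For example, a hypothetical helper along those lines, assuming the flag is a std::atomic<bool> and the event is a Win32 auto-reset event (the names are mine):

#include <atomic>
#include <windows.h>

// Wakes the consumer at most once per idle -> busy transition: only the caller
// that flips 'running' from false to true pays for the SetEvent call.
void signal_if_needed(std::atomic<bool>& running, HANDLE hEvent)
{
    bool expected = false;
    if (running.compare_exchange_strong(expected, true))
        SetEvent(hEvent);    // hEvent created with CreateEvent(NULL, FALSE, FALSE, NULL)
}

The producer body then collapses to add_to_queue(); signal_if_needed(running, hEvent); and the consumer's tail check becomes if (queue_not_empty()) signal_if_needed(running, hEvent);.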


Why not just associate a bool with the event? Use cas to set it to true, and if the cas succeeds then signal the event, because the flag must have been clear. The waiter can then just clear the flag before it waits.

bool flag=false;

// producer
add_to_queue();
if(cas(flag,false,true))
{
    signal_event();
}

// consumer
while(true)
{
    while(queue_not_empty())
    {
        process_queue();
    }
    cas(flag,true,false); // clear the flag
    if(queue_is_empty())
        wait_for_auto_reset_event();
}

This way, you only wait if there are no elements on the queue, and you only signal the event once for each batch of items.
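
A compilable sketch of this scheme, assuming the question's lock-free queue primitives (declared here as externs) and a Win32 auto-reset event; the names are illustrative only:

#include <atomic>
#include <windows.h>

struct Item;
void add_to_queue(Item* item);     // the question's lock-free queue
bool queue_not_empty();
void process_queue();

std::atomic<bool> flag{false};
HANDLE hEvent;                     // auto-reset: CreateEvent(NULL, FALSE, FALSE, NULL)

// Producer thread(s)
void produce(Item* item)
{
    add_to_queue(item);
    bool expected = false;
    if (flag.compare_exchange_strong(expected, true))
        SetEvent(hEvent);          // only the first producer after the flag was cleared signals
}

// Single consumer thread
void consume()
{
    for (;;) {
        while (queue_not_empty())
            process_queue();
        bool expected = true;
        flag.compare_exchange_strong(expected, false);   // clear the flag
        if (!queue_not_empty())    // re-check closes the race with a producer that enqueued after the drain
            WaitForSingleObject(hEvent, INFINITE);
    }
}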


I believe you want to achieve something like in this question:
WinForms Multithreading: Execute a GUI update only if the previous one has finished. It is specific to C# and WinForms, but the structure may well apply to you.
