开发者

Thread synchronization with boost::condition_variable

I'm doing some experiments on C++ multithreading and I have no idea how to solve one problem. Let's say we have thread pool, that process user requests using existing thread and creates new thread, when no free thread available. I've created command_queue thread-safe class, which have push and pop methods. pop waits while queue is empty and returns only when command is available or timeout occurred. Now it's time to implement thread pool. The idea is to make free threads sleep for some amount of time and kill the thread if there is nothing to do after that period of time. Here is implementation

command_queue::handler_t handler;
while (handler = tasks.pop(timeout))
{
    handler();
}

here we exit the thread procedure if timeout occurred. That is fine, but there is problem with new thread creation. Let's say we already have 2 thread processing user requests, they are working at the moment, but we need to do some other operation asynchronously. We call

thread_pool::start(some_operation);

which should start new thread, because there is no free threads available. When thread is available it calls timed_wait on condition variable, so the idea is to check whether there are threads that are waiting.

if (thread_are_free_threads) // ???
   condition.notify_one();
else
   create_thread(thread_proc);

but how to check it? Documentation says, that if there are no waiting threads notify_one does nothing. If I could check whether or not it did nothing that开发者_如何学C would be a solution

if (!condition.notify_one()) // nobody was notified
   create_thread(thread_proc);

As far as I see there is no way to check that.

Thanks for your answers.


You need to create another variable (perhaps a semaphore) which knows how many threads are running, then you can check that and create a new thread, if needed, before calling notify.

The other, better option is to just not have your threads exit when they time out. They should stay alive waiting to be notified. Instead of exiting when the notify times out, check a variable to see if the program is still running or if it is "shutting down", If it's still running, then start waiting again.


A more typical thread pool would look like this:

Pool::Pool()
{
    runningThreads = 0;
    actualThreads  = 0;
    finished       = false;
    jobQue.Init();

    mutex.Init();
    conditionVariable.Init();

    for(int loop=0; loop < threadCount; ++loop) { startThread(threadroutine); }
}

Pool::threadroutine()
{

    {
        // Extra code to count threads sp we can add more if required.
        RAIILocker doLock(mutex);
        ++ actualThreads;
        ++ runningThreads;
    }
    while(!finished)
    {
         Job job;
         {
             RAIILocker doLock(mutex);

             while(jobQue.empty())
             {
                 // This is the key.
                 // Here the thread is suspended (using zero resources)
                 // until some other thread calls the notify_one on the
                 // conditionVariable. At this point exactly one thread is release
                 // and it will start executing as soon as it re-acquires the lock
                 // on the mutex.
                 //
                 -- runningThreads;
                 conditionVariable.wait(mutex);
                 ++ runningThreads;
             }
             job = jobQue.getJobAndRemoveFromQue();
         }
         job.execute();
    }
    {
        // Extra code to count threads sp we can add more if required.
        RAIILocker doLock(mutex);
        -- actualThreads;
        -- runningThreads;
    }
}

Pool::AddJob(Job job)
{
    RAIILocker doLock(mutex);

    // This is where you would check to see if you need more threads.
    if (runningThreads == actualThreads) // Plus some other conditions.
    {
        // increment both counts. When it waits we decrease the running count.
        startThread(threadroutine);
    }
    jobQue.push_back(job);
    conditionVariable.notify_one();  // This releases one worker thread
                                     // from the call to wait() above.
                                     // Note: The worker thread will not start
                                     //       until this thread releases the mutex.
}


I think you need to rethink your design. In a simple model of a dealer thread handing out work the player threads, the dealer places the job onto the message queue and lets one of the players pick up the job when it gets a chance.

In your case the dealer is actively managing the thread pool in that it retains a knowledge on which player threads are idle and which are busy. Since the dealer knows which player is idle, the dealer can actively pass the idle the job and signal the player using a simple semaphore (or cond var) - there being one semaphore per player. In such a case, it might make sense for the dealer to destroy idle threads actively by giving the thread a kill myself job to do.


Now I found one solution, but it's not as that perfect. I have volatile member variable named free - that stores number of free threads in the pool.

void thread_pool::thread_function()
{
    free++;
    command_queue::handler_t handler;
    while (handler = tasks.pop(timeout))
    {
        free--;
        handler();
        free++;
    }
    free--;
}

when I assign task to the thread I do something like this

if (free == 0)
    threads.create_thread(boost::bind(&thread_pool::thread_function, this));

there is still issue with synchronization, because if the context will be switched after free-- in thread_function we might create a new thread, which we actually don't need, but as the tasks queue is thread safe there is no problem with that, it's just an unwanted overhead. Can you suggest solution to that and what do you think about it? Maybe it's better to leave it as it is then having one more synchronization here?


Another idea. You can query the length of the Message Queue. If it gets too long, create a new worker.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜