
Boost.Asio: need to post n jobs only after m jobs have finished

I'm looking for a way to wait for a number of jobs to finish, and then execute another completely different number of jobs. With threads, of course. A brief explanation: I created two worker threads, both executing run on io_service. The code below is taken from here.

For the sake of simplicity, I created two types of jobs, CalculateFib and CalculateFib2. I want the CalculateFib2 jobs to start after, and only after, the CalculateFib jobs finish. I tried to use a condition variable as explained here, but the program hangs if there is more than one CalculateFib2 job. What am I doing wrong?

thx, dodol

#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <iostream>

boost::mutex global_stream_lock;
boost::mutex mx;
boost::condition_variable cv;

void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service)
{
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Thread Start" << std::endl;
    global_stream_lock.unlock();

    io_service->run();

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Thread Finish" << std::endl;
    global_stream_lock.unlock();
}

size_t fib( size_t n )
{
    if ( n <= 1 )
    {
        return n;
    }
    boost::this_thread::sleep( 
        boost::posix_time::milliseconds( 1000 )
        );
    return fib( n - 1 ) + fib( n - 2);
}

void CalculateFib( size_t n )
{
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Now calculating fib( " << n << " ) " << std::endl;
    global_stream_lock.unlock();

    size_t f = fib( n );

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] fib( " << n << " ) = " << f << std::endl;
    global_stream_lock.unlock();

    boost::lock_guard<boost::mutex> lk(mx);
    cv.notify_all();
}

void CalculateFib2( size_t n )
{
    boost::unique_lock<boost::mutex> lk(mx);
    cv.wait(lk);

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Now calculating fib2( " << n << " ) " << std::endl;
    global_stream_lock.unlock();

    size_t f = fib( n );

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] fib2( " << n << " ) = " << f << std::endl;
    global_stream_lock.unlock();
}
int main( int argc, char * argv[] )
{
    boost::shared_ptr< boost::asio::io_service > io_service(
        new boost::asio::io_service
        );
    boost::shared_ptr< boost::asio::io_service::work > work(
        new boost::asio::io_service::work( *io_service )
        );

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] The program will exit when all work has finished."
        << std::endl;
    global_stream_lock.unlock();

    boost::thread_group worker_threads;
    for( int x = 0; x < 2; ++x )
    {
        worker_threads.create_thread( 
            boost::bind( &WorkerThread, io_service )
            );
    }
    io_service->post( boost::bind( CalculateFib, 5 ) );
    io_service->post( boost::bind( CalculateFib, 4 ) );
    io_service->post( boost::bind( CalculateFib, 3 ) );

    io_service->post( boost::bind( CalculateFib2, 1 ) );
    io_service->post( boost::bind( CalculateFib2, 1 ) );
    work.reset();
    worker_threads.join_all();

    return 0;
}


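Inside CalculateFib2, the first thing you do is wait on the condition variable (cv). That condition is only signaled at the end of CalculateFib, and a notification is not remembered: a job that starts waiting after the last CalculateFib has finished is never woken. So it stands to reason that execution never continues unless the condition is signaled again, for example by posting another CalculateFib job.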

Indeed, adding one more CalculateFib job like so:

io_service->post( boost::bind( CalculateFib, 5 ) );
io_service->post( boost::bind( CalculateFib, 4 ) );
io_service->post( boost::bind( CalculateFib, 3 ) );

io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );

io_service->post( boost::bind( CalculateFib, 5 ) );   // <-- ADDED

makes execution run to completion.

To shed more light: if you isolate a Fib2 batch in time, like this:

io_service->post( boost::bind( CalculateFib, 5 ) );
io_service->post( boost::bind( CalculateFib, 4 ) );
io_service->post( boost::bind( CalculateFib, 3 ) );

boost::this_thread::sleep(boost::posix_time::seconds( 10 ));
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );

all the Fib2 jobs will always block, regardless of the number of threads, because the Fib jobs have all finished before the Fib2 jobs are posted. A simple

io_service->post( boost::bind( CalculateFib, 1 ) );

will unlock all the waiters (that is, only as many as there are waiting threads, which is the number of available threads minus one, because the Fib() job occupies a thread as well). With fewer than 7 threads, however, this would deadlock, because no thread is available to even start the Fib() job: all threads are blocked waiting in Fib2.
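If you do want to keep the waiting approach, the lost notification can be avoided by waiting on a predicate over a shared counter instead of on the bare condition variable. Below is a minimal sketch of that idea. PhaseGate and its member names are hypothetical, not part of Boost, and it uses the std:: types for self-containment (boost::mutex and boost::condition_variable offer the same wait-with-predicate interface). Note that it still blocks pool threads while they wait.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical helper: tracks how many first-phase jobs are still outstanding.
class PhaseGate {
public:
    explicit PhaseGate(std::size_t pending) : pending_(pending) {}

    // Call at the end of each first-phase job (e.g. CalculateFib).
    void job_done() {
        std::lock_guard<std::mutex> lk(mx_);
        assert(pending_ > 0 && "more completions than jobs");
        if (--pending_ == 0) {
            cv_.notify_all();  // wake everyone already waiting
        }
    }

    // Call at the start of each second-phase job (e.g. CalculateFib2).
    // The predicate is checked before sleeping, so a job that arrives
    // after the last notification returns immediately instead of hanging.
    void wait_open() {
        std::unique_lock<std::mutex> lk(mx_);
        cv_.wait(lk, [this] { return pending_ == 0; });
    }

    // True once all first-phase jobs have reported completion.
    bool is_open() {
        std::lock_guard<std::mutex> lk(mx_);
        return pending_ == 0;
    }

private:
    std::mutex mx_;
    std::condition_variable cv_;
    std::size_t pending_;
};
```

In the question's code, a single global gate constructed with 3 would replace mx and cv: CalculateFib calls job_done() as its last step and CalculateFib2 calls wait_open() as its first.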


To be honest, I don't get what you are trying to achieve in terms of scheduling. I suspect you should be monitoring the job queues and explicitly posting jobs ('tasks') only once the required number of items has been reached. That way you can KISS and get a very flexible interface to your job scheduling.

In general, with a thread group (pooling) you want to avoid blocking the threads for indefinite amounts of time. Doing so can deadlock your work scheduling, and even when it does not, it tends to perform poorly.
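One way to post the second batch only when the first is done, without blocking any pool thread, is to let the last finishing job trigger a callback. The sketch below is an illustration under assumptions: BatchCounter and counted are hypothetical names, not Boost.Asio APIs, and the callback is expected to do the posting of the follow-up jobs.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical helper: runs on_all_done exactly once, on whichever thread
// finishes the last of `count` jobs. No thread ever waits.
class BatchCounter {
public:
    BatchCounter(std::size_t count, std::function<void()> on_all_done)
        : remaining_(count), on_all_done_(std::move(on_all_done)) {
        assert(count > 0);
    }

    void job_done() {
        // fetch_sub returns the previous value; 1 means this was the last job.
        if (remaining_.fetch_sub(1) == 1) {
            on_all_done_();
        }
    }

private:
    std::atomic<std::size_t> remaining_;
    std::function<void()> on_all_done_;
};

// Wraps a job so that it reports its completion to the shared counter.
inline std::function<void()> counted(std::shared_ptr<BatchCounter> batch,
                                     std::function<void()> job) {
    return [batch, job] {
        job();
        batch->job_done();
    };
}
```

With the code from the question, each first-phase job would be posted as io_service->post( counted( batch, boost::bind( CalculateFib, 5 ) ) ), and the callback passed to BatchCounter would post the CalculateFib2 jobs. CalculateFib and CalculateFib2 then need neither mx nor cv.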
