Multithreaded queue destruction in C++
So I have a shared concurrent queue. It seems to be working nicely, except for destruction.
The way the queue is implemented is that it contains a condition variable and mutex pair. Several worker threads are started up that wait on this condition variable. When new objects are available to be worked on, they are pushed into the queue and the condition variable is signaled.
The problem is that when the main thread exits, destroying the queue, the condition variable is destroyed, but this fails as the condition variable is in use. This throws an exception and everything blows up nastily.
I'd like to signal the workers, wake them up and have them exit, and wait for them to be done, then continue on the main thread. My problem is working out when those threads are done - am I going to need an extra synchronisation primitive?
Anyway, here's the code for the queue:
// Based on code from http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html
// Original version by Anthony Williams
// Modifications by Michael Anderson
#include "boost/thread.hpp"
#include <deque>
template<typename Data>
class concurrent_queue
{
private:
std::deque<Data> the_queue;
mutable boost::mutex the_mutex;
boost::condition_variable the_condition_variable;
bool is_canceled;
public:
concurrent_queue() : the_queue(), the_mutex(), the_condition_variable(), is_canceled(false) {}
struct Canceled{};
void push(Data const& data)
{
boost::mutex::scoped_lock lock(the_mutex);
if (is_canceled) throw Canceled();
the_queue.push_back(data);
lock.unlock();
the_condition_variable.notify_one();
}
bool empty() const
{
boost::mutex::scoped_lock lock(the_mutex);
if (is_canceled) throw Canceled();
return the_queue.empty();
}
bool try_pop(Data& popped_value)
{
boost::mutex::scoped_lock lock(the_mutex);
if (is_canceled) throw Canceled();
if(the_queue.empty())
{
return false;
}
popped_value=the_queue.front();
the_queue.pop_front();
return true;
}
void wait_and_pop(Data& popped_value)
{
boost::mutex::scoped_lock lock(the_mutex);
while(the_queue.empty() && !is_canceled)
{
the_condition_variable.wait(lock);
}
if (is_canceled) throw Canceled();
popped_value=the_queue.front();
the_queue.pop_front();
}
std::deque<Data> wait_and_take_all()
{
boost::mutex::scoped_lock lock(the_mutex);
while(the_queue.empty() && !is_canceled)
{
the_condition_variable.wait(lock);
}
if (is_canceled) throw Canceled();
std::deque<Data> retval;
std::swap(retval, the_queue);
return retval;
}
void cancel()
{
boost::mutex::scoped_lock lock(the_mutex);
if (is_canceled) throw Canceled();
is_canceled = true;
lock.unlock();
the_condition_variable.notify_all();
}
};
You can call join() on each thread to wait until it has finished executing. Something like this:
void DoWork() {}
int main()
{
boost::thread t(&DoWork);
// signal for the thread to exit
t.join(); // wait until it actually does exit
// destroy the queue
}
Or you can use boost::thread_group for multiple threads.
int main()
{
boost::thread_group tg;
for(int i = 0 ; i < 10 ; ++i)
tg.create_thread(&DoWork);
// signal to stop work
tg.join_all();
// destroy the queue
}
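To make the "signal, then join" order concrete, here is a minimal sketch. It assumes the C++11 standard-library equivalents (std::thread, std::mutex, std::condition_variable) in place of the Boost types, and mini_queue and cancel_then_join are hypothetical names, with mini_queue being a cut-down stand-in for the queue in the question rather than the original class.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for concurrent_queue<int>, reduced to what the demo needs.
class mini_queue {
    std::deque<int> items_;
    std::mutex mutex_;
    std::condition_variable cond_;
    bool canceled_ = false;
public:
    struct Canceled {};

    void push(int value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (canceled_) throw Canceled();
            items_.push_back(value);
        }
        cond_.notify_one();
    }

    int wait_and_pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        // Wake up when there is work or when the queue has been canceled.
        cond_.wait(lock, [this] { return !items_.empty() || canceled_; });
        if (canceled_) throw Canceled();
        int value = items_.front();
        items_.pop_front();
        return value;
    }

    void cancel() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            canceled_ = true;
        }
        cond_.notify_all();
    }
};

// Starts the workers, cancels the queue, then joins every worker before the
// queue goes out of scope. Returns how many workers exited via Canceled.
int cancel_then_join(int worker_count) {
    mini_queue queue;
    std::atomic<int> exited{0};
    std::vector<std::thread> workers;
    for (int i = 0; i < worker_count; ++i) {
        workers.emplace_back([&queue, &exited] {
            try {
                for (;;) queue.wait_and_pop();
            } catch (const mini_queue::Canceled&) {
                ++exited;  // normal shutdown path for a worker
            }
        });
    }
    queue.cancel();                            // signal the workers to stop
    for (std::thread& t : workers) t.join();   // wait until they actually have
    assert(exited == worker_count);            // nobody is left waiting on the queue
    return exited;
}   // the queue is destroyed here, after every worker has finished
```

The point of the ordering is that no thread can still be blocked on the condition variable when the queue's destructor runs, because join() has already returned for every worker.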
You have two options. Either make sure that when the queue goes out of scope in main(), it doesn't actually get destroyed while the other threads still reference it: hold it in a shared_ptr, pass copies to the other threads, and call cancel() at the end of main(). Once the other threads catch Canceled and (presumably) exit, the last reference goes away and the queue is destroyed.
Or if you want to ensure that it's actually destroyed by the end of main(), then you'll need to wait for the other threads. You can do what JaredC suggests if you are OK with handling the wait outside of the destructor. To do it inside the destructor, it seems cleaner not to store all the threads, but just to keep a count and another synchronization primitive. Either way, you'll need the queue to maintain some state to allow it to wait for all the threads to finish.
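A rough sketch of the "count plus another synchronization primitive" idea might look like the following. It is only one way to do it and makes several assumptions: it uses the C++11 standard-library types instead of Boost, and counted_queue, attach(), detach() and destroy_waits_for_workers are hypothetical names, not part of the original queue. The destructor cancels the queue and then blocks on a second condition variable until the user count drops to zero.

```cpp
#include <atomic>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical queue whose destructor waits for all registered users.
class counted_queue {
    std::deque<int> items_;
    std::mutex mutex_;
    std::condition_variable items_cond_;  // signaled on push and on cancel
    std::condition_variable idle_cond_;   // signaled whenever a user detaches
    int users_ = 0;
    bool canceled_ = false;
public:
    struct Canceled {};

    // Register a user. Call this before the worker thread is started, so the
    // destructor cannot run ahead of a worker that has not registered yet.
    void attach() {
        std::lock_guard<std::mutex> lock(mutex_);
        ++users_;
    }

    // Must be the worker's last access to the queue. The notification is sent
    // while the lock is held so the destructor cannot finish in between.
    void detach() {
        std::lock_guard<std::mutex> lock(mutex_);
        --users_;
        idle_cond_.notify_all();
    }

    void push(int value) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (canceled_) throw Canceled();
        items_.push_back(value);
        items_cond_.notify_one();
    }

    int wait_and_pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        items_cond_.wait(lock, [this] { return !items_.empty() || canceled_; });
        if (canceled_) throw Canceled();
        int value = items_.front();
        items_.pop_front();
        return value;
    }

    ~counted_queue() {
        std::unique_lock<std::mutex> lock(mutex_);
        canceled_ = true;
        items_cond_.notify_all();                               // wake the workers
        idle_cond_.wait(lock, [this] { return users_ == 0; });  // wait for them
    }
};

// Returns how many workers had finished by the time the destructor returned.
int destroy_waits_for_workers(int worker_count) {
    std::atomic<int> finished{0};
    std::vector<std::thread> workers;
    {
        counted_queue queue;
        for (int i = 0; i < worker_count; ++i) {
            queue.attach();  // registered on the main thread
            workers.emplace_back([&queue, &finished] {
                try {
                    for (;;) queue.wait_and_pop();
                } catch (const counted_queue::Canceled&) {
                }
                ++finished;
                queue.detach();  // last touch of the queue
            });
        }
    }   // ~counted_queue() blocks here until every worker has detached
    int seen = finished.load();
    for (std::thread& t : workers) t.join();
    return seen;
}
```

The threads themselves still have to be joined or detached somewhere; the count only guarantees that none of them touches the queue after its destructor has returned.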
To me, it seems that the first solution (with shared_ptr) is cleaner.
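For illustration, the shared_ptr approach could be sketched like this. It assumes std::shared_ptr and std::thread rather than the Boost equivalents, and shared_mini_queue, worker and queue_destroyed_after_workers_exit are hypothetical names used only for this example. A weak_ptr is used purely to observe when the queue is finally destroyed.

```cpp
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical cut-down queue; only the cancel path matters for this demo.
class shared_mini_queue {
    std::deque<int> items_;
    std::mutex mutex_;
    std::condition_variable cond_;
    bool canceled_ = false;
public:
    struct Canceled {};

    int wait_and_pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return !items_.empty() || canceled_; });
        if (canceled_) throw Canceled();
        int value = items_.front();
        items_.pop_front();
        return value;
    }

    void cancel() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            canceled_ = true;
        }
        cond_.notify_all();
    }
};

// Each worker owns a reference, so the queue stays alive while it is in use.
void worker(std::shared_ptr<shared_mini_queue> queue) {
    try {
        for (;;) queue->wait_and_pop();
    } catch (const shared_mini_queue::Canceled&) {
        // canceled: fall through and exit
    }
}   // the worker's reference is released here

// Returns true if the queue has been destroyed once all workers have exited.
bool queue_destroyed_after_workers_exit(int worker_count) {
    std::weak_ptr<shared_mini_queue> observer;
    std::vector<std::thread> workers;
    {
        auto queue = std::make_shared<shared_mini_queue>();
        observer = queue;
        for (int i = 0; i < worker_count; ++i) {
            workers.emplace_back(worker, queue);  // each thread gets its own copy
        }
        queue->cancel();
    }   // the main thread's reference is gone; workers may still hold theirs
    // Joining here is only so the demo can check the result; with this
    // approach the queue itself no longer needs main() to wait for it.
    for (std::thread& t : workers) t.join();
    return observer.expired();
}
```

Whichever thread drops the last reference ends up running the queue's destructor, and by then nothing can still be waiting on its condition variable.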