Blocking Queue implementation: Where's the race condition?
It's me and my BlockingQueue again... I rewrote it according to this article and this question. It sends some items and then crashes with an access violation. Here's the code:
template <typename T>
bool DRA::CommonCpp::CTBlockingQueue<T>::Push( T pNewValue ){
volatile long oldSize;
::InterlockedExchange( &oldSize, m_Size );
CTNode* pNewNode = new CTNode();
pNewNode->m_pValue = pNewValue;
{//RAII block
CGuard g( m_TailCriticalSection );
m_pTailNode->m_pNext = pNewNode;
m_pTailNode = pNewNode;
::InterlockedIncrement( &m_Size );
}
if( oldSize == 0 )
m_eAtLeastOneElement.set();
return true;
}
template <typename T>
bool DRA::CommonCpp::CTBlockingQueue<T>::Pop( T& pValue ){
CTNode* pCurrentNode;
{//RAII block
CGuard g( m_HeadCriticalSection );
pCurrentNode = m_pHeadNode;
CTNode* pNewHeadNode = m_pHeadNode->m_pNext;
if( pNewHeadNode == NULL ){
CEvent* pSignaledEvent;
CEvent::waitForPair( m_eAtLeastOneElement, m_eFinished, pSignaledEvent );
if( pSignaledEvent == &m_eFinished )
return false;
pNewHeadNode = m_pHeadNode->m_pNext;
}
pValue = pNewHeadNode->m_pValue;
m_pHeadNode = pNewHeadNode;
::InterlockedDecrement( &m_Size );
}
delete pCurrentNode;
return true;
}
It always crashes in 开发者_JAVA百科a call to Pop(), in the line after the if, the one which says:
pValue = pNewHeadNode->m_pValue
It blows up cos' pNewHeadNode is NULL. But how can this happen?
Edit: Forgot initialization code:
template <typename T>
DRA::CommonCpp::CTBlockingQueue<T>::CTBlockingQueue():
m_HeadCriticalSection("CTBlockingQueue<T>::m_Head"),
m_TailCriticalSection("CTBlockingQueue<T>::m_Tail"){
CTNode* pDummyNode = new CTNode();
m_pHeadNode = pDummyNode;
m_pTailNode = pDummyNode;
m_Size = 0; //Dummy node doesn't count
}
My assumption would have to be the fact that the event is set outside of the critical section, meaning push could potentially notify the event twice. Have you tried with setting the event inside the critical section?
In the end, I went back to my original, less efficient implementation, the one I posted here, with the addition of a Finish() method so that the producer can signal the consumers to end elegantly, and a Restart() method to start producing again without destroying and recreating the queue:
//Template definitions
template<class Element>
DRA::CommonCpp::CTBlockingQueue<Element>::CTBlockingQueue( unsigned int maxSize ):
m_csFinished( "CTBlockingQueue::m_csFinished" ),
m_csQueue( "CTBlockingQueue::m_csQueue" ),
m_semElementCount( 0, maxSize ),
m_bFinished(false){
}
template<class Element>
DRA::CommonCpp::CTBlockingQueue<Element>::~CTBlockingQueue(){
Finish();
}
template<class Element>
void DRA::CommonCpp::CTBlockingQueue<Element>::Push( Element newElement ){
{//RAII block
CGuard g( m_csQueue );
m_Queue.push( newElement );
}
m_semElementCount.Signal();
}
template<class Element>
bool DRA::CommonCpp::CTBlockingQueue<Element>::Pop( Element& element ){
m_semElementCount.Wait();
{//RAII block
CGuard g( m_csFinished );
if( m_bFinished ){
CGuard g( m_csQueue );
if ( m_Queue.size() == 0 )
return false;
}
}
{//RAII block
CGuard g( m_csQueue );
element = m_Queue.front();
m_Queue.pop();
}
return true;
}
template<class Element>
void DRA::CommonCpp::CTBlockingQueue<Element>::Finish(){
{//RAII block
CGuard g( m_csFinished );
m_bFinished = true;
}
{//RAII block
CGuard g( m_csQueue );
m_semElementCount.Signal();
}
}
template<class Element>
void DRA::CommonCpp::CTBlockingQueue<Element>::Restart(){
{//RAII block
CGuard g( m_csFinished );
m_bFinished = false;
}
}
This ain't the fastest way to go, but works fast enough for me.
精彩评论