ZeroMQ is eating my data... but only sometimes
So, I'm exploring 0MQ as a message-passing protocol for my cluster application. It offers the sort of asynchronous communication that I need.
So far, I've had some initial success crafting a prototype. I am able to send and receive messages using a few different types of patterns.
I am currently trying to get a PUB/SUB pattern working with an initial synchronization via a REQ/REP "handshake". This method is described in the ZeroMQ guide under "Node Coordination", and it works pretty well. The only problem is that 0MQ appears to be eating about 50% of my messages, or at least the data in them.
I am working with Qt, so each message consists of a QString. I byte-pack the string into a QByteArray using a QDataStream. Then I pass the QByteArray over the "wire" and unpack it on the other side. I've used this method frequently for communicating over other protocols (like TCP sockets), and it works well.
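For readers who don't use Qt, the pack/unpack round trip can be sketched with the standard library alone. This is only an illustration: packString and unpackString are hypothetical helpers, and the 4-byte big-endian length prefix is an assumed layout, not QDataStream's actual wire format.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Pack a string as a 4-byte big-endian length followed by the raw bytes.
// Hypothetical helper: this is NOT QDataStream's actual wire format.
std::vector<unsigned char> packString(const std::string& text)
{
    const std::uint32_t length = static_cast<std::uint32_t>(text.size());
    std::vector<unsigned char> buffer;
    buffer.reserve(4 + text.size());
    for (int shift = 24; shift >= 0; shift -= 8)
        buffer.push_back(static_cast<unsigned char>((length >> shift) & 0xFF));
    buffer.insert(buffer.end(), text.begin(), text.end());
    return buffer;
}

// Unpack a buffer produced by packString. Returns false (and leaves `text`
// empty) if the buffer is shorter than its own length prefix claims.
bool unpackString(const std::vector<unsigned char>& buffer, std::string& text)
{
    text.clear();
    if (buffer.size() < 4)
        return false;
    std::uint32_t length = 0;
    for (int i = 0; i < 4; ++i)
        length = (length << 8) | buffer[i];
    if (buffer.size() - 4 < length)
        return false;
    text.assign(buffer.begin() + 4, buffer.begin() + 4 + length);
    return true;
}
```

In this sketch, a buffer that arrives intact decodes back to the original string, while a damaged or truncated buffer leaves the output string empty.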
I have 4 "workers" connecting via PUB/SUB and REQ/REP to a single "manager". On most runs, at least 1 and at most 3 "workers" synchronize fine. When a worker fails to synchronize, it is because it received an empty string.
Here are the logs for the 4 clients:
alicia:
[09:18:15.337] [Info] Logging initialized
[09:18:15.337] [Debug] Initializing client alicia
[09:18:15.337] [Debug] Attempting to receive START signal from server
[09:18:15.340] [Info] Received message: START
[09:18:15.341] [Debug] Recieved a SYNC message SYNC
[09:18:15.341] [Info] Received START signal from manager
[09:18:15.341] [Info] Received message: 2
brenda:
[09:18:15.337] [Info] Logging initialized
[09:18:15.337] [Debug] Initializing client brenda
[09:18:15.337] [Debug] Attempting to receive START signal from server
[09:18:15.340] [Info] Received message: START
[09:18:15.340] [Debug] Recieved a SYNC message SYNC
[09:18:15.340] [Info] Received START signal from manager
[09:18:15.340] [Info] Received message: START
[09:18:15.340] [Debug] Sending 0th message
carlie:
[09:18:15.336] [Info] Logging initialized
[09:18:15.337] [Debug] Initializing client carlie
[09:18:15.337] [Debug] Attempting to receive START signal from server
[09:18:15.340] [Info] Received message: START
[09:18:15.340] [Debug] Recieved a SYNC message
[09:18:15.340] [Fatal] carlie Caught unhandled WASError:
Assertion "SOMEPATH/zmqworker.cpp" failed at SOMEPATH/zmqworker.cpp:52:virtual void was::ZMQWorker::run(): Invalid sync message
darcie:
[09:18:15.336] [Info] Logging initialized
[09:18:15.337] [Debug] Initializing client darcie
[09:18:15.337] [Debug] Attempting to receive START signal from server
[09:18:15.340] [Info] Received message: START
[09:18:15.341] [Debug] Recieved a SYNC message
[09:18:15.341] [Fatal] darcie Caught unhandled WASError:
Assertion "SOMEPATH/zmqworker.cpp" failed at SOMEPATH/zmqworker.cpp:52:virtual void was::ZMQWorker::run(): Invalid sync message
Here's most of the code:
From zmqtools.cpp
void sendQString( zmq::socket_t& socket, QString& str )
{
    QByteArray package;
    QDataStream packer( &package, QIODevice::WriteOnly );
    packer << str;
    zmq::message_t msg( package.data(), package.size(), NULL );
    WAS_ASSERT_MSG( socket.send( msg ), "Failed to send QString" );
}

void recvQString( zmq::socket_t& socket, QString& str )
{
    zmq::message_t msg;
    WAS_ASSERT_MSG( socket.recv( &msg ), "Failed to receive QString" );
    QByteArray package( ( char* )msg.data(), msg.size() );
    QDataStream unpacker( &package, QIODevice::ReadOnly );
    unpacker >> str;
}
From zmqworker.cpp
ZMQWorker::ZMQWorker( const QString& clientName, QObject *parent ) :
    QThread( parent ),
    clientName( clientName ),
    context( 1 ),
    inMessageSocket( context, ZMQ_PULL ),
    outMessageSocket( context, ZMQ_PUSH ),
    inControlSocket( context, ZMQ_SUB ),
    synchronizeSocket( context, ZMQ_REQ )
{
    qxtLog->debug() << "Initializing client " << clientName;
    inMessageSocket.connect( "tcp://localhost:9900" );
    outMessageSocket.connect( "tcp://localhost:9901" );
    inControlSocket.connect( "tcp://localhost:9902" );
    inControlSocket.setsockopt( ZMQ_SUBSCRIBE, "", 0 );
    synchronizeSocket.connect( "tcp://localhost:9903" );
    messagePoll.fd = 0;
    messagePoll.events = ZMQ_POLLIN;
    messagePoll.revents = 0;
    messagePoll.socket = inMessageSocket;
    controlPoll.fd = 0;
    controlPoll.events = ZMQ_POLLIN;
    controlPoll.revents = 0;
    controlPoll.socket = inControlSocket;
}

void ZMQWorker::run()
{
    QString message;
    // Wait for start signal from server
    bool started = true;
    do
    {
        qxtLog->debug() << "Attempting to receive START signal from server";
        recvQString( inControlSocket, message );
        qxtLog->info() << "Received message: " << message;
        started = message == "START";
    } while( !started );
    message = "SYNC";
    sendQString( synchronizeSocket, message );
    recvQString( synchronizeSocket, message );
    qxtLog->debug() << "Recieved a SYNC message" << message;
    WAS_ASSERT_MSG( message == "SYNC", "Invalid sync message" );
    qxtLog->info() << "Received START signal from manager";
    int messagesSent = 0;
    forever
    {
        zmq::poll( &messagePoll, 1, 0 );
        if( messagePoll.revents & ZMQ_POLLIN )
        {
            recvQString( inMessageSocket, message );
            qxtLog->info() << "Received message: " << message;
        }
        zmq::poll( &controlPoll, 1, 0 );
        if( controlPoll.revents & ZMQ_POLLIN )
        {
            recvQString( inControlSocket, message );
            qxtLog->info() << "Received message: " << message;
            if( message == "STOP" )
                break;
        }
        if( messagesSent < 1000 )
        {
            qxtLog->debug() << "Sending " << messagesSent << "th message";
            QString message = QString::number( messagesSent );
            sendQString( outMessageSocket, message );
            messagesSent++;
        }
    }
}
From zmqmanager.cpp
ZMQManager::ZMQManager( const QString& serverName, unsigned clientCount, QObject* parent ) :
    QThread( parent ),
    serverName( serverName ),
    clientCount( clientCount ),
    context( 1 ),
    inMessageSocket( context, ZMQ_PULL ),
    outMessageSocket( context, ZMQ_PUSH ),
    outControlSocket( context, ZMQ_PUB ),
    synchronizeSocket( context, ZMQ_REP )
{
    qxtLog->debug() << "Initializing server " << serverName;
    outMessageSocket.bind( "tcp://*:9900" );
    inMessageSocket.bind( "tcp://*:9901" );
    outControlSocket.bind( "tcp://*:9902" );
    synchronizeSocket.bind( "tcp://*:9903" );
    messagePoll.fd = 0;
    messagePoll.events = ZMQ_POLLIN;
    messagePoll.revents = 0;
    messagePoll.socket = inMessageSocket;
    synchronizePoll.fd = 0;
    synchronizePoll.events = ZMQ_POLLIN;
    synchronizePoll.revents = 0;
    synchronizePoll.socket = synchronizeSocket;
}

void ZMQManager::run()
{
    QString message;
    unsigned clientsConnected = 0;
    do
    {
        qxtLog->debug() << "Publishing START signal";
        message = "START";
        sendQString( outControlSocket, message );
        zmq::poll( &synchronizePoll, 1, 5000 );
        if( synchronizePoll.revents & ZMQ_POLLIN )
        {
            qxtLog->debug() << "Checking for response to START signal";
            recvQString( synchronizeSocket, message );
            qxtLog->debug() << "Recieved a SYNC message" << message;
            WAS_ASSERT_MSG( message == "SYNC", "Invalid sync message" );
            sendQString( synchronizeSocket, message );
            clientsConnected++;
        }
    } while( clientsConnected < clientCount );
    qxtLog->info() << "Started and syncrhonized with clients";
    unsigned messagesSent = 0;
    unsigned messagesReceived = 0;
    do
    {
        zmq::poll( &messagePoll, 1, 0 );
        if( messagePoll.revents & ZMQ_POLLIN )
        {
            recvQString( inMessageSocket, message );
            qxtLog->info() << "Received message: " << message;
            messagesReceived++;
        }
        if( messagesSent < clientCount * 1000 )
        {
            qxtLog->debug() << "Sending a message";
            message = QString::number( messagesSent );
            sendQString( outMessageSocket, message );
            messagesSent++;
        }
    } while( messagesSent < clientCount * 1000 && messagesReceived < clientCount * 1000 );
    message = "STOP";
    sendQString( outControlSocket, message );
}
So, at long last, my questions:
- Am I overlooking something in this code?
- Am I packing/unpacking the ZeroMQ messages incorrectly?
- Am I managing the REQ/REP synchronization incorrectly?
I would really appreciate some insight from someone who has experience with ZMQ, specifically with mixed PUB/SUB, PUSH/PULL and REQ/REP patterns.
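The problem was that ZeroMQ does not copy the data when you construct a message from an existing buffer, as the original sendQString does with this constructor:
zmq::message_t( void* data, size_t size, zmq::free_fn* ffn, void* hint = NULL );
Instead, ZeroMQ uses the buffer you provide as the storage for the message. Sending is asynchronous, and in sendQString the QByteArray is a local variable that is destroyed when the function returns, so the buffer can be freed or overwritten before the message is actually sent. When that happens, you get garbage or a segfault.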
The solution is relatively easy. Use this constructor instead:
zmq::message_t( size_t size );
It creates a message with its own buffer of the specified size. You then copy the data into the message yourself before you send it.
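The difference between borrowing the caller's buffer and copying into message-owned storage can be illustrated without ZeroMQ at all. The following is a minimal sketch, not the library's implementation: BorrowedMessage, OwnedMessage, QueuedContents and overwriteBeforeSend are hypothetical names, and the buffer is overwritten rather than freed so that the example stays well-defined.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical stand-in for a zero-copy message: it only remembers where the
// caller's bytes live and does not own them.
struct BorrowedMessage
{
    const char* data;
    std::size_t size;
    BorrowedMessage(const char* bytes, std::size_t length) : data(bytes), size(length) {}
    std::string contents() const { return std::string(data, size); }
};

// Hypothetical stand-in for a message that allocates its own storage; the
// caller copies the payload in, as with memcpy into msg.data().
struct OwnedMessage
{
    std::vector<char> storage;
    explicit OwnedMessage(std::size_t length) : storage(length) {}
    char* data() { return storage.data(); }
    std::string contents() const { return std::string(storage.begin(), storage.end()); }
};

struct QueuedContents
{
    std::string borrowed;
    std::string owned;
};

// Build both kinds of message from one buffer, overwrite the buffer before
// the messages are "sent" (read), and report what each message then holds.
QueuedContents overwriteBeforeSend()
{
    char buffer[] = "SYNC";
    const std::size_t length = std::strlen(buffer);

    BorrowedMessage borrowed(buffer, length);
    OwnedMessage owned(length);
    std::memcpy(owned.data(), buffer, length);

    std::memset(buffer, 0, length);  // the sender's buffer gets reused

    return { borrowed.contents(), owned.contents() };
}
```

In this sketch the owned message still holds "SYNC" after the buffer is overwritten, while the borrowed one reflects whatever the buffer contains at read time.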
Here are the corrected, and now working, send and receive functions:
void sendQString( zmq::socket_t& socket, QString& str )
{
    QByteArray package;
    QDataStream packer( &package, QIODevice::WriteOnly );
    packer << str;
    qxtLog->debug() << "sending a message of " << package.size() << " bytes";
    qxtLog->debug() << "the message says " << str;
    zmq::message_t msg( package.size() );
    memcpy( msg.data(), package.data(), package.size() );
    ASSERT_MSG( socket.send( msg ), "Failed to send QString" );
}

void recvQString( zmq::socket_t& socket, QString& str )
{
    zmq::message_t msg;
    ASSERT_MSG( socket.recv( &msg ), "Failed to receive QString" );
    qxtLog->debug() << "received a message of " << (int)msg.size() << " bytes";
    QByteArray package( ( char* )msg.data(), msg.size() );
    QDataStream unpacker( &package, QIODevice::ReadOnly );
    unpacker >> str;
    qxtLog->debug() << "the message says " << str;
}
Thanks to the guys on the #zeromq channel on freenode, I was finally able to identify the problem.