ZeroMQ PUSH/PULL and lost message
I'm making use of ZeroMQ from .NET and got stuck trying to fix a weird issue. I've got a socket of type PUSH and one o开发者_StackOverflowf type PULL over TCP. When the client disconnects, the server is still able to to send a message (note that no flags are passed to the Socket.Send method) which gets completely lots before starting to block and waiting for the client to reconnect and delivering messages I try to send afterwards.
How can I avoid losing the message (or in the worst case test if a client is connected and if not send a dummy message which I can afford losing)?
Thanks in advance!
Edit: further testing reveals that if I wait 1 second after sending the first message after disconnection by the client, the second one will block, but if I don't wait at all I can send as many messages as I want and they'll all get lost. That's quite confusing...
The ZeroMQ documentation notes that this is a problem with PUSH/PULL setups and suggests the following pattern: an addition of a REP/REQ setup to provide node coordination when you're expecting a fixed number of subscribers. However, if you are not able to know the number of subscribers in advance, you should consider changing your protocol to be more resilient to these conditions.
Synchronized publisher in C (from ZGuide)
//
// Synchronized publisher
//
#include "zhelpers.h"
// We wait for 10 subscribers
#define SUBSCRIBERS_EXPECTED 10
int main (void)
{
s_version_assert (2, 1);
void *context = zmq_init (1);
// Socket to talk to clients
void *publisher = zmq_socket (context, ZMQ_PUB);
zmq_bind (publisher, "tcp://*:5561");
// Socket to receive signals
void *syncservice = zmq_socket (context, ZMQ_REP);
zmq_bind (syncservice, "tcp://*:5562");
// Get synchronization from subscribers
int subscribers = 0;
while (subscribers < SUBSCRIBERS_EXPECTED) {
// - wait for synchronization request
char *string = s_recv (syncservice);
free (string);
// - send synchronization reply
s_send (syncservice, "");
subscribers++;
}
// Now broadcast exactly 1M updates followed by END
int update_nbr;
for (update_nbr = 0; update_nbr < 1000000; update_nbr++)
s_send (publisher, "Rhubarb");
s_send (publisher, "END");
zmq_close (publisher);
zmq_close (syncservice);
zmq_term (context);
return 0;
}
精彩评论