开发者

How can I receive multipart messages with ZeroMQ?

I can't get ZeroMQ C++ wra开发者_Python百科pper to receive multipart messages. The same code using C version works just fine, but it leads to an exception with no explanations at all with C++. The multipart handling code is as follows:

int _tmain(int argc, _TCHAR* argv[])
{
    zmq::context_t context(1);
    zmq::socket_t socket(context, ZMQ_REP);
    socket.bind("tcp://*:5555");

    while(true) {
        // the following two lines lead to exception
        zmq::message_t request;
        socket.recv(&request);

        //zmq_msg_t message;
        //zmq_msg_init (&message);
        //zmq_recv (socket, &message, 0);   
    }

    return 0;
}

It is extremely simple; this version does not work. but if I comment out the first two lines in the while loop and uncomment the currently commented (C version) code, it works. This is Windows XP sp3, Zeromq 2.1.1 and Visual Studio 2010 Express.

If I send single part messages, both versions work fine. What am I doing wrong?


I'm also a newbie in ZMQ and I too had to struggle a lot in order to understand multipart messaging using REP/REQ in ZeroMQ. I had to go through multiple resources and stitch data in order to understand this. I think this answer will help many seekers in the near future that's why I am sharing the client and server code here. I have tested this code and it is working perfectly fine. However, being a newbie there are chances that I would have missed something vital. Please share your valuable inputs.

Server Code

void
serverMultipartREPREQ()
{
    try
    {
        zmq::context_t context(1);
        zmq::socket_t socket(context, ZMQ_REP);
        socket.bind("tcp://*:5556");
        std::cout << "Listening at port 5556..." << std::endl;

        zmq::message_t reply;

        socket.recv(reply, zmq::recv_flags::none);
        auto rep = std::string(static_cast<char*> (reply.data()), reply.size());

        std::cout << "Received: " << rep << std::endl;
        
        while(1)
        {    
            if (input == "exit")
                break;

            for (int j = 0; j < 3; ++j)
            {
                std::string s("Message no - " + std::to_string(j));

                zmq::message_t message(s.length());
                memcpy(message.data(), s.c_str(), s.length());

                std::cout << "Sending: " << s << std::endl;

                if (j != 2)
                    socket.send(message, zmq::send_flags::sndmore);
                else
                    socket.send(message, zmq::send_flags::none); 
            }
        }
    }
    catch (const zmq::error_t& ze)
    {
        std::cout << "Exception: " << ze.what() << std::endl;
    }

    Sleep(5000);
}

Client code

void
clientMultipartREQREP()
{
    try
    {
        zmq::context_t context(1);

        std::cout << "Connecting to socket at 5556" << std::endl;
        zmq::socket_t socket(context, ZMQ_REQ);
        socket.connect("tcp://localhost:5556");
        std::cout << "Connected to socket at 5556" << std::endl;

        std::string msg("Hii this is client...");
        zmq::message_t message(msg.length());
        memcpy(message.data(), msg.c_str(), msg.length());

        socket.send(message, zmq::send_flags::none); // send to server (request message)

        while (true)
        {
            __int64 more = 1;

            if (more)
            {
                zmq::message_t message;
                socket.recv(message, zmq::recv_flags::none);
                auto rep = std::string(static_cast<char*> (message.data()), message.size());
                std::cout << "Reading from client: " << rep << std::endl;

                size_t size = sizeof(__int64);
                socket.getsockopt(ZMQ_RCVMORE, &more, &size); // if msg is not the last one then more = 1 else more = 0
            }
            else
            {
                std::cout << "Done..." << std::endl;
                break;
            }
        }
    }
    catch (const zmq::error_t& ze)
    {
        std::cout << "Exception: " << ze.what() << std::endl;
    }
    Sleep(5000);
}


Probably C version of code doesn't work either, but you don't check the return code of zmq_recv, so you don't notice it. Also, when receiving miltipart messages you should check if there are more message parts to be received through the socket, like this:

int64_t more = 0;
size_t more_size = sizeof(more);
socket.getsockopt(ZMQ_RCVMORE, &more, &more_size);
if (more != 0)
{
  //has more parts
}

Also, take a look at ZmqMessage C++ library designed specifically for Sending and receiving ZeroMQ multipart messages.


I decided to use the C version of the code. In general all examples seem to be in C anyway.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜