开发者

Trouble with Send and Recv

So I'm new to socket programming. For the past day, I have been trying to figure out a problem with my "Unregister".

So far, I can register a user. But when I unregister, the client receives only one byte of the server's reply, and I'm not sure how to fix this:

Sorry for my bad coding. Scroll down to client.cpp, that is where my recv for unregister only receives 1 byte =(

============================== Peer.h

#ifndef PEER_H
#define PEER_H

#include <string>
#include <iostream>
#include <arpa/inet.h>   // for sockaddr_in and inet_addr()

using namespace std;

// One connected client as tracked by the server: where it connected from,
// the socket used to talk to it, and the user name it has registered under.
class Peer
{
public:
    string IP;          // IP address, as text produced by inet_ntoa()
    int port;           // port number, converted to host byte order
    int sock;           // socket descriptor
    string name;        // registered user name; empty while not registered

    /* Create a new peer entry on the server.
     * @param addr The IP and port of the peer.
     * @param desc The socket associated with the connection to the peer.
     */
    Peer(sockaddr_in addr, int desc);

    /* Destroy a peer entry on the server.
     */
    ~Peer();

    /* Process incoming traffic from the peer: read one message from the
     * socket and dispatch it according to its first byte.
     */
    void processRequest();

    // Handle a register/unregister message (message type 1) and reply.
    void handleReg(string message);
    // Return the part of the message that follows the two prefix bytes.
    string getUserName(string message);
    // Tell whether a user with this name is in the registered-user list.
    bool checkUserExist(string username);
    // Remove the named user from the registered-user list, clear this peer's
    // name, and return how many users remain.
    int removeUser(string userName);
    // Append a copy of this peer to the registered-user list.
    void addUser();
    // Print the name and IP of every registered user to stdout.
    void printAllUsers();
    // Handle an add request (message type 3); currently an empty stub.
    void handleAdd(string message);
};
#endif

=========================================== Peer.cpp

/*
 *
 *  Created by Mea Wang on 24/10/09.
 *  Copyright 2009 University of Calgary All rights reserved.
 *
 */

#include "peer.h"
#include "mtype.h"
#include "Error.h"
#include <vector>
#define RCVBUFSIZE 100
#define MAXUSERNAME 8

// Default Messages
// Reply text used when the requested user name is already registered.
string userExists = "The User already exists";
// Reply text used when a registration succeeds.
string regAckStr = "Registration Acknowledged";
// NOTE(review): not referenced by any code visible in this file.
string errUserName = "User Name Invalid : Total Characters Must be less than ";


// Server functions
// Accessors for the server-wide list of registered users. They are only
// declared here; the definitions live in Server.cpp.
vector<Peer> getPeers();
void setPeers(vector<Peer> newPeers);

using namespace std;

/* Build a peer record from a client address and its connected socket.
 * The address is stored as text (IP) and as a host-order port number;
 * the peer starts out unregistered, i.e. with an empty name.
 */
Peer::Peer(sockaddr_in addr, int desc)
    : IP(inet_ntoa(addr.sin_addr)),
      port(static_cast<int>(ntohs(addr.sin_port))),
      sock(desc),
      name()
{
}

// Intentionally empty: destroying a Peer does not close its socket.
Peer::~Peer()
{
}

void Peer::processRequest()
{
    char msgRcvBuffer[RCVBUFSIZE];
    int recvMsgSize;

    // Receive the header of the message
        if ((recvMsgSize = recv(sock, msgRcvBuffer, RCVBUFSIZE, 0)) < 0)
        DieWithError("recv() failed");


        msgRcvBuffer[recvMsgSize] = '\0';
        string msgType( msgRcvBuffer , msgRcvBuffer + recvMsgSize);

    // According to the message type, receive the body of the message
    // Perform the appropriate action for each received message
    // Then send back message
            switch(msgType[0])
        {
            // Register
            case(1):
                {
                    // Register / Unregister
                    handleReg(msgType);
                }break;
            case(2):
                {

                }break;
            case(3):
                {
                    // Add Request
                    handleAdd(msgType);
                }break;
            case(4):
                {

                }break;
            default:
                {
                    cout << " Message Type Not Found " << endl;
                }break;

        };
}


void Peer::handleReg(string message)
{

    // Remove the Prefix
    int startIndex = 2;
    // Get the username from the message
    string userName = message.substr(startIndex, message.size() - startIndex);
    //cout << "Username " << userName << endl;  

    // Unregister
    if (message[1] == 1)
    {
        // If user name does not belong to peer, respond
        if (userName.compare(name) != 0)
        {
            char err[RCVBUFSIZE];
            string error = "Error - Unable to Unregister ";
            error.append(userName);
            err[0] = 0;
            char *msgPt = &err[1];
            int msgSize = error.size() + userName.size();
            strncpy(msgPt, error.c_str(), msgSize);
            if (send(sock, &err, msgSize + 1, 0) < 0)
            DieWithError("send() failed");  
        }
        // If user name does not exist, exit.
        else if (!checkUserExist(userName))
        {
            char userNotFoundMsg[RCVBUFSIZE];
            string notFound = "User not found";
            cout << "User not found" << endl;
            userNotFoundMsg[0] = 0;
            char* msgBufferPt = &userNotFoundMsg[1];
            strncpy(msgBufferPt, notFound.c_str(), notFound.size());

            if (send(sock, &userNotFoundMsg, notFound.size() + 1, 0) < 0)
            DieWithError("send() failed");

        } else
        {
            // Remove User from List and Print all remaining users
            int numUsers = removeUser(userName);
            cout << "Registered Users" << endl;
            printAllUsers();
            // If no users left after unregistering add another notification
            if (numUsers == 0) { cout << ">No Users\n"; }
            // Send acknowledgedment that user has been erased

            string temp = "User has been removed";
            int msgSize = temp.size();
            if (send(sock, temp.c_str(), msgSize, 0) < 0)
            DieWithError("send() failed");

        }
    }
    // Register
    else if(message[1] == 0)
    {
        // Check to see if already registered
        if (name.compare("") != 0)
        {

            string error = "Error - Already Registered: ";
            error.append(name);
            char err[RCVBUFSIZE];
            err[0] = 0;
            char *msgPt = &err[1];
            int msgSize = error.size() + name.size();
            strncpy(msgPt, error.c_str(), msgSize);

            if (send(sock, &err, msgSize + 1, 0) < 0)
            DieWithError("send() failed");

        }
        else if (checkUserExist(userName))
        {
                char msgBuffer[RCVBUFSIZE];
                msgBuffer[0] = 0;
                char* msgBufferPt = &msgBuffer[1];
                strncpy(msgBufferPt, userExists.c_str(), userExists.size());

                //string temp( msgBuffer , msgBuffer + userExists.size() + 1);
                //cout << "Test output: " << temp << endl;

                int msgSize = userExists.size() + 1;
                if (send(sock, msgBuffer, msgSize, 0) < 0)
                DieWithError("send() failed - User Exists");
        }
        else
        {
            // Add user to list of users on server
            name = userName;
            // Add this peer
            addUser();
            cout << "Registered Users\n";
            printAllUsers();
            // Prepare message considering success of Registration
            char msgBuffer[RCVBUFSIZE];
            msgBuffer[0] = 2;
            char* msgBufferPt = &msgBuffer[1];
            strncpy(msgBufferPt, regAckStr.c_str(), regAckStr.size());
            int msgSize = regAckStr.size() + 1;
            if (send(sock, &msgBuffer, msgSize, 0) != msgSize)
            DieWithError("send() failed");

        }
    } else
    {
    cout << "The Message Type is Incorrect " << endl;
    }
}

// Handler for message type 3 (add request), called from processRequest().
// Not implemented yet: the message is ignored and no reply is sent.
void Peer::handleAdd(string message)
{

}

// Print all users within the list of users
void Peer::printAllUsers()
{
    vector<Peer> serverPeers = getPeers();
    for (int i = 0; i < serverPeers.size(); i++)
    {
        cout << "User:\t" << serverPeers[i].name << "\tIp:\t" << serverPeers[i].IP << "\n";
    }
}

// Add user to list of registered users
void Peer::addUser()
{
    vector<Peer> serverPeers = getPeers();
    serverPeers.push_back(*this);
    setPeers(serverPeers);
}

// Remove the named user from the list of registered users.
// Only an entry whose name matches is erased; when no entry matches, the
// list is left untouched (in particular nothing is erased from an empty list).
// This peer's own name is cleared in either case.
// Return the number of users left
int Peer::removeUser(string userName)
{
    vector<Peer> serverPeers = getPeers();

    for (vector<Peer>::iterator it = serverPeers.begin(); it != serverPeers.end(); ++it)
    {
        if (it->name == userName)
        {
            // erase() invalidates the iterator, so stop right away
            serverPeers.erase(it);
            break;
        }
    }

    // Reset name to nothing
    name = "";
    setPeers(serverPeers);
    return static_cast<int>(serverPeers.size());
}

// Tell whether the list of registered users contains the given name
bool Peer::checkUserExist(string username)
{
    const vector<Peer> registered = getPeers();

    vector<Peer>::const_iterator candidate = registered.begin();
    while (candidate != registered.end() && candidate->name != username)
    {
        ++candidate;
    }
    return candidate != registered.end();
}

// Get the user name from the message received: everything after the two
// prefix bytes. A message with no bytes after the prefix (or too short to
// hold the prefix at all) yields an empty name.
string Peer::getUserName(string message)
{
    const string::size_type startIndex = 2;
    if (message.size() <= startIndex)
    {
        return string();
    }
    return message.substr(startIndex);
}


============================================

============================================ Client.cpp - MY RECV GETS 1 BYTE =(

#include "client.h"
#include "mtype.h"
#include "Error.h"
#include <sys/socket.h>  // for connect()
#include <arpa/inet.h>   // for sockaddr_in and inet_addr()
#include <string.h>      // for memset()
#include <iostream>
#include <cstdlib>
#include <sstream>
#include <vector>

#define REGISTER 0
#define UNREGISTER 1
#define RCVBUFSIZE 100

using namespace std;

// NOTE(review): these declare free functions, but the definitions further
// down in this file are the TCPClient members of the same names (already
// declared in client.h) — confirm whether these prototypes are still needed.
void handleRegister(string userName, int regType);
vector<string> getInput(string msg);

// Nothing is set up here; `sock` stays uninitialized until connectTCP().
TCPClient::TCPClient()
{
}

// Intentionally empty: the destructor does not close `sock`.
TCPClient::~TCPClient()
{
}

/* Open a TCP connection to IP:port and keep the descriptor in `sock`.
 * Returns 0 once connected and -1 when the socket option cannot be set.
 * Failures of socket() and connect() are reported through DieWithError().
 */
int TCPClient::connectTCP(string IP, int port)
{
    // Create the stream socket
    sock = socket(PF_INET, SOCK_STREAM, 0);
    if (sock < 0)
        DieWithError("socket() failed");

    // Allow the local address to be reused
    int reuse = 1;
    const int optResult = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(int));
    if (optResult < 0)
    {
        cout << "Error in free up the port.";
        return -1;
    }

    // Describe the remote host
    struct sockaddr_in remote;
    memset(&remote, 0, sizeof(remote));
    remote.sin_family = AF_INET;
    remote.sin_port = htons(port);
    remote.sin_addr.s_addr = inet_addr(IP.c_str());

    // Connect to it
    const int connResult = connect(sock, (struct sockaddr *) &remote, sizeof(remote));
    if (connResult < 0)
        DieWithError("connect() failed");

    return 0;
}

/* Interpret one line typed by the user and run the matching command.
 *
 * Recognised commands are "register <username>" and "unregister <username>".
 * Any other input is ignored. A recognised command without a user name is
 * rejected locally with a usage hint instead of sending an empty name to
 * the server.
 *
 * @param msg The command line as entered by the user.
 */
void TCPClient::processCmd(string msg)
{
    const vector<string> input = getInput(msg);
    if (input.empty())
        return;

    const bool wantsRegister = (input[0] == "register");
    const bool wantsUnregister = (input[0] == "unregister");
    if (!wantsRegister && !wantsUnregister)
        return;

    // The second token is the user name; it must be present and non-empty
    if (input.size() < 2 || input[1].empty())
    {
        cout << "Usage: " << input[0] << " <username>" << endl;
        return;
    }

    handleRegister(input[1], wantsRegister ? REGISTER : UNREGISTER);
}

/*
* Handle either registration or unregistration.
*
* Builds the request [1][regType][user name], sends it and prints the
* server's reply:
*   - REGISTER:   a reply starting with byte 2 is an acknowledgement and its
*                 text is printed as is; a reply starting with byte 0 is an
*                 error and is printed after "Server Output: ".
*   - UNREGISTER: the reply is printed unchanged.
* User names longer than 8 characters and unknown regType values are
* rejected locally without contacting the server. An empty reply (the
* server closed the connection) is reported instead of being parsed.
*
* @param userName The user name to register or unregister.
* @param regType  REGISTER or UNREGISTER.
*/
void TCPClient::handleRegister(string userName, int regType)
{
    const string::size_type maxNameLen = 8;
    const string::size_type prefixLen = 2;

    if (userName.size() > maxNameLen)
    {
        cout << "Invalid username : Greater than 8 characters" << endl;
        return;
    }

    if (regType != REGISTER && regType != UNREGISTER)
    {
        cout << " Msg Type Incorrect " << regType << endl;
        return;
    }

    // Fixed-size buffer: prefix plus the longest name accepted above
    char messageBuffer[2 + 8];
    messageBuffer[0] = 1;                            // message type: register/unregister
    messageBuffer[1] = static_cast<char>(regType);   // 0 = register, 1 = unregister
    userName.copy(messageBuffer + prefixLen, userName.size());

    // Send the request and wait for the server's answer
    sendMessage(messageBuffer, static_cast<int>(userName.size() + prefixLen));

    // TCP is a byte stream: this single read may return fewer bytes than
    // the server sent (the asker marked this call with "I GET 1 BYTE HERE").
    const string rcvMsg = rcvMessage();

    if (rcvMsg.empty())
    {
        cout << "Server closed the connection" << endl;
        return;
    }

    if (regType == UNREGISTER)
    {
        cout << rcvMsg << endl;
        return;
    }

    // REGISTER: the first byte is the reply code, the rest is the text
    const string output = rcvMsg.substr(1);
    if (rcvMsg[0] == 2)
    {
        // Acknowledged
        cout << output << endl;
    }
    else if (rcvMsg[0] == 0)
    {
        // The user name was not registered
        cout << "Server Output: " << output << endl;
    }
}

// Split a command line into whitespace-separated tokens.
// The extraction that hits the end of the input is stored as well, so the
// returned list always ends with one empty string.
vector<string> TCPClient::getInput(string msg)
{
    vector<string> tokens;
    istringstream stream(msg);

    for (;;)
    {
        string token;
        stream >> token;
        tokens.push_back(token);
        if (!stream)
        {
            break;
        }
    }

    return tokens;
}

/*
* Send the message to the server.
*
* send() on a stream socket may accept only part of the buffer, so the call
* is repeated until all messageLen bytes have been handed to the socket.
* A failing send() is reported through DieWithError().
*
* @param message    First byte of the data to send.
* @param messageLen Number of bytes to send.
*/
void TCPClient::sendMessage(char* message, int messageLen)
{
    int sentTotal = 0;
    while (sentTotal < messageLen)
    {
        const int sentNow = static_cast<int>(
            send(sock, message + sentTotal, messageLen - sentTotal, 0));
        if (sentNow <= 0)
        {
            DieWithError("send() error");
            return;
        }
        sentTotal += sentNow;
    }
}

/*
* Receive up to RCVBUFSIZE bytes from the server with a single recv() call.
*
* Prints the number of bytes received and returns exactly those bytes. An
* empty string means the server closed the connection. A failing recv() is
* reported through DieWithError().
*
* NOTE: one recv() is not guaranteed to return a complete reply; callers
* get whatever part of the byte stream has arrived so far.
*/
string TCPClient::rcvMessage()
{
    char msgBuffer[RCVBUFSIZE];

    // Get Message from Server
    const int bytesRcvd = static_cast<int>(recv(sock, msgBuffer, sizeof(msgBuffer), 0));
    if (bytesRcvd < 0)
    {
        DieWithError("recv() failed or connection closed prematurely");
        return string();
    }

    cout << "Bytes received: " << bytesRcvd << endl;

    // Build the string from the received range; no terminator is stored in
    // the buffer, which would overflow it when all RCVBUFSIZE bytes are used.
    return string(msgBuffer, msgBuffer + bytesRcvd);
}


================================================================= Server.cpp


#include "server.h"
#include "Error.h"
#include "peer.h"
#include <sys/socket.h>  // for connect()
#include <arpa/inet.h>   // for sockaddr_in and inet_addr()
#include <string.h>      // for memset()
#include <sys/time.h>
#include <sys/select.h>
#include <errno.h>
#include <iostream>
#include <cstdlib>
#include <vector>
#include <fcntl.h>

#define MAXPENDING 5
#define TRUE 1
#define FALSE 0

// Result of the most recent select() call made by TCPServer::run().
int activity;
// NOTE(review): TCPServer (Server.h) also declares a member named `peers`;
// inside TCPServer member functions the unqualified name refers to that
// member, so this global looks unused in the visible code — confirm.
vector<Peer> peers;
// The users that are currently registered; read and replaced through
// getPeers() / setPeers().
vector<Peer> registeredUsers;

using namespace std;

// Remember the port to listen on; the listening socket is created by init().
TCPServer::TCPServer(int p)
    : port(p)
{
}

// Intentionally empty: no sockets are closed here.
TCPServer::~TCPServer()
{
}

/* Create the listening socket, bind it to `port` on every local address
 * and start listening.
 * Returns 0 on success and -1 when the socket option cannot be set.
 * Failures of socket(), bind() and listen() go through DieWithError().
 */
int TCPServer::init()
{
    // Create the stream socket
    serverSock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (serverSock < 0)
        DieWithError("socket() failed");

    // Allow the port to be reused before binding
    int reuse = 1;
    const int optResult = setsockopt(serverSock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(int));
    if (optResult < 0)
    {
        cout << "Error in free up the port." << endl;
        return -1;
    }

    // Fill in the local address: any interface, the configured port
    memset(&servAddr, 0, sizeof(servAddr));
    servAddr.sin_family = AF_INET;
    servAddr.sin_port = htons(port);
    servAddr.sin_addr.s_addr = htonl(INADDR_ANY);

    // Bind to the local address
    struct sockaddr *localAddr = (struct sockaddr *) &servAddr;
    if (bind(serverSock, localAddr, sizeof(servAddr)) < 0)
        DieWithError("bind() failed");

    // Accept incoming connections, queueing at most MAXPENDING of them
    if (listen(serverSock, MAXPENDING) < 0)
        DieWithError("listen() failed");

    return 0;
}

int TCPServer::run()
{
    // Set of sockets
    fd_set readfds;

    while (true)
    {

        // Clear the socket FD_SET for select().
        /* create a list of sockets to check for activity */
        FD_ZERO(&readfds);

        // Set the socket set for select() by adding all the sockets
        /* specify mastersocket - ie listen for new connections */
        // serverSocket is my master socket
        FD_SET(serverSock, &readfds);

        for (int index = 0; index < peers.size(); index++) {
            if (peers[index].sock > 0)
            {
                int tempSock = peers[index].sock;
                FD_SET(tempSock, &readfds);
            }
        }

        // Set the timeout value for select()
        struct timeval timeout;  /* Timeout for select */

        timeout.tv_sec = 6;
        timeout.tv_usec = 0;

        // Select()
        /* wait for connection, forever if have to */
        activity = select(MAXPENDING + 3, &readfds, NULL, NULL, &timeout);

        //cout << "Activity " << activity << endl;

        if ((activity<0) && (errno!=EINTR))
        {
            /* there was an error with select() */
        }

        if (activity == 0)
        {
            // No activity so let it continue
        }
        else
        {
            if (FD_ISSET(serverSock, &readfds))
            {
                    receive();
            }

            char tempBuffer[1025];
            int valread;
            for (int j=0; j<peers.size(); j++)
            {
                if (FD_ISSET(peers[j].sock, &readfds))
                {
                    if ((valread = recv(peers[j].sock, tempBuffer, 1024, MSG_PEEK)) < 0)
                    {
                      close(peers[j].sock);
                    }
                    else 
                    {
                        // Process Request
                        peers[j].processRequest();
                    }
                }
            }
        }
    }
    return 0;
}

/* Accept one pending connection on the listening socket and record the new
 * client in `peers`. A failing accept() is reported through DieWithError().
 * NOTE(review): O_NONBLOCK is applied to the listening socket (serverSock),
 * not to the freshly accepted client socket — confirm this is intended.
 */
void TCPServer::receive()
{
    // Accept the connection request; clntAddr receives the client's address
    clntLen = sizeof(clntAddr);
    clntSock = accept(serverSock, (struct sockaddr *) &clntAddr, &clntLen);
    if (clntSock < 0)
    {
        DieWithError("accept() failed [FD_ISSET]");
    }

    fcntl(serverSock, F_SETFL, O_NONBLOCK);

    // Track the new client
    peers.push_back(Peer(clntAddr, clntSock));

    cout << "CLIENT SOCKET: " << clntSock << endl;
}

 // Return a copy of the list of registered users (the global
 // registeredUsers, not the server's list of connected peers).
 vector<Peer> getPeers()
 {
    return registeredUsers;
 }

 // Replace the list of registered users with the given list.
 void setPeers(vector<Peer> newUsers)
 {
    registeredUsers = newUsers;
 }

================================ clientmain.cpp
#include <iostream>
#include <client.h>
#include <server.h>
#include <sys/time.h>
#include <string>
#include <cstdlib>
#include "Error.h"

using namespace std;

int main (int argc, char * const argv[]) 
{
    int serverPort;
    string serverIP;
    TCPClient * clnt;

    if (argc < 3)
    {
        cout << "Usage: " << argv[0] << "<server IP> <server port #>" << endl;
        return 0;
    }
    else
    {
        serverIP = argv[1];
        serverPort = atoi(argv[2]);
    }

    // Create a client
    clnt = new TCPClient();

    // Connect this client to the server
    if (clnt->connectTCP(serverIP, serverPort) < 0)
        cout << "Error in connecting to the server." << endl;
    // Process user command and interact with the server accordingly
    else
    {
        string cmd = "";
        while (cmd != "quit")
        {
            cout << "Please enter a command: ";
            getline(cin, cmd);
            clnt->processCmd(cmd);

        }
    }

    return 0;
}
============================= servermain.cpp
#include <iostream>
#include <client.h>
#include <server.h>
#include <sys/time.h>
#include <string>
#include <cstdlib>

using namespace std;

int main (int argc, char * const argv[]) 
{
    int port;
    TCPServer * svr;

    if (argc != 2)
    {
        cout << "Usage: " << argv[0] << " <server port #>" << endl;
        return 0;
    }
    else
    {
        port = atoi(argv[1]);
    }


    // Create a server
    svr = new TCPServer(port);

    // Initialize the server
    if (svr->init() < 0)
    {
        exit(0);
    }

    // Run the server
    svr->run();

    return 0;
}


======================================================== mtype.h

/*
 *  mtype.h
 *  podcast
 *
 *  Created by Mea Wang on 27/10/09.
 *  Copyright 2009 University of Calgary. All rights reserved.
 *
 */

#ifndef MTYPE_H
#define MTYPE_H

// Defines various types of the message related to the engine.
// Integer codes for the message kinds related to the engine, grouped by who
// produces and who consumes them. Values are only unique within a group
// (for example newUpstream, consumed and arbitrary are all 1).
// NOTE(review): none of these constants is referenced by the code visible
// in this file — confirm they are still needed.
namespace types
{
    // upstream node -> downstream node.
    const int data = 0;           // data messages.
    const int newUpstream = 1;    // new upstream node connecting.

    // network thread -> engine thread.
    const int createSession = 2;     // Create a data session.
    const int removeSession = 3;     // Remove a data session.
    const int removePeer = 4;        // The upstream node has failed.
    const int terminate = 5;         // Terminate a node.

    // Returned from the algorithm to the engine.
    const int consumed = 1;     // The message has been consumed.
    const int hold = 2;         // The message needs to wait for later processing.

    // Types of data sources.
    const int arbitrary = 1;    // Arbitrary data.
    const int stdinput = 2;     // Standard input.
    const int file = 3;         // Regular file.
    const int customized = 4;   // Customized production of data.
}

#endif

========================================================== Server.h

/*
 *  server.h
 *  podcast
 *
 *  Created by Mea Wang on 24/10/09.
 *  Copyright 2009 University of Calgary. All rights reserved.
 *
 */

#ifndef SERVER_H
#define SERVER_H

#include "peer.h"
#include <map>
#include <vector>

#define MAXCLIENTS 10

using namespace std;

// This is used by the reporter
// Map from an int key to a Peer pointer, with shorthand names for its
// iterator and element types.
// NOTE(review): the meaning of the int key is not shown in this file.
typedef map<int, Peer *> PeerMap;
typedef PeerMap::iterator PeerMapIterator;
typedef PeerMap::value_type PeerMapType;

// TCP server that listens on one port, keeps a list of connected peers and
// uses select() to find out which of them has data to process.
class TCPServer
{
public:
    int serverSock;      // The TCP socket to be listened to
    fd_set recvSockSet;  // The set of descriptors that are ready to receive
                         // NOTE(review): run() uses a local fd_set; this member looks unused
    vector<Peer> peers;       // A list of all clients or peers
    int port;            // The listening port
    int maxDesc;         // NOTE(review): not referenced by the visible code
    //int client_socket[MAXCLIENTS];

    int clntSock;       // The Client Socket most recently returned by accept()
    struct sockaddr_in servAddr;   // Local address the server binds to
    struct sockaddr_in clntAddr;   // Address of the most recently accepted client
    unsigned int clntLen;          // Size of clntAddr, passed to accept()

    /* Constructor: Create a TCP server.
     * @param p The local port number.
     */
    TCPServer(int p);

    /* Destructor: destroy a server.
     */
    ~TCPServer();

    /* Initialize the server: create, bind and listen on the server socket.
     * @return -1 if error occurs, 0 otherwise.
     */
    int init();

    /* Runs the server to monitor all incoming ports using select().
     * @return -1 if error occurs, 0 otherwise.
     */
    int run();

    /* Accept a new connection and add it to the list of peers.
     * This function is called by run() after select().
     */
    void receive();
};

#endif

=====================================================================client.h
/*
 *  client.h
 *  podcast
 *
 *  Created by Mea Wang on 24/10/09.
 *  Copyright 2009 University of Calgary. All rights reserved.
 *
 */

#ifndef CLIENT_H
#define CLIENT_H

#include "Error.h"
#include <string>
#include <vector>

using namespace std;

// TCP client that connects to the server and turns user commands into
// register / unregister requests.
class TCPClient
{
public:
    int sock;             // socket descriptor

    /* Constructor: Create a TCP client.
     * Takes no arguments; the connection is made later by connectTCP().
     */
    TCPClient();

    /* Destructor: destroy a client.
    */
    ~TCPClient();

    /* Establish the connection with the remote host. 
     * @param IP The IP address of the remote host.
     * @param port The port number on the remote host.
     * @return -1 if error occurs, 0 otherwise.
     */
    int connectTCP(string IP, int port);

    /* Parse one command line entered by the user and carry it out
     * ("register <name>" or "unregister <name>").
     * @param msg The command line to be processed.
     */
    void processCmd(string msg);

    /* Split a command line into whitespace-separated tokens.
     * @param msg The command line to split.
     * @return The tokens, in order.
    */
    vector<string> getInput(string msg);

    /* Send a register or unregister request and print the server's reply.
     * @param userName The user name the request is about.
     * @param regType 0 to register, 1 to unregister.
     */
    void handleRegister(string userName, int regType);

    /* Send messageLen bytes starting at message to the server. */
    void sendMessage(char* message, int messageLen);
    /* Receive a reply from the server and return its bytes as a string. */
    string rcvMessage();
};

#endif

===========================================================================

================ If anyone is interested in running it, here's my makefile:

CC = g++ -pthread -idirafter ./
server = servermain.cpp
client = clientmain.cpp
obj = peer.o client.o server.o Error.o

all: lib.a server client

lib.a: $(obj)
	ar r lib.a $(obj)

peer.o: peer.cpp peer.h
	$(CC) -c peer.cpp -o peer.o

client.o: client.cpp client.h Error.h
	$(CC) -c client.cpp -o client.o

server.o: server.cpp server.h Error.h
	$(CC) -c server.cpp -o server.o

error.o: Error.cpp Error.h
	g++ -c server.cpp -o server.o

server:
	$(CC) $(server) $(obj) -o server

client:
	$(CC) $(client) $(obj) -o client

clean:
	rm *.o server client

======================

To run, it's ./server 7000 in one terminal and ./client 127.1.1.100 7000 in another.

=============

Thanks for all the effort ^^


I can't see an obvious problem, but as JB says, this is a very large piece of code and it's hard to spot the problem inside.

However, you seem to be assuming that every time you send() a sequence of bytes, the equivalent recv() call in the client will receive those bytes. This is not true in general for streaming sockets. That is, it's completely possible for this to happen:

Server: send("HELLO");
Client: First call to receive() == "HEL";
        Second call to receive() = "L";
        Third call = "O";

That is, you cannot guarantee that Client will receive "HELLO" in one go.

The usual way to deal with stream protocols is to buffer receives until you know you have an entire record to process. For example, in HTTP, servers will keep reading until they see a newline, and then process that whole line. In the case of your protocol, it might make more sense to prepend the size of the record to every message, e.g.

Server: sendbyte(strlen("HELLO"));
        send("HELLO");
Client: int num_bytes = receivebyte();
        while bytes_read < num_bytes; { buffer += receive(); }
        processMessage(buffer);

In order to get this into your code, you'd need to have a state machine for each client to keep track of where you are in reassembling the received messages.

Hope that helps in some way.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜