开发者

Slow transfer speed on remote computer

Hey there StackOverflow people!

I'm making an IOCP server and I have ironed out most issues so far but one still remain and I do not know where to start looking at. When I run the client/server on my machine everything is fine and dandy. It matches the speed of the Windows SDK Sample maybe a little bit faster and definitely uses less CPU cycle. However when I run the client from a separate computer, transfer speed caps at 37 KB/s and has a roundtrip latency of 200ms (as opposed to 0). Now if I connect the client to the SDK Sample server, I don't have that problem so there is something wrong with my code. As far as I know, the sockets are initialized the exact same way with the same options. I have also ran my server in a profiler to check for bottleneck but I couldn't find any. Also, the computers I have tried it on were connected to the same gigabit switch (with gigabit adapter). I know this is kind of vague but that's because I couldn't pinpoint the problem so far and I would be eternally grateful if any of you guys could point me in the right direction.

Cheers,

-Roxy

EDIT2: After following Mike's advise, I did some research on the code and found out that when a remote client connects to the server most of the time the code is waiting on GetQueuedCompletionStatus. This suggest that IO request are simply taking a long time to complete but I still don't understand why. This only occurs only when the client is on a remote computer. I'm thinking this has something to do with how a setup the sockets or how I'm posting the request but I don't see any difference with the sample code.

Any ideas?

EDIT (Added sample code):

Alright, here it is! It ain't pretty though!

If you have the Windows SDK installed, you can connect to it using the iocpclient sample (Program Files\Microsoft SDKs\Windows\v7.1\Samples\netds\winsock\iocp\client) and changing it's default port at line 73 to 5000.

Weird things I've just noticed when trying it myself is that it seems the sample iocpclient doesn't cause the same caps at 37KB/s issue... However it looks like the sample code has a limit set to around 800KB/s. I'll post a client if that can be of any help.

#pragma comment(lib, "Ws2_32.lib")

#include <WinSock2.h>
#include <stdio.h>

unsigned int connection = 0;
unsigned int upload = 0;
unsigned int download = 0;

#define IO_CONTEXT_COUNT 5

class NetClientHost
{
friend class gNetProtocolHost;
public:
enum Operation
{
    kOperationUnknown,
    kOperationRead,
    kOperationWrite,
};

struct ClientData
{
    SOCKET           socket;
};

struct IOContext
{
    WSAOVERLAPPED    overlapped;
    WSABUF           wsaReceiveBuf;
    WSABUF           wsaSendBuf;
    char            *buf;
    char            *TESTbuf;
    unsigned long    bytesReceived;
    unsigned long    bytesSent;
    unsigned long    flags;
    unsigned int     bytesToSendTotal;
    unsigned int     remainingBytesToSend;
    unsigned int     chunk;
    Operation        operation;
};

NetClientHost()
{
    memset((void *) &m_clientData, 0, sizeof(m_clientData));
}

NetClientHost::IOContext *NetClientHost::AcquireContext()
{
    while (true)
    {
        for (int i = 0; i < IO_CONTEXT_COUNT; ++i)
        {
            if (!(m_ioContexts + i)->inUse)
            {
                InterlockedIncrement(&(m_ioContexts + i)->inUse);
                //ResetEvent(*(m_hContextEvents + i));

                if ((m_ioContexts + i)->ioContext.TESTbuf == 0)
                    Sleep(1);

                return &(m_ioContexts + i)->ioContext;
            }
        }
        //++g_blockOnPool;
        //WaitForMultipleObjects(IO_CONTEXT_COUNT, m_hContextEvents, FALSE, INFINITE);
    }   
}

const ClientData *NetClientHost::GetClientData() const
{
    return &m_clientData;
};

void NetClientHost::Init(unsigned int bufferSize)
{   
    _InitializeIOContexts(bufferSize ? bufferSize : 1024);
}

void NetClientHost::ReleaseContext(IOContext *ioContext)
{
    int i = sizeof(_IOContextData), j = sizeof(IOContext);
    _IOContextData *contextData = (_IOContextData *) (((char *) ioContext) - (i - j));
    InterlockedDecrement(&contextData->inUse);
    //SetEvent(*(m_hContextEvents + contextData->index));
}   

struct _IOContextData
{
    unsigned int index;
    volatile long inUse;        
    IOContext ioContext;
};

ClientData                    m_clientData;
_IOContextData               *m_ioContexts;
HANDLE                       *m_hContextEvents;

void _InitializeIOContexts(unsigned int bufferSize)
{
    m_ioContexts = new _IOContextData[IO_CONTEXT_COUNT];
    m_hContextEvents = new HANDLE[IO_CONTEXT_COUNT];

    memset((void *) m_ioContexts, 0, sizeof(_IOContextData) * IO_CONTEXT_COUNT);

    for (int i = 0; i < IO_CONTEXT_COUNT; ++i)
    {
        (m_ioContexts + i)->index = i;

        (m_ioContexts + i)->ioContext.buf = new char[bufferSize];
        (m_ioContexts + i)->ioContext.wsaReceiveBuf.len = bufferSize;
        (m_ioContexts + i)->ioContext.wsaReceiveBuf.buf = (m_ioContexts + i)->ioContext.buf;
        (m_ioContexts + i)->ioContext.TESTbuf = new char[10000];
        (m_ioContexts + i)->ioContext.wsaSendBuf.buf = (m_ioContexts + i)->ioContext.TESTbuf;

        *(m_hContextEvents + i) = CreateEvent(0, TRUE, FALSE, 0);
    }
}
void _SetSocket(SOCKET socket)
{
开发者_运维知识库    m_clientData.socket = socket;
}
};



bool WriteChunk(const NetClientHost *clientHost, NetClientHost::IOContext *ioContext)
{
int status;

status = WSASend(clientHost->GetClientData()->socket, &ioContext->wsaSendBuf, 1, &ioContext->bytesSent, ioContext->flags, &ioContext->overlapped, 0);
if (status == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
{
    // ...
    return false;
}

return true;
}

bool Write(NetClientHost *clientHost, void *buffer, unsigned int size, unsigned int chunk)
{
//__ASSERT(m_clientHost);
//__ASSERT(m_clientHost->GetClientData()->remainingBytesToSend == 0);

NetClientHost::IOContext *ioContext = clientHost->AcquireContext();

if (!chunk)
    chunk = size;

ioContext->wsaSendBuf.buf = ioContext->TESTbuf;

ioContext->operation                = NetClientHost::kOperationWrite;
ioContext->flags                    = 0;
ioContext->wsaSendBuf.buf = new char[size];
memcpy((void *) ioContext->wsaSendBuf.buf, buffer, chunk);
ioContext->wsaSendBuf.len           = chunk;    
ioContext->chunk                    = chunk;
ioContext->bytesToSendTotal         = size;
ioContext->remainingBytesToSend     = size;

return WriteChunk(clientHost, ioContext);
}



void Read(NetClientHost *clientHost)
{   
NetClientHost::IOContext *ioContext = clientHost->AcquireContext();
int status;

memset((void *) ioContext, 0, sizeof(NetClientHost::IOContext));
ioContext->buf = new char[1024];
ioContext->wsaReceiveBuf.len = 1024;
ioContext->wsaReceiveBuf.buf = ioContext->buf;

ioContext->flags = 0;
ioContext->operation = NetClientHost::kOperationRead;

status = WSARecv(clientHost->GetClientData()->socket, &ioContext->wsaReceiveBuf, 1, &ioContext->bytesReceived, &ioContext->flags, &ioContext->overlapped, 0);
int i = WSAGetLastError();
if (status == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
{
    // ...
}   
}

bool AddSocket(HANDLE hIOCP, SOCKET socket)
{
++connection;

int bufSize = 0;
LINGER lingerStruct;
lingerStruct.l_onoff = 1;
lingerStruct.l_linger = 0;
setsockopt(socket, SOL_SOCKET, SO_SNDBUF, (char *) &bufSize, sizeof(int));
setsockopt(socket, SOL_SOCKET, SO_RCVBUF, (char *) &bufSize, sizeof(int));
setsockopt(socket, SOL_SOCKET, SO_LINGER, (char *) &lingerStruct, sizeof(lingerStruct) ); 

NetClientHost *clientHost = new NetClientHost;

clientHost->_InitializeIOContexts(1024);
clientHost->Init(0);
clientHost->_SetSocket(socket);

// Add this socket to the IO Completion Port
CreateIoCompletionPort((HANDLE) socket, hIOCP, (DWORD_PTR) clientHost, 0);

Read(clientHost);
return true;
}

int read = 0, write = 0;

DWORD WINAPI WorkerThread(LPVOID param)
{
LPOVERLAPPED overlapped;
NetClientHost *clientHost;
HANDLE hIOCP = (HANDLE) param;
DWORD ioSize;
BOOL status;

while (true)
{
    status = GetQueuedCompletionStatus(hIOCP, &ioSize, (PULONG_PTR) &clientHost, (LPOVERLAPPED *) &overlapped, INFINITE);

    if (!(status || ioSize))
    {
        --connection;
        //_CloseConnection(clientHost);
        continue;
    }

    NetClientHost::IOContext *ioContext = (NetClientHost::IOContext *) overlapped;

    switch (ioContext->operation)
    {
    case NetClientHost::kOperationRead:
        download += ioSize;
        Write(clientHost, ioContext->wsaReceiveBuf.buf, ioSize, 0);
        write++;
        clientHost->ReleaseContext(ioContext);
        break;

    case NetClientHost::kOperationWrite:
        upload += ioSize;
        if (ioContext->remainingBytesToSend)
        {
            ioContext->remainingBytesToSend -= ioSize;
            ioContext->wsaSendBuf.len = ioContext->chunk <= ioContext->remainingBytesToSend ? ioContext->chunk : ioContext->remainingBytesToSend; // equivalent to min(clientData->chunk, clientData->remainingBytesToSend);
            ioContext->wsaSendBuf.buf += ioContext->wsaSendBuf.len;
        }

        if (ioContext->remainingBytesToSend)
        {       
            WriteChunk(clientHost, ioContext);
        }
        else
        {
            clientHost->ReleaseContext(ioContext);              
            Read(clientHost);
            read++;
        }
        break;
    }
}

return 0;
}

DWORD WINAPI ListenThread(LPVOID param)
{
SOCKET sdListen = (SOCKET) param;

HANDLE hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
CreateThread(0, 0, WorkerThread, hIOCP, 0, 0);
CreateThread(0, 0, WorkerThread, hIOCP, 0, 0);
CreateThread(0, 0, WorkerThread, hIOCP, 0, 0);
CreateThread(0, 0, WorkerThread, hIOCP, 0, 0);

while (true)
{
    SOCKET as = WSAAccept(sdListen, 0, 0, 0, 0);
    if (as != INVALID_SOCKET)
        AddSocket(hIOCP, as);
}
}

int main()
{
SOCKET      sdListen;
SOCKADDR_IN si_addrlocal;   
int         nRet;   
int         nZero = 0;   
LINGER      lingerStruct;   

WSADATA wsaData;
WSAStartup(0x202, &wsaData);

sdListen = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED);    
si_addrlocal.sin_family = AF_INET;   
si_addrlocal.sin_port = htons(5000);   
si_addrlocal.sin_addr.s_addr = htonl(INADDR_ANY);          
nRet = bind(sdListen, (struct sockaddr *)&si_addrlocal, sizeof(si_addrlocal));   
nRet = listen(sdListen, 5);

nZero = 0;   
nRet = setsockopt(sdListen, SOL_SOCKET, SO_SNDBUF, (char *) &nZero, sizeof(nZero));   
nZero = 0;   
nRet = setsockopt(sdListen, SOL_SOCKET, SO_RCVBUF, (char *)&nZero, sizeof(nZero));
lingerStruct.l_onoff = 1;   
lingerStruct.l_linger = 0; 
nRet = setsockopt(sdListen, SOL_SOCKET, SO_LINGER, (char *)&lingerStruct, sizeof(lingerStruct) );

CreateThread(0, 0, ListenThread, (LPVOID) sdListen, 0, 0);

HANDLE console = GetStdHandle(STD_OUTPUT_HANDLE);
while (true)
{
    COORD c = {0};
    SetConsoleCursorPosition(console, c);
    printf("Connections: %i                      \nUpload: %iKB/s               \nDownload: %iKB/s              ", connection, upload * 2 / 1024, download * 2 / 1024);
    upload = 0;
    download = 0;
    Sleep(500);
}



return 0;
}


This kind of asynchronous system should be able to run at full datalink speed. Problems I've found wrong are such as:

  • timeout settings causing needless retransmissions
  • in the receiving process, received message A might trigger a database update, such that received message B has to wait, causing an unnecessary delay in the response to message B back to the sender, when the DB update could actually be done in idle time.

There's something called wireshark that can give you some visibility into the message traffic. I used to do it the hard way, with time-stamped message logs.

BTW: I would first use this method on the individual processes, to clean out any bottlenecks, before doing the asynchronous analysis. If you haven't done this, you can bet they're in there. Just any old profiler isn't reliable. There are good ones, including Zoom.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜