Network client simulator design
I am trying to design software in C++ that sends request bytes (following a standard **application-level** protocol, with the fields to be populated taken from a text file) over **UDP**.
This client must be able to send these requests at a very high rate, up to **2000 transactions per second**, and should accept a response only if it arrives within a specified timeout; otherwise the response should be ignored.
I will be using the Boost library for all the socket work, but I am not sure how to design it for such a high-speed application :(
I think I need a heavily multi-threaded application (again, Boost will be used). Am I right? Do I have to create a separate thread for each request? I suspect only one thread should be waiting to receive responses, because if many threads are waiting, how can we tell which thread's request a given response belongs to?
Hope that question is clear. I just need some help regarding the design points and suspected problems that I may face.
I'm halfway through a network client of my own at the moment, so perhaps I can impart some advice and some resources to look at. There are many more experienced in this area, and hopefully they'll chime in :)
Firstly, you're right about Boost. Once you get used to how it all hangs together, boost::asio
is a great toolkit for writing network code. Essentially, you create an io_service
and call run
to execute until all work is complete, or run_one
to perform a single IO action. On their own, these aren't that helpful. The power comes when you either call run_one
in its own loop:
boost::asio::io_service myIOService;
while(true)
{
myIOService.run_one();
}
, or run the run
function on one (or more) threads:
boost::thread t(boost::bind(&boost::asio::io_service::run, &myIOService));
However, it is worth noting that run
returns as soon as there is no work to do (so you can say goodbye to that thread). As I found out here on Stackoverflow, the trick is to ensure it always has something to do. The solution is in boost::asio::io_service::work
:
boost::asio::io_service::work myWork(myIOService); // just some abstract "work"
The above line ensures your thread won't stop when nothing is going on. I view it as a means of keep-alive :)
At some point, you're going to want to create a socket and connect it somewhere. I created a generic Socket class (and derived a text-socket from that to create buffered input). I also wanted an event-based system that worked very much like C#. I've outlined this stuff for you, below:
First step, we need a generic way of passing arguments around, hence, EventArgs
:
eventArgs.h
class EventArgs : boost::noncopyable
{
private:
public:
EventArgs();
virtual ~EventArgs() = 0;
}; // eo class EventArgs:
Now, we need an event class which people can subscribe/unsubscribe to:
event.h
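// STL
#include <algorithm>
#include <functional>
#include <set>
#include <stack>
// Boost
#include <boost/bind.hpp>
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread/mutex.hpp>
// Local
#include "eventArgs.h"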
// class Event
class Event : boost::noncopyable
{
public:
typedef std::function<void(const EventArgs&)> DelegateType;
typedef boost::shared_ptr<DelegateType> DelegateDecl;
private:
boost::mutex m_Mutex;
typedef std::set<DelegateDecl> DelegateSet;
typedef std::stack<DelegateDecl> DelegateStack;
typedef DelegateSet::const_iterator DelegateSet_cit;
DelegateSet m_Delegates;
DelegateStack m_ToRemove;
public:
Event()
{
}; // eo ctor
Event(Event&& _rhs) : m_Delegates(std::move(_rhs.m_Delegates))
{
}; // eo mtor
~Event()
{
}; // eo dtor
// static methods
static DelegateDecl bindDelegate(DelegateType _f)
{
DelegateDecl ret(new DelegateType(_f));
return ret;
}; // eo bindDelegate
// methods
void raise(const EventArgs& _args)
{
boost::mutex::scoped_lock lock(m_Mutex);
// get rid of any we have to remove
while(m_ToRemove.size())
{
m_Delegates.erase(m_ToRemove.top()); // erase by key: a no-op if it was already removed, whereas erase(find(...)) would be undefined behaviour
m_ToRemove.pop();
};
if(m_Delegates.size())
std::for_each(m_Delegates.begin(),
m_Delegates.end(),
[&_args](const DelegateDecl& _decl) { (*_decl)(_args); });
}; // eo raise
DelegateDecl addListener(DelegateDecl _decl)
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Delegates.insert(_decl);
return _decl;
}; // eo addListener
DelegateDecl addListener(DelegateType _f)
{
DelegateDecl ret(bindDelegate(_f));
return addListener(ret);
}; // eo addListener
void removeListener(const DelegateDecl _decl)
{
boost::mutex::scoped_lock lock(m_Mutex);
DelegateSet_cit cit(m_Delegates.find(_decl));
if(cit != m_Delegates.end())
m_ToRemove.push(_decl);
}; // eo removeListener
// operators
// Only use operator += if you don't wish to manually detach using removeListener
Event& operator += (DelegateType _f)
{
addListener(_f);
return *this;
}; // eo op +=
}; // eo class Event
Then, it was time to create a socket class. Below is the header:
socket.h
(Some notes: ByteVector
is typedef std::vector<unsigned char>
)
#pragma once
#include "event.h"
// boost
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/buffer.hpp>
// class Socket
class MORSE_API Socket : boost::noncopyable
{
protected:
typedef boost::shared_ptr<boost::asio::ip::tcp::socket> SocketPtr;
private:
ByteVector m_Buffer; // will be used to read in
SocketPtr m_SocketPtr;
boost::asio::ip::tcp::endpoint m_RemoteEndPoint;
bool m_bConnected;
// reader
void _handleConnect(const boost::system::error_code& _errorCode, boost::asio::ip::tcp::resolver_iterator _rit);
void _handleRead(const boost::system::error_code& _errorCode, std::size_t read);
protected:
SocketPtr socket() { return m_SocketPtr; };
public:
Socket(ByteVector_sz _bufSize = 512);
virtual ~Socket();
// properties
bool isConnected() const { return m_bConnected; };
const boost::asio::ip::tcp::endpoint& remoteEndPoint() const {return m_RemoteEndPoint; };
// methods
void connect(boost::asio::ip::tcp::resolver_iterator _rit);
void connect(const String& _host, const Port _port);
void close();
// Events
Event onRead;
Event onResolve;
Event onConnect;
Event onClose;
}; // eo class Socket
And now the implementation. You'll notice it calls another class to perform DNS resolution. I will show that afterwards. Also, there are some EventArgs
-derivatives I have omitted. They are simply passed as EventArgs parameters when socket events occur.
socket.cpp
#include "socket.h"
// boost
#include <boost/asio/placeholders.hpp>
namespace morse
{
namespace net
{
// ctor
Socket::Socket(ByteVector_sz _bufSize /* = 512 */) : m_bConnected(false)
{
m_Buffer.resize(_bufSize);
}; // eo ctor
// dtor
Socket::~Socket()
{
}; // eo dtor
// _handleRead
void Socket::_handleRead(const boost::system::error_code& _errorCode,
std::size_t _read)
{
if(!_errorCode)
{
if(_read)
{
onRead.raise(SocketReadEventArgs(*this, m_Buffer, _read));
// read again
m_SocketPtr->async_read_some(boost::asio::buffer(m_Buffer), boost::bind(&Socket::_handleRead, this, _1, _2));
};
}
else
close();
}; // eo _handleRead
// _handleConnect
void Socket::_handleConnect(const boost::system::error_code& _errorCode,
boost::asio::ip::tcp::resolver_iterator _rit)
{
m_bConnected = !_errorCode;
bool _raise(false);
if(!_errorCode)
{
m_RemoteEndPoint = *_rit;
_raise = true;
m_SocketPtr->async_read_some(boost::asio::buffer(m_Buffer), boost::bind(&Socket::_handleRead, this, _1, _2));
}
else if(++_rit != boost::asio::ip::tcp::resolver::iterator())
{
m_SocketPtr->close();
m_SocketPtr->async_connect(*_rit, boost::bind(&Socket::_handleConnect, this, boost::asio::placeholders::error, _rit));
}
else
_raise = true; // raise complete failure
if(_raise)
onConnect.raise(SocketConnectEventArgs(*this, _errorCode));
}; // eo _handleConnect
// connect
void Socket::connect(boost::asio::ip::tcp::resolver_iterator _rit)
{
boost::asio::ip::tcp::endpoint ep(*_rit);
m_SocketPtr.reset(new boost::asio::ip::tcp::socket(Root::instance().ioService()));
m_SocketPtr->async_connect(ep, boost::bind(&Socket::_handleConnect, this, boost::asio::placeholders::error, _rit));
};
void Socket::connect(const String& _host, Port _port)
{
// Anon function for resolution of the host-name and asynchronous calling of the above
auto anonResolve = [this](const boost::system::error_code& _errorCode,
boost::asio::ip::tcp::resolver_iterator _epIt)
{
// raise event
onResolve.raise(SocketResolveEventArgs(*this, !_errorCode ? (*_epIt).host_name() : String(""), _errorCode));
// perform connect, calling back to anonymous function
if(!_errorCode)
this->connect(_epIt);
};
// Resolve the host calling back to anonymous function
Root::instance().resolveHost(_host, _port, anonResolve);
}; // eo connect
void Socket::close()
{
if(m_bConnected)
{
onClose.raise(SocketCloseEventArgs(*this));
m_SocketPtr->close();
m_bConnected = false;
};
} // eo close
As I said about DNS resolution, the line Root::instance().resolveHost(_host, _port, anonResolve);
calls this to perform asynchronous DNS:
// resolve a host asynchronously
template<typename ResolveHandler>
void resolveHost(const String& _host, Port _port, ResolveHandler _handler)
{
boost::asio::ip::tcp::endpoint ret;
boost::asio::ip::tcp::resolver::query query(_host, boost::lexical_cast<std::string>(_port));
m_Resolver.async_resolve(query, _handler);
}; // eo resolveHost
Finally, I needed a text-based socket that raises an event every time a line is received (which is then processed). I'll omit the header file this time and just show the implementation file. Needless to say, it declares an Event
called onLine
which it fires every time a line is received in its entirety:
// boost
#include <boost/asio/buffer.hpp>
#include <boost/asio/write.hpp>
#include <boost/asio/placeholders.hpp>
namespace morse
{
namespace net
{
String TextSocket::m_DefaultEOL("\r\n");
// ctor
TextSocket::TextSocket() : m_EOL(m_DefaultEOL)
{
onRead += boost::bind(&TextSocket::readHandler, this, _1);
}; // eo ctor
// dtor
TextSocket::~TextSocket()
{
}; // eo dtor
// readHandler
void TextSocket::readHandler(const EventArgs& _args)
{
auto& args(static_cast<const SocketReadEventArgs&>(_args));
m_LineBuffer.append(args.buffer().begin(), args.buffer().begin() + args.bytesRead());
String::size_type pos;
while((pos = m_LineBuffer.find(eol())) != String::npos)
{
onLine.raise(SocketLineEventArgs(*this, m_LineBuffer.substr(0, pos)));
m_LineBuffer = m_LineBuffer.substr(pos + eol().length());
};
}; // eo readHandler
// writeHandler
void TextSocket::writeHandler(const boost::system::error_code& _errorCode, std::size_t _written)
{
if(!_errorCode)
{
m_Queue.pop_front();
if(!m_Queue.empty()) // more to do?
boost::asio::async_write(*socket().get(), boost::asio::buffer(m_Queue.front(), m_Queue.front().length()), boost::bind(&TextSocket::writeHandler, this, _1, _2));
}
else
close();
}; // eo writeHandler
void TextSocket::sendLine(String _line)
{
Root::instance().ioService().post(boost::bind(&TextSocket::_sendLine, this, _line));
}; // eo sendLine
// _sendLine
void TextSocket::_sendLine(String _line)
{
// copy'n'queue
_line.append(m_EOL);
m_Queue.push_back(_line);
if(m_Queue.size() == 1) // previously an empty queue, must start write!
boost::asio::async_write(*socket().get(), boost::asio::buffer(m_Queue.front(), m_Queue.front().length()), boost::bind(&TextSocket::writeHandler, this, _1, _2));
}; // eo _sendLine
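Some things to note about the class above... it uses boost::asio::io_service::post
to send lines. This means the queueing and writing all happen on the thread that ASIO manages, which keeps it thread-safe, and allows us to queue up lines to be sent as and when. This makes it very scalable. (If more than one thread calls run on the same io_service, these handlers could run concurrently, and a strand would be needed to serialise them.)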
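The line-splitting step in readHandler can be tried out in isolation from the sockets. Below is a minimal sketch using only the standard library; LineSplitter, feed and pending are hypothetical names for illustration and are not part of the classes above. It assumes a non-empty end-of-line marker.

```cpp
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper: accumulates raw chunks and hands back every complete
// line seen so far, keeping any partial line for the next call.
class LineSplitter
{
public:
    explicit LineSplitter(std::string eol = "\r\n") : m_EOL(std::move(eol)) {}

    // Append a chunk (as delivered by a read handler) and extract whole lines.
    std::vector<std::string> feed(const std::string& chunk)
    {
        m_Pending.append(chunk);
        std::vector<std::string> lines;
        std::string::size_type pos;
        // The empty-EOL guard avoids an endless loop: find("") always succeeds.
        while (!m_EOL.empty() &&
               (pos = m_Pending.find(m_EOL)) != std::string::npos)
        {
            lines.push_back(m_Pending.substr(0, pos));
            m_Pending.erase(0, pos + m_EOL.length());
        }
        return lines;
    }

    // Bytes received so far that do not yet form a complete line.
    const std::string& pending() const { return m_Pending; }

private:
    std::string m_EOL;
    std::string m_Pending;
};
```

A chunk that ends in the middle of a line simply stays in the buffer until a later chunk completes it, which is the same behaviour the while loop in readHandler relies on.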
I am sure there are many more questions and maybe my code isn't helpful. I spent several days piecing it all together and making sense of it, and I doubt it's actually any good. hopefully some better minds will glance over it and go "HOLY CRAP, THIS
I'm not sure you need to go "heavy" multi-thread. Most high speed applications use the polling mechanisms of the operating system, which generally scale better than threads.
The architecture will depend a lot on how reactive your application needs to be, in terms of what components are responsible for generating inputs and outputs, and doing the actual processing.
One way to approach your problem using boost::asio would be to have a communication thread that runs the boost::asio::io_service::run method. The io_service listens on the various UDP sockets and processes messages as they arrive, possibly pushing them onto a queue so that the application can process them in the main thread. From the main thread, you can post messages to the io_service so that they are sent by the communication thread.
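The hand-off from the communication thread to the main thread could look roughly like the following. This is a minimal sketch using only the standard library; MessageQueue and its member names are made up for illustration and are not part of Boost.Asio.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical queue: the communication thread calls push() from its receive
// handler, and the main thread calls pop() or tryPop() to consume messages.
template <typename T>
class MessageQueue
{
public:
    void push(T value)
    {
        {
            std::lock_guard<std::mutex> lock(m_Mutex);
            m_Items.push_back(std::move(value));
        }
        m_Ready.notify_one(); // wake one waiting consumer, outside the lock
    }

    // Blocks until a message is available, then returns it.
    T pop()
    {
        std::unique_lock<std::mutex> lock(m_Mutex);
        m_Ready.wait(lock, [this] { return !m_Items.empty(); });
        T value = std::move(m_Items.front());
        m_Items.pop_front();
        return value;
    }

    // Non-blocking variant: returns false if the queue is empty.
    bool tryPop(T& out)
    {
        std::lock_guard<std::mutex> lock(m_Mutex);
        if (m_Items.empty())
            return false;
        out = std::move(m_Items.front());
        m_Items.pop_front();
        return true;
    }

private:
    std::mutex m_Mutex;
    std::condition_variable m_Ready;
    std::deque<T> m_Items;
};
```

The opposite direction (main thread to communication thread) does not need a queue like this, since posting a handler to the io_service already does that job.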
This should allow you to reach 2000 messages per second without much difficulty.
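To put that rate in perspective, 2000 messages per second is one message every 500 microseconds. Below is a small sketch of the arithmetic for pacing sends; dueOffset and sendInterval are hypothetical helper names, and the rate is assumed to be positive.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Offset from the start of the run at which request number `index` (0-based)
// is due. Multiplying before dividing avoids accumulating rounding error for
// rates that do not divide one second evenly.
inline std::chrono::nanoseconds dueOffset(std::int64_t index,
                                          std::int64_t perSecond)
{
    assert(perSecond > 0);
    const std::int64_t nanosPerSecond = 1000000000;
    return std::chrono::nanoseconds(index * nanosPerSecond / perSecond);
}

// Gap between two consecutive sends at the given rate.
inline std::chrono::nanoseconds sendInterval(std::int64_t perSecond)
{
    return dueOffset(1, perSecond);
}
```

A sender could compare the current time against start + dueOffset(i, rate) before sending request i. Timer and sleep granularity on a typical desktop OS may well be coarser than 500 microseconds, so in practice it may be necessary to send a few requests per wake-up rather than exactly one.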
An alternative would be to start several communication threads by calling the boost::asio::io_service::run method several times from several threads, allowing for messages to be processed in parallel by their communication thread.
One word of advice with Asio, though: because of its asynchronous architecture, it works better if you go along with its logic and use it the way it is meant to be used. If you find that you are using a lot of locks and managing many threads yourself, then you are probably doing it wrong. Look closely at the thread safety guarantees of the various methods, and study the provided examples.
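On the question of telling which request a response belongs to, and ignoring responses that arrive after the timeout: with a single communication thread, one common approach is to keep a table of outstanding requests keyed by a transaction identifier. The sketch below assumes the application protocol carries such an identifier in both request and response (the question does not say whether it does). PendingRequests and its members are hypothetical names, and the class is not thread-safe; it is meant to be used only from the thread that runs the io_service.

```cpp
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <unordered_map>

// Hypothetical table of requests that are still waiting for a response.
class PendingRequests
{
public:
    using Clock = std::chrono::steady_clock;

    explicit PendingRequests(Clock::duration timeout) : m_Timeout(timeout) {}

    // Record that request `id` was sent at time `now`.
    void sent(std::uint32_t id, Clock::time_point now)
    {
        m_Deadlines[id] = now + m_Timeout;
    }

    // Returns true only if `id` is outstanding and its deadline has not passed.
    // The entry is removed either way, so duplicates are rejected too.
    bool received(std::uint32_t id, Clock::time_point now)
    {
        auto it = m_Deadlines.find(id);
        if (it == m_Deadlines.end())
            return false; // unknown, duplicate, or already expired
        const bool inTime = now <= it->second;
        m_Deadlines.erase(it);
        return inTime;
    }

    // Drop every request whose deadline has passed; returns how many.
    std::size_t expire(Clock::time_point now)
    {
        std::size_t dropped = 0;
        for (auto it = m_Deadlines.begin(); it != m_Deadlines.end();)
        {
            if (it->second < now)
            {
                it = m_Deadlines.erase(it);
                ++dropped;
            }
            else
                ++it;
        }
        return dropped;
    }

    std::size_t size() const { return m_Deadlines.size(); }

private:
    Clock::duration m_Timeout;
    std::unordered_map<std::uint32_t, Clock::time_point> m_Deadlines;
};
```

The receive handler would call received() with the identifier parsed from each datagram, and a periodic timer would call expire() so that lost requests do not pile up in the table.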