HOW-TO: Client connection manager for Boost::asio?
I created a server using boost:asio. When a client connects it sends a file_size, file_name and the file_data. The server stores this in a file on disk. This works perfectly! Though now I'm running both client application and server application in the main thread of their application (so I've got a server and client app) which blocks the rest of the application(s) from executing.
So in abstract I want to create something like this:
server app
- have one thread to receive and handle all incoming file transfers
- have another thread in which the rest of the application can do the things it want to
client app
- when I press the space bar, or whenever i want, I want to send a file to the server in a separate thread from the main one so my application can continue doing other stuff it needs to do.
My question: how do I create a manager for my client file transfers?
File transfer server accepts new file transfer client connections
#include "ofxFileTransferServer.h"
ofxFileTransferServer::ofxFileTransferServer(unsigned short nPort)
:acceptor(
io_service
,boost::asio::ip::tcp::endpoint(
boost::asio::ip::tcp::v4()
,nPort
)
,tru开发者_JAVA技巧e
)
,port(nPort)
{
}
// test
void ofxFileTransferServer::startThread() {
boost::thread t(boost::bind(
&ofxFileTransferServer::accept
,this
));
}
void ofxFileTransferServer::accept() {
ofxFileTransferConnection::pointer new_connection(new ofxFileTransferConnection(io_service));
acceptor.async_accept(
new_connection->socket()
,boost::bind(
&ofxFileTransferServer::handleAccept
,this
,new_connection
,boost::asio::placeholders::error
)
);
std::cout << __FUNCTION__ << " start accepting " << std::endl;
io_service.run();
}
void ofxFileTransferServer::handleAccept(
ofxFileTransferConnection::pointer pConnection
,const boost::system::error_code& rErr
)
{
std::cout << __FUNCTION__ << " " << rErr << ", " << rErr.message() << std::endl;
if(!rErr) {
pConnection->start();
ofxFileTransferConnection::pointer new_connection(new ofxFileTransferConnection(io_service));
acceptor.async_accept(
new_connection->socket()
,boost::bind(
&ofxFileTransferServer::handleAccept
,this
,new_connection
,boost::asio::placeholders::error
)
);
}
}
File transfer client
#include "ofxFileTransferClient.h"
#include "ofMain.h"
using boost::asio::ip::tcp;
ofxFileTransferClient::ofxFileTransferClient(
boost::asio::io_service &rIOService
,const std::string sServer
,const std::string nPort
,const std::string sFilePath
):resolver_(rIOService)
,socket_(rIOService)
,file_path_(sFilePath)
,server_(sServer)
,port_(nPort)
{
}
ofxFileTransferClient::~ofxFileTransferClient() {
std::cout << "~~~~ ofxFileTransferClient" << std::endl;
}
void ofxFileTransferClient::start() {
// open file / get size
source_file_stream_.open(
ofToDataPath(file_path_).c_str()
,std::ios_base::binary | std::ios_base::ate
);
if(!source_file_stream_) {
std::cout << ">> failed to open:" << file_path_ << std::endl;
return;
}
size_t file_size = source_file_stream_.tellg();
source_file_stream_.seekg(0);
// send file size and name to server.
std::ostream request_stream(&request_);
request_stream << file_path_ << "\n"
<< file_size << "\n\n";
std::cout << ">> request_size:" << request_.size()
<< " file_path: " << file_path_
<< " file_size: "<< file_size
<< std::endl;
// resolve ofxFileTransferServer
tcp::resolver::query query(server_, port_);
resolver_.async_resolve(
query
,boost::bind(
&ofxFileTransferClient::handleResolve
,shared_from_this()
,boost::asio::placeholders::error
,boost::asio::placeholders::iterator
)
);
}
void ofxFileTransferClient::handleResolve(
const boost::system::error_code& rErr
,tcp::resolver::iterator oEndPointIt
)
{
if(!rErr) {
tcp::endpoint endpoint = *oEndPointIt;
socket_.async_connect(
endpoint
,boost::bind(
&ofxFileTransferClient::handleConnect
,shared_from_this()
,boost::asio::placeholders::error
,++oEndPointIt
)
);
}
else {
std::cout << ">> error: " << rErr.message() << std::endl;
}
}
void ofxFileTransferClient::handleConnect(
const boost::system::error_code& rErr
,tcp::resolver::iterator oEndPointIt
)
{
if(!rErr) {
cout << ">> connected!" << std::endl;
boost::asio::async_write(
socket_
,request_
,boost::bind(
&ofxFileTransferClient::handleFileWrite
,shared_from_this()
,boost::asio::placeholders::error
)
);
}
else if (oEndPointIt != tcp::resolver::iterator()) {
// connection failed, try next endpoint in list
socket_.close();
tcp::endpoint endpoint = *oEndPointIt;
socket_.async_connect(
endpoint
,boost::bind(
&ofxFileTransferClient::handleConnect
,shared_from_this()
,boost::asio::placeholders::error
,++oEndPointIt
)
);
}
else {
std::cout << ">> error: " << rErr.message() << std::endl;
}
}
void ofxFileTransferClient::handleFileWrite(
const boost::system::error_code& rErr
)
{
if(!rErr) {
if(source_file_stream_.eof() == false) {
source_file_stream_.read(buf_.c_array(), buf_.size());
if(source_file_stream_.gcount() <= 0) {
std::cout << ">> read file error." << std::endl;
return;
}
std::cout << ">> send: " << source_file_stream_.gcount() << " bytes, total: " << source_file_stream_.tellg() << " bytes\n";
boost::asio::async_write(
socket_
,boost::asio::buffer(buf_.c_array(), source_file_stream_.gcount())
,boost::bind(
&ofxFileTransferClient::handleFileWrite
,this
,boost::asio::placeholders::error
)
);
if(rErr) {
std::cout <<">> send error: " << rErr << std::endl; // not sure bout this one..
}
}
else {
return; // eof()
}
}
else {
std::cout << ">> error:" << rErr.message() << std::endl;
}
}
And a tiny manager to manager client transfers (which is used in the client app) Again the threading code is only for testing purposes and isnt used.
#include "ofxFileTransferManager.h"
ofxFileTransferManager::ofxFileTransferManager() {
}
void ofxFileTransferManager::transferFile(
const std::string sServer
,const std::string nPort
,const std::string sFile
)
{
ofxFileTransferClient::pointer client(new ofxFileTransferClient(
io_service_
,sServer
,nPort
,sFile
));
client->start();
io_service_.run();
}
void ofxFileTransferManager::startThread() {
boost::thread t(boost::bind(
&ofxFileTransferManager::run
,this
));
}
void ofxFileTransferManager::run() {
cout << "starting filemanager" << std::endl;
while(true) {
io_service_.run();
boost::this_thread::sleep(boost::posix_time::milliseconds(250));
cout << ".";
}
cout << "ready filemanager" << std::endl;
}
It would be awesome if someone can help me out here. The example of boost all use a "one-time" client connection which doesn't really help me further.
roxlu
Great! I just figured it out. I had to wrap my io_service around a boost::asio::io_service::work object! (and forgot a shared_from_this()) somewhere. I've uploaded my code here: http://github.com/roxlu/ofxFileTransfer
For convenience here is the manager code:
#include "ofxFileTransferManager.h"
ofxFileTransferManager::ofxFileTransferManager()
:work_(io_service_)
{
}
void ofxFileTransferManager::transferFile(
const std::string sServer
,const std::string nPort
,const std::string sFile
,const std::string sRemoteFile
)
{
ofxFileTransferClient::pointer client(new ofxFileTransferClient(
io_service_
,sServer
,nPort
,sFile
,sRemoteFile
));
client->start();
}
void ofxFileTransferManager::startThread() {
boost::thread t(boost::bind(
&ofxFileTransferManager::run
,this
));
}
void ofxFileTransferManager::run() {
io_service_.run();
}
From what I can tell, all you really need is to create a new thread and put in its main loop io_service.run();
.
Obviously, you would have to take care of protecting classes and variables in mutexes that are shared between the appss main thread and asio's thread.
Edit: Something like this?
static sem_t __semSendFile;
static void* asioThread(void*)
{
while( true )
{
sem_wait( &__semSendFile );
io_service.run();
}
return NULL;
}
void ofxFileTransferManager::transferFile(
const std::string sServer
,const std::string nPort
,const std::string sFile
)
{
ofxFileTransferClient::pointer client(new ofxFileTransferClient(
io_service_
,sServer
,nPort
,sFile
));
client->start();
sem_post( &__semSendFile );
}
int main(int argc, char **argv)
{
if ( sem_init( &__semSendFile, 0, 0 ) != 0 )
{
std::cerr << strerror( errno ) << std::endl;
return -1;
}
pthread_t thread;
if ( pthread_create( &thread, NULL, asioThread, NULL ) != 0 )
{
std::cerr << strerror( errno ) << std::endl;
return -1;
}
[...]
精彩评论