HOW-TO: Client connection manager for Boost::asio?
I created a server using boost:asio. When a client connects it sends a file_size, file_name and the file_data. The server stores this in a file on disk. This works perfectly! Though now I'm running both client application and server application in the main thread of their application (so I've got a server and client app) which blocks the rest of the application(s) from executing.
So in abstract I want to create something like this:
server app
- have one thread to receive and handle all incoming file transfers
- have another thread in which the rest of the application can do the things it want to
client app
- when I press the space bar, or whenever i want, I want to send a file to the server in a separate thread from the main one so my application can continue doing other stuff it needs to do.
My question: how do I create a manager for my client file transfers?
File transfer server accepts new file transfer client connections
#include "ofxFileTransferServer.h"
ofxFileTransferServer::ofxFileTransferServer(unsigned short nPort)
    :acceptor(
        io_service
        ,boost::asio::ip::tcp::endpoint(
            boost::asio::ip::tcp::v4()
            ,nPort
        )
        ,tru开发者_JAVA技巧e
    )
    ,port(nPort)
{
}
// test
void ofxFileTransferServer::startThread() {
    boost::thread t(boost::bind(
        &ofxFileTransferServer::accept
        ,this
    ));
}
void ofxFileTransferServer::accept() {
    ofxFileTransferConnection::pointer new_connection(new ofxFileTransferConnection(io_service));
    acceptor.async_accept(
                    new_connection->socket()
                    ,boost::bind(
                        &ofxFileTransferServer::handleAccept
                        ,this
                        ,new_connection
                        ,boost::asio::placeholders::error
                    )
    );
    std::cout << __FUNCTION__ << " start accepting " << std::endl;
    io_service.run();
}
void ofxFileTransferServer::handleAccept(
            ofxFileTransferConnection::pointer pConnection
            ,const boost::system::error_code& rErr
)
{
    std::cout << __FUNCTION__ << " " << rErr << ", " << rErr.message() << std::endl;
    if(!rErr) {
        pConnection->start();
        ofxFileTransferConnection::pointer new_connection(new ofxFileTransferConnection(io_service));
        acceptor.async_accept(
                        new_connection->socket()
                        ,boost::bind(
                            &ofxFileTransferServer::handleAccept
                            ,this
                            ,new_connection
                            ,boost::asio::placeholders::error
                        )
        );
    }
}
File transfer client
#include "ofxFileTransferClient.h"
#include "ofMain.h"
using boost::asio::ip::tcp;
ofxFileTransferClient::ofxFileTransferClient(
                    boost::asio::io_service &rIOService
                    ,const std::string sServer
                    ,const std::string nPort
                    ,const std::string sFilePath  
):resolver_(rIOService)
,socket_(rIOService)
,file_path_(sFilePath)
,server_(sServer)
,port_(nPort)
{
}
ofxFileTransferClient::~ofxFileTransferClient() {
    std::cout << "~~~~ ofxFileTransferClient" << std::endl;
}
void ofxFileTransferClient::start() {
    // open file / get size
    source_file_stream_.open(
                    ofToDataPath(file_path_).c_str()
                    ,std::ios_base::binary | std::ios_base::ate
    );
    if(!source_file_stream_) {
        std::cout << ">> failed to open:" << file_path_ << std::endl;
        return;
    }
    size_t file_size = source_file_stream_.tellg();
    source_file_stream_.seekg(0);
    // send file size and name to server.
    std::ostream request_stream(&request_);
    request_stream  << file_path_ << "\n"
                    << file_size << "\n\n";
    std::cout   << ">> request_size:"   << request_.size() 
                << " file_path: " << file_path_
                << " file_size: "<< file_size
                << std::endl;
    // resolve ofxFileTransferServer
    tcp::resolver::query query(server_, port_);
    resolver_.async_resolve(
                query
                ,boost::bind(
                        &ofxFileTransferClient::handleResolve
                        ,shared_from_this()
                        ,boost::asio::placeholders::error
                        ,boost::asio::placeholders::iterator
                )
    );
}
void ofxFileTransferClient::handleResolve(
                const boost::system::error_code& rErr
                ,tcp::resolver::iterator oEndPointIt
)
{
    if(!rErr) {
        tcp::endpoint endpoint = *oEndPointIt;
        socket_.async_connect(
                endpoint
                ,boost::bind(
                        &ofxFileTransferClient::handleConnect
                        ,shared_from_this()
                        ,boost::asio::placeholders::error
                        ,++oEndPointIt
                )
        );
    }
    else {
        std::cout << ">> error: " << rErr.message() << std::endl;
    }
}   
void ofxFileTransferClient::handleConnect(
                const boost::system::error_code& rErr
                ,tcp::resolver::iterator oEndPointIt
)
{
    if(!rErr) {
        cout << ">> connected!" << std::endl;
        boost::asio::async_write(
                 socket_
                ,request_
                ,boost::bind(
                        &ofxFileTransferClient::handleFileWrite
                        ,shared_from_this()
                        ,boost::asio::placeholders::error
                )
        );
    }
    else if (oEndPointIt != tcp::resolver::iterator()) {
        // connection failed, try next endpoint in list
        socket_.close();
        tcp::endpoint endpoint = *oEndPointIt;
        socket_.async_connect(
            endpoint
            ,boost::bind(
                &ofxFileTransferClient::handleConnect
                ,shared_from_this()
                ,boost::asio::placeholders::error
                ,++oEndPointIt
            )
        );
    }
    else {
        std::cout << ">> error: " << rErr.message() << std::endl;
    }
}
void ofxFileTransferClient::handleFileWrite(
                const boost::system::error_code& rErr
)
{
    if(!rErr) {
        if(source_file_stream_.eof() == false) {
            source_file_stream_.read(buf_.c_array(), buf_.size());
            if(source_file_stream_.gcount() <= 0) {
                std::cout << ">> read file error." << std::endl;
                return;
            }
            std::cout << ">> send: " << source_file_stream_.gcount() << " bytes, total: " << source_file_stream_.tellg() << " bytes\n";
            boost::asio::async_write(
                    socket_
                    ,boost::asio::buffer(buf_.c_array(), source_file_stream_.gcount())
                    ,boost::bind(
                        &ofxFileTransferClient::handleFileWrite
                        ,this
                        ,boost::asio::placeholders::error
                    )
            );
            if(rErr) {
                std::cout <<">> send error: " << rErr << std::endl; // not sure bout this one..
            }
        }
        else {
            return; // eof()
        }
    }
    else {
        std::cout << ">> error:" << rErr.message() << std::endl;
    }
}
And a tiny manager to manager client transfers (which is used in the client app) Again the threading code is only for testing purposes and isnt used.
#include "ofxFileTransferManager.h"
ofxFileTransferManager::ofxFileTransferManager() { 
}
void ofxFileTransferManager::transferFile(
            const std::string sServer
            ,const std::string nPort
            ,const std::string sFile
)
{
    ofxFileTransferClient::pointer client(new ofxFileTransferClient(
        io_service_
        ,sServer
        ,nPort
        ,sFile
    ));
    client->start();
    io_service_.run();
}
void ofxFileTransferManager::startThread() {
    boost::thread t(boost::bind(
        &ofxFileTransferManager::run
        ,this
    ));
}
void ofxFileTransferManager::run() {
    cout << "starting filemanager" << std::endl;
    while(true) {
        io_service_.run();
        boost::this_thread::sleep(boost::posix_time::milliseconds(250)); 
        cout << ".";
    }
    cout << "ready filemanager" << std::endl;
}
It would be awesome if someone can help me out here. The example of boost all use a "one-time" client connection which doesn't really help me further.
roxlu
Great! I just figured it out. I had to wrap my io_service around a boost::asio::io_service::work object! (and forgot a shared_from_this()) somewhere. I've uploaded my code here: http://github.com/roxlu/ofxFileTransfer
For convenience here is the manager code:
#include "ofxFileTransferManager.h"
ofxFileTransferManager::ofxFileTransferManager()
:work_(io_service_)
{ 
}
void ofxFileTransferManager::transferFile(
            const std::string sServer
            ,const std::string nPort
            ,const std::string sFile
            ,const std::string sRemoteFile
)
{
    ofxFileTransferClient::pointer client(new ofxFileTransferClient(
        io_service_
        ,sServer
        ,nPort
        ,sFile
        ,sRemoteFile
    ));
    client->start();
}
void ofxFileTransferManager::startThread() {
    boost::thread t(boost::bind(
        &ofxFileTransferManager::run
        ,this
    ));
}
void ofxFileTransferManager::run() {
    io_service_.run();
}
From what I can tell, all you really need is to create a new thread and put in its main loop io_service.run();.
Obviously, you would have to take care of protecting classes and variables in mutexes that are shared between the appss main thread and asio's thread.
Edit: Something like this?
static sem_t __semSendFile;
static void* asioThread(void*)
{
    while( true )
    {
        sem_wait( &__semSendFile );
        io_service.run();
    }
    return NULL;
}
void ofxFileTransferManager::transferFile(
            const std::string sServer
            ,const std::string nPort
            ,const std::string sFile
)
{
    ofxFileTransferClient::pointer client(new ofxFileTransferClient(
        io_service_
        ,sServer
        ,nPort
        ,sFile
    ));
    client->start();
    sem_post( &__semSendFile );
}
int main(int argc, char **argv)
{
    if ( sem_init( &__semSendFile, 0, 0 ) != 0 )
    {
        std::cerr << strerror( errno ) << std::endl;
        return -1;
    }
    pthread_t thread;
    if ( pthread_create( &thread, NULL, asioThread, NULL ) != 0 )
    {
        std::cerr << strerror( errno ) << std::endl;
        return -1;
    }
 [...]
 
         加载中,请稍侯......
 加载中,请稍侯......
      
精彩评论