开发者

Implements timeout using boost::asio::async_read without call run on io_service

I'm trying to read from a input source (in this case stdin) with a timeout. Due to the design of the existing application where this have to fit is it not possible to call run on my io_service.

Here is my try so far:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/optional.hpp>

void set_result( boost::optional<boost::system::error_code> * a, boost::system::error_code b ) {
    if( b == 0)
        a->reset( b );
}

void receive(boost::asio::io_service & io, boost::asio::posix::stream_descriptor & stream, boost::asio::streambuf & result){
    boost::optional<boost::system::error_code> timer_result;
    boost::optional<boost::system::error_code> read_result;

    boost::asio::deadline_timer timer( io );
    timer.expires_from_now( boost::posix_time::milliseconds(5000) );
    timer.async_wait( boost::bind(&set_result, &timer_result, _1) );

    boost::asio::async_read(
            stream,
            result,
            boost::asio::transfer_at_least(1),
            boost::bind( &set_result, &read_result, _1 ));
    boost::system::error_code ec;

    while(1) {
        io.reset();
        io.poll_one(ec);

        if ( read_result ) {
            timer.cancel();
            return;

        } else if ( timer_result )
            throw std::runtime_error("timeout");
    }
}

void receive(boost::asio::io_service & io, boost::asio::posix::stream_descriptor & stream, size_t size, boost::asio::streambuf & result){
    while( result.size() < size )
        receive(io, stream, result);
}

int main(int argc, const char *argv[])
{
    boost::asio::io_service io;
    boost::asio::posix::stream_descriptor in(io, ::dup(STDIN_FILENO));

    for(int i = 0; i < 5; i++){
        std::cout << i << " Type in 4 chareters and press enter" << std::endl << std::endl;
        std::cout << "in> ";
        std::cout.flush();
        try {
            boost::asio::streambuf buf;
            receive(io, in, 5, buf);开发者_C百科
            std::cout << "out> ";
            std::copy(boost::asio::buffer_cast<const char *>(buf.data()), boost::asio::buffer_cast<const char *>(buf.data()) + buf.size(), std::ostream_iterator<char>(std::cout));

        } catch (const std::exception & e) {
            std::cout << e.what() << std::endl;
        }
    }

    return 0;
}

At my first look, I thought that it was working, but after playing more around with the test app, I got some segmentations faults.

I found that something bad happens if I wait for the first query to timeout, and then enter 5 chars the next.

Here is what valgrind says:

==17216== Memcheck, a memory error detector
==17216== Copyright (C) 2002-2010, and GNU GPL'd, by Julian Seward et al.
==17216== Using Valgrind-3.6.1 and LibVEX; rerun with -h for copyright info
==17216== Command: ./ccTalkScan
==17216==
0 Type in 4 chareters and press enter

asdf
0 system:0
1 Type in 4 chareters and press enter

0 system:125
0 system:0
==17216== Invalid read of size 8
==17216==    at 0x546EB4A: ??? (setcontext.S:60)
==17216==    by 0x425245: receive(boost::asio::io_service&, boost::asio::posix::basic_stream_descriptor<boost::asio::posix::stream_descriptor_service>&, unsigned long, boost::asio::basic_streambuf<std::allocator<char> >&) (ccTalkScan.cxx:321)
==17216==    by 0x42535E: main (ccTalkScan.cxx:333)
==17216==  Address 0x7feffdfa8 is not stack'd, malloc'd or (recently) free'd
==17216==
==17216== Invalid read of size 8
==17216==    at 0x546EB52: ??? (setcontext.S:63)
==17216==    by 0x425245: receive(boost::asio::io_service&, boost::asio::posix::basic_stream_descriptor<boost::asio::posix::stream_descriptor_service>&, unsigned long, boost::asio::basic_streambuf<std::allocator<char> >&) (ccTalkScan.cxx:321)
==17216==    by 0x42535E: main (ccTalkScan.cxx:333)
==17216==  Address 0x7feffdf98 is not stack'd, malloc'd or (recently) free'd
==17216==
==17216== Invalid read of size 8
==17216==    at 0x546EB59: ??? (setcontext.S:64)
==17216==    by 0x425245: receive(boost::asio::io_service&, boost::asio::posix::basic_stream_descriptor<boost::asio::posix::stream_descriptor_service>&, unsigned long, boost::asio::basic_streambuf<std::allocator<char> >&) (ccTalkScan.cxx:321)
==17216==    by 0x42535E: main (ccTalkScan.cxx:333)
==17216==  Address 0x7feffdf68 is not stack'd, malloc'd or (recently) free'd
==17216==
==17216== Invalid read of size 8
==17216==    at 0x546EB4A: ??? (setcontext.S:60)
==17216==    by 0x5E4BD2C: (below main) (in /lib64/libc-2.12.2.so)
==17216==  Address 0x7feffe018 is not stack'd, malloc'd or (recently) free'd
==17216==
==17216== Invalid read of size 8
==17216==    at 0x546EB52: ??? (setcontext.S:63)
==17216==    by 0x5E4BD2C: (below main) (in /lib64/libc-2.12.2.so)
==17216==  Address 0x7feffe008 is not stack'd, malloc'd or (recently) free'd
==17216==
==17216== Invalid read of size 8
==17216==    at 0x546EB59: ??? (setcontext.S:64)
==17216==    by 0x5E4BD2C: (below main) (in /lib64/libc-2.12.2.so)
==17216==  Address 0x7feffdfd8 is not stack'd, malloc'd or (recently) free'd
==17216==
timeout
2 Type in 4 chareters and press enter

0 system:0
timeout
3 Type in 4 chareters and press enter

asdf
==17216== Syscall param readv(vector[...]) points to unaddressable byte(s)
==17216==    at 0x5EF7A81: readv (in /lib64/libc-2.12.2.so)
==17216==    by 0x42A77A: boost::asio::detail::descriptor_ops::non_blocking_read(int, iovec*, unsigned long, boost::system::error_code&, unsigned long&) (descriptor_ops.ipp:153)
==17216==    by 0x4355D1: boost::asio::detail::descriptor_read_op_base<boost::asio::mutable_buffers_1>::do_perform(boost::asio::detail::reactor_op*) (descriptor_read_op.hpp:55)
==17216==    by 0x4275DA: boost::asio::detail::reactor_op::perform() (reactor_op.hpp:40)
==17216==    by 0x4288F9: boost::asio::detail::epoll_reactor::run(bool, boost::asio::detail::op_queue<boost::asio::detail::task_io_service_operation>&) (epoll_reactor.ipp:286)
==17216==    by 0x429577: boost::asio::detail::task_io_service::do_one(boost::asio::detail::scoped_lock<boost::asio::detail::posix_mutex>&, boost::asio::detail::task_io_service::idle_thread_info*) (task_io_service.ipp:264)
==17216==    by 0x429165: boost::asio::detail::task_io_service::poll_one(boost::system::error_code&) (task_io_service.ipp:188)
==17216==    by 0x4299B8: boost::asio::io_service::poll_one(boost::system::error_code&) (io_service.ipp:103)
==17216==    by 0x424FDF: receive(boost::asio::io_service&, boost::asio::posix::basic_stream_descriptor<boost::asio::posix::stream_descriptor_service>&, boost::asio::basic_streambuf<std::allocator<char> >&) (ccTalkScan.cxx:308)
==17216==    by 0x425245: receive(boost::asio::io_service&, boost::asio::posix::basic_stream_descriptor<boost::asio::posix::stream_descriptor_service>&, unsigned long, boost::asio::basic_streambuf<std::allocator<char> >&) (ccTalkScan.cxx:321)
==17216==    by 0x42535E: main (ccTalkScan.cxx:333)
==17216==  Address 0x65ba920 is 0 bytes inside a block of size 512 free'd
==17216==    at 0x4C25C4F: operator delete(void*) (vg_replace_malloc.c:387)
==17216==    by 0x432317: __gnu_cxx::new_allocator<char>::deallocate(char*, unsigned long) (new_allocator.h:95)
==17216==    by 0x430D67: std::_Vector_base<char, std::allocator<char> >::_M_deallocate(char*, unsigned long) (stl_vector.h:146)
==17216==    by 0x42F4A0: std::_Vector_base<char, std::allocator<char> >::~_Vector_base() (stl_vector.h:132)
==17216==    by 0x42CF5E: std::vector<char, std::allocator<char> >::~vector() (stl_vector.h:313)
==17216==    by 0x42AEB5: boost::asio::basic_streambuf<std::allocator<char> >::~basic_streambuf() (basic_streambuf.hpp:114)
==17216==    by 0x425374: main (ccTalkScan.cxx:333)
==17216==
0 system:0
4 Type in 4 chareters and press enter

0 system:125
0 system:0
timeout
==17216==
==17216== HEAP SUMMARY:
==17216==     in use at exit: 168 bytes in 3 blocks
==17216==   total heap usage: 63 allocs, 60 frees, 22,070 bytes allocated

I have tried different things to fix it, but I think that I might have misunderstood a thing or two here. So some help would be nice, and an example would be most appreciated ;)


The problem in the above implementation is that the async_reads which times out is never canceled. Here is how to do this:

    while(1) {
        io.reset();
        io.poll_one(ec);

        if ( read_result ) {
            timer.cancel(); // cancel the timeout operation as it has not completed yet
            return;

        } else if ( timer_result ) {
            stream.cancel(); // cancel the read operation as it has not completed yet
            throw std::runtime_error("timeout");
        }
    }
0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜