Concurrent async_write: is there a wait-free solution?
async_write() is forbidden to be called concurrently from different threads. It sends data in chunks using async_write_some(), and such chunks can be interleaved. So it is up to the user to take care not to call async_write() concurrently.
Is there a nicer solution than this pseudocode?
void send(shared_ptr<char> p) {
    boost::mutex::scoped_lock lock(m_write_mutex);
    async_write(p, handler);
}
I do not like the idea of blocking other threads for quite a long time (there are ~50 MB sends in my application).
Maybe something like this would work?
void handler(const boost::system::error_code& e) {
    if(!e) {
        bool empty = lockfree_pop_front(m_queue);
        if(!empty) {
            shared_ptr<char> p = lockfree_queue_get_first(m_queue);
            async_write(p, handler);
        }
    }
}
void send(shared_ptr<char> p) {
    bool q_was_empty = lockfree_queue_push_back(m_queue, p);
    if(q_was_empty)
        async_write(p, handler);
}
I'd prefer to find a ready-to-use cookbook recipe. Dealing with lock-free code is not easy; a lot of subtle bugs can appear.
async_write() is forbidden to be called concurrently from different threads
This statement is not quite correct. Applications can freely invoke async_write() concurrently, as long as the calls are made on different socket objects.
Is there a nicer solution than this pseudocode?
void send(shared_ptr<char> p) { boost::mutex::scoped_lock lock(m_write_mutex); async_write(p, handler); }
This likely isn't accomplishing what you intend, since async_write() returns immediately. If you intend the mutex to be locked for the entire duration of the write operation, you will need to keep the scoped_lock in scope until the completion handler is invoked.
There are nicer solutions to this problem: the library has built-in support in the form of a strand, which fits this scenario nicely.
A strand is defined as a strictly sequential invocation of event handlers (i.e. no concurrent invocation). Use of strands allows execution of code in a multithreaded program without the need for explicit locking (e.g. using mutexes).
Using an explicit strand here will ensure your handlers are only invoked by a single thread that has invoked io_service::run(). With your example, the m_queue member would be protected by a strand, ensuring serialized access to the outgoing message queue. After adding an entry to the queue, if the size is 1, it means no async_write operation is outstanding and the application can initiate one, wrapped through the strand. If the queue size is greater than 1, the application should wait for the outstanding async_write to complete. In the async_write completion handler, pop an entry off the queue and handle any errors as necessary. If the queue is not empty, the completion handler should initiate another async_write from the front of the queue.
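Here is a minimal sketch of that design using the io_service-era API. The Connection class, its member names, and the std::string-based queue are illustrative assumptions, not code from the question:

#include <boost/asio.hpp>
#include <deque>
#include <memory>
#include <string>

class Connection : public std::enable_shared_from_this<Connection> {
public:
    explicit Connection(boost::asio::io_service& io)
        : socket_(io), strand_(io) {}

    boost::asio::ip::tcp::socket& socket() { return socket_; }

    // May be called from any thread: the queue is only ever touched
    // inside the strand, so no explicit locking is needed.
    void send(std::string message) {
        std::shared_ptr<Connection> self = shared_from_this();
        strand_.post([this, self, message]() {
            bool write_in_progress = !write_queue_.empty();
            write_queue_.push_back(message);
            if (!write_in_progress)
                start_write();          // no async_write outstanding, start one
        });
    }

private:
    // Only ever invoked from within the strand.
    void start_write() {
        std::shared_ptr<Connection> self = shared_from_this();
        boost::asio::async_write(
            socket_,
            boost::asio::buffer(write_queue_.front()),
            strand_.wrap([this, self](const boost::system::error_code& ec,
                                      std::size_t /*bytes_transferred*/) {
                write_queue_.pop_front();
                if (ec)
                    return;             // report or handle the error here
                if (!write_queue_.empty())
                    start_write();      // keep draining the queue
            }));
    }

    boost::asio::ip::tcp::socket socket_;
    boost::asio::io_service::strand strand_;
    std::deque<std::string> write_queue_;
};

Because every access to write_queue_ happens in a handler posted or wrapped through the strand, send() can be called freely from multiple threads, even when several threads are running io_service::run().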
This is a much cleaner design than sprinkling mutexes throughout your classes, since it uses the built-in Asio constructs as they are intended. This other answer I wrote has some code implementing this design.
We've solved this problem by having a separate queue of data to be written held in our socket object. When the first piece of data to be written is queued, we start an async_write(). In our async_write's completion handler, we start a subsequent async_write operation if there is still data to be transmitted.
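In case a sketch helps, this is roughly what such a socket-owned write queue might look like. The mutex protecting the queue and all of the names are assumptions on my part, since the answer does not show its code:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread/mutex.hpp>
#include <deque>
#include <string>

class Socket {
public:
    explicit Socket(boost::asio::io_service& io) : socket_(io) {}

    boost::asio::ip::tcp::socket& socket() { return socket_; }

    // Queue data for writing; start an async_write only if none is in flight.
    void queue_write(const std::string& data) {
        boost::mutex::scoped_lock lock(queue_mutex_);
        write_queue_.push_back(data);
        if (write_queue_.size() == 1)   // first entry: no write in progress
            start_write();
    }

private:
    // Caller must hold queue_mutex_; async_write only initiates the
    // operation and returns immediately, so the lock is held briefly.
    void start_write() {
        boost::asio::async_write(
            socket_,
            boost::asio::buffer(write_queue_.front()),
            boost::bind(&Socket::handle_write, this,
                        boost::asio::placeholders::error));
    }

    void handle_write(const boost::system::error_code& ec) {
        boost::mutex::scoped_lock lock(queue_mutex_);
        write_queue_.pop_front();       // the front entry has been sent
        if (!ec && !write_queue_.empty())
            start_write();              // more data queued, keep writing
    }

    boost::asio::ip::tcp::socket socket_;
    boost::mutex queue_mutex_;
    std::deque<std::string> write_queue_;
};

Note that the lock is only held for the brief queue operations and the call that initiates async_write, never for the duration of the transfer itself.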