开发者

concurrent async_write. is there a wait-free solution?

async_write() is forbidden to be called concurrently from different threads. It sends data by chunks using async_write_some and such chunks can be interleaved. So it is up to the user to take care of not calling async_write() concurrently.

Is there a nicer solution than this pseudocode?

void send(shared_ptr<char> p) {
  boost::mutex::scoped_lock lock(m_write_mutex);
  async_write(p, handler);
}

I do not like the idea to block other threads for a quite long time (there are ~50Mb sends in my application).

May be something like that would work?

void handler(const boost::system::error_code& e) {
  if(!e) {
    bool empty = lockfree_pop_front(m_queue);
    if(!empty) {
      shared_ptr<char> p = lockfree_queue_get_first(m_queue);
      async_write(p, handler);
    }
  }
}

void send(shared_ptr<char>开发者_开发技巧 p) {
  bool q_was_empty = lockfree_queue_push_back(m_queue, p)
  if(q_was_empty)
    async_write(p, handler);
}

I'd prefer to find a ready-to-use cookbook recipe. Dealing with lock-free is not easy, a lot of subtle bugs can appear.


async_write() is forbidden to be called concurrently from different threads

This statement is not quite correct. Applications can freely invoke async_write concurrently, as long as they are on different socket objects.

Is there a nicer solution than this pseudocode?

void send(shared_ptr<char> p) {
  boost::mutex::scoped_lock lock(m_write_mutex);
  async_write(p, handler);
}

This likely isn't accomplishing what you intend since async_write returns immediately. If you intend the mutex to be locked for the entire duration of the write operation, you will need to keep the scoped_lock in scope until the completion handler is invoked.

There are nicer solutions for this problem, the library has built-in support using the concept of a strand. It fits this scenario nicely.

A strand is defined as a strictly sequential invocation of event handlers (i.e. no concurrent invocation). Use of strands allows execution of code in a multithreaded program without the need for explicit locking (e.g. using mutexes).

Using an explicit strand here will ensure your handlers are only invoked by a single thread that has invoked io_service::run(). With your example, the m_queue member would be protected by a strand, ensuring atomic access to the outgoing message queue. After adding an entry to the queue, if the size is 1, it means no outstanding async_write operation is in progress and the application can initiate one wrapped through the strand. If the queue size is greater than 1, the application should wait for the async_write to complete. In the async_write completion handler, pop off an entry from the queue and handle any errors as necessary. If the queue is not empty, the completion handler should initiate another async_write from the front of the queue.

This is a much cleaner design that sprinkling mutexes in your classes since it uses the built-in Asio constructs as they are intended. This other answer I wrote has some code implementing this design.


We've solved this problem by having a seperate queue of data to be written held in our socket object. When the first piece of data to be written is "queued", we start an async_write(). In our async_write's completion handler, we start subsequent async_write operations if there is still data to be transmitted.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜