How can I use boost asio sockets for full duplex streaming efficiently?
I'm writing a performance-critical bidirectional streaming server using boost.asio.
The server works this way:
- Thread A processes objects to be sent and pushes them into the OUTPUT queue
- Thread B waits for objects in the INPUT queue and processes them
- Thread C is an acceptor thread that accepts incoming clients and creates a CLIENT instance for each one
Multiple clients run at the same time; each one has its own connected socket and must do two things at the same time:
- wait (condition variable) for at least one object to be present in the OUTPUT queue (this may take long) and send it as fast as possible
- get any incoming object from the socket, and put it in the INPUT queue
Moreover, performance and multicore scalability are critical in this application.
The standard async approach fails here (the send callbacks may block other callbacks while waiting for a new object to send), and the blocking approach (using one thread for each direction) is complicated: I can't figure out what to do in case of an error on one of the threads.
Should I use two sockets for each client (one for output and one for input)? Or maybe somehow use two io_services per socket, on two different threads, for concurrent callback support?
Please explain how you would deal with such a situation. Thank you.
"The standard async approach fails here (the send callbacks may block other callbacks while waiting for a new object to send)"
The async model should work fine, and if used properly it is definitely going to scale better; it only falls apart when you introduce blocking. Instead of ditching async, here's what I would suggest:
Remove the condition variable. You need two queues: OUTPUT and WAITING.
Then when processing a client:
- If there's data in the OUTPUT queue, send it.
- If not, push the client onto the WAITING queue.
There is no requirement to perform the next I/O from within the handler of a previous one. Here, instead of blocking on a condition variable, we just park the client on the WAITING queue for later processing.
And in the OUTPUT-pushing code:
- If there are clients in the WAITING queue, send the data directly.
- If not, push the data onto the OUTPUT queue.
Here's some pseudo-code:
queue<packet> output;
queue<client> waiting;

void try_send(client c)
{
    if (!output.empty())
    {
        // there is output waiting to be sent, send it.
        packet p = output.pop();
        c.async_send(p, on_send_finished);
    }
    else
    {
        // nothing available, go back to waiting.
        waiting.push(c);
    }
}

void on_send_finished(client c)
{
    // send finished, try again if any more output has accumulated:
    try_send(c);
}

void push_output(packet p)
{
    output.push(p);
    if (!waiting.empty())
    {
        // there is a client waiting to send, give it a try.
        client c = waiting.pop();
        try_send(c);
    }
}
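To make that concrete, here is a minimal sketch of the same pattern on top of Boost.Asio. The session type, the packet-as-pre-framed-string representation, and the reduced error handling (a failed session is simply not re-queued) are illustrative assumptions, not part of your existing code:

#include <boost/asio.hpp>
#include <deque>
#include <memory>
#include <string>

using boost::asio::ip::tcp;
using packet = std::string;   // assume a packet is an already-framed byte string

struct session {
    explicit session(tcp::socket s) : socket(std::move(s)) {}
    tcp::socket socket;
    packet in_flight;         // must stay alive for the duration of async_write
};

std::deque<packet> output;                        // OUTPUT queue
std::deque<std::shared_ptr<session>> waiting;     // WAITING queue

void try_send(std::shared_ptr<session> c)
{
    if (!output.empty())
    {
        // there is output waiting to be sent, send it.
        c->in_flight = std::move(output.front());
        output.pop_front();
        boost::asio::async_write(
            c->socket, boost::asio::buffer(c->in_flight),
            [c](const boost::system::error_code& ec, std::size_t /*bytes*/)
            {
                if (!ec)
                    try_send(c);   // send finished, try again
                // on error the session is simply dropped (not re-queued)
            });
    }
    else
    {
        // nothing available, go back to waiting.
        waiting.push_back(c);
    }
}

void push_output(packet p)
{
    output.push_back(std::move(p));
    if (!waiting.empty())
    {
        // there is a client waiting to send, give it a try.
        auto c = waiting.front();
        waiting.pop_front();
        try_send(c);
    }
}

This version assumes the write handlers and push_output() all run on the thread calling io_context::run(), which is why the plain std::deques need no synchronization.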
This can all be done in a scalable way using a single thread, but multiple threads are pretty easy with asio. If you're going to use multiple threads, you'll want to introduce a lock in the logic that checks the queues.
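As one possibility, instead of taking a lock directly you can let the io_context side own the queues and have Thread A hand packets over with post(). This is a small sketch building on the illustrative names from the code above (packet, push_output); the io_context and the producer-facing function name are also assumptions:

#include <boost/asio.hpp>

boost::asio::io_context io;   // the context whose run() drives all client sockets

// Called from Thread A. post() is thread-safe, so the producer never touches
// the queues directly; push_output() executes on the io_context thread, which
// owns both queues.
void push_output_from_producer(packet p)
{
    boost::asio::post(io, [p = std::move(p)]() mutable
    {
        push_output(std::move(p));
    });
}

If several threads call io.run(), this alone is not enough, because the write handlers also touch the queues; in that case you would bind both the posted function and the async_write handlers to a single strand, or fall back to the lock mentioned above.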