Multiple Producer Multiple Consumer lock-free (or even wait-free) queue
I'm searching for documentation on how to write an MP/MC queue that is lock-free or even wait-free. I'm using .NET 4.0. I found a lot of C++ code, but I'm not very familiar with memory models, so there is a big chance I will introduce bugs while porting it to C#.
As an option to consider, there's the bounded Multiple Producer Multiple Consumer queue algorithm by Dmitry Vyukov. I've ported the algorithm to .NET; you can find the sources on GitHub. It's very fast.
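For context, the two methods below operate on a ring buffer of cells, each carrying a sequence number. Here is a minimal sketch of that supporting state, assuming a layout compatible with the snippets; the actual sources on GitHub also pad the positions against false sharing, and the constructor name MpmcBoundedQueue is just a placeholder:

// Sketch of the fields and Cell type the methods rely on (padding omitted).
private struct Cell
{
    public int Sequence;   // the "turn" ticket producers and consumers compare against
    public object Element; // the stored item

    public Cell(int sequence, object element)
    {
        Sequence = sequence;
        Element = element;
    }
}

private readonly Cell[] _buffer;
private readonly int _bufferMask;
private int _enqueuePos;
private int _dequeuePos;

public MpmcBoundedQueue(int bufferSize) // placeholder name; bufferSize must be a power of two
{
    _bufferMask = bufferSize - 1;
    _buffer = new Cell[bufferSize];
    for (var i = 0; i < bufferSize; i++)
        _buffer[i] = new Cell(i, null); // cell i is initially free for enqueue position i
    _enqueuePos = 0;
    _dequeuePos = 0;
}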
The enqueue algorithm:
public bool TryEnqueue(object item)
{
    do
    {
        var buffer = _buffer; // prefetch the buffer pointer
        var pos = _enqueuePos; // fetch the current position where to enqueue the item
        var index = pos & _bufferMask; // precalculate the index in the buffer for that position
        var cell = buffer[index]; // fetch the cell by the index
        // If its sequence wasn't touched by other producers
        // and we can increment the enqueue position
        if (cell.Sequence == pos && Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos)
        {
            // write the item we want to enqueue
            Volatile.Write(ref buffer[index].Element, item);
            // bump the sequence
            buffer[index].Sequence = pos + 1;
            return true;
        }
        // If the queue is full we cannot enqueue and just return false
        if (cell.Sequence < pos)
        {
            return false;
        }
        // repeat the process if another producer managed to enqueue before us
    } while (true);
}
The dequeue algorithm:
public bool TryDequeue(out object result)
{
    do
    {
        var buffer = _buffer; // prefetch the buffer pointer
        var bufferMask = _bufferMask; // prefetch the buffer mask
        var pos = _dequeuePos; // fetch the current position from where we can dequeue an item
        var index = pos & bufferMask; // precalculate the index in the buffer for that position
        var cell = buffer[index]; // fetch the cell by the index
        // If its sequence was changed by a producer and wasn't changed by other consumers
        // and we can increment the dequeue position
        if (cell.Sequence == pos + 1 && Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos)
        {
            // read the item
            result = Volatile.Read(ref cell.Element);
            // update the sequence for the next round of the buffer
            buffer[index] = new Cell(pos + bufferMask + 1, null);
            return true;
        }
        // If the queue is empty we cannot dequeue and just return false
        if (cell.Sequence < pos + 1)
        {
            result = default(object);
            return false;
        }
        // repeat the process if another consumer managed to dequeue before us
    } while (true);
}
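A rough usage sketch, assuming the placeholder class name MpmcBoundedQueue from above (the class in the actual GitHub sources may be named differently): several producer and consumer threads spin on the Try methods.

var queue = new MpmcBoundedQueue(1024);
var threads = new List<Thread>(); // System.Collections.Generic, System.Threading

for (var p = 0; p < 4; p++)
    threads.Add(new Thread(() =>
    {
        for (var i = 0; i < 100000; i++)
            while (!queue.TryEnqueue(i))
                Thread.SpinWait(1); // queue full: back off briefly and retry
    }));

for (var c = 0; c < 4; c++)
    threads.Add(new Thread(() =>
    {
        object item;
        for (var i = 0; i < 100000; i++)
            while (!queue.TryDequeue(out item))
                Thread.SpinWait(1); // queue empty: back off briefly and retry
    }));

threads.ForEach(t => t.Start());
threads.ForEach(t => t.Join());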
Why do you think you need a lock-free queue? Have you tried using ConcurrentQueue<T>, possibly enclosed within a BlockingCollection<T>?
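For reference, a minimal sketch of that suggestion: a ConcurrentQueue<T> used as the backing store of a bounded BlockingCollection<T> (both ship with .NET 4.0). The names and sizes here are arbitrary.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class Example
{
    static void Main()
    {
        // ConcurrentQueue<int> provides the lock-free storage,
        // BlockingCollection<int> adds blocking and an optional bound.
        var queue = new BlockingCollection<int>(new ConcurrentQueue<int>(), 1024);

        var producer = Task.Factory.StartNew(() =>
        {
            for (var i = 0; i < 1000; i++)
                queue.Add(i);       // blocks if the bound of 1024 is reached
            queue.CompleteAdding(); // signal that no more items will arrive
        });

        var consumer = Task.Factory.StartNew(() =>
        {
            // Blocks while the collection is empty and completes once
            // CompleteAdding has been called and everything is drained.
            foreach (var item in queue.GetConsumingEnumerable())
                Console.WriteLine(item);
        });

        Task.WaitAll(producer, consumer);
    }
}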
Writing multi-threaded code is hard. Writing lock-free code is even harder and you shouldn't do it yourself unless you really have to.
My first go would be with ConcurrentQueue<T>, but you can abstract your data store away behind an interface so you can easily change implementations. Then benchmark typical scenarios and see where you run into problems. Remember: premature optimization is the root of all evil. Design your system so it's not tied to an implementation but to a contract, and then you can optimize your implementations all you want.
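One way to read "abstract your data store away behind an interface": define a small contract and adapt ConcurrentQueue<T> to it, so a lock-free implementation can be swapped in later without touching callers. The interface and class names below are made up for illustration.

using System.Collections.Concurrent;

public interface IWorkQueue<T>
{
    bool TryEnqueue(T item);
    bool TryDequeue(out T item);
}

public sealed class ConcurrentQueueAdapter<T> : IWorkQueue<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();

    public bool TryEnqueue(T item)
    {
        _queue.Enqueue(item); // ConcurrentQueue<T> is unbounded, so enqueue always succeeds
        return true;
    }

    public bool TryDequeue(out T item)
    {
        return _queue.TryDequeue(out item);
    }
}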
I had a look at ConcurrentQueue<T> with ILSpy, and it seems to be a lock-free implementation at first glance, so there's a good chance it's exactly what you are looking for.