
Multiple Producer Multiple Consumer lock-free (or even wait-free) queue

I'm searching for documentation on how to write an MP/MC queue that is lock-free or even wait-free. I'm using .NET 4.0. I found a lot of C++ code, but I'm not very familiar with memory models, so there is a good chance I will introduce bugs while porting it to C#.


As an option to consider, there is Dmitry Vyukov's algorithm for a bounded multiple-producer multiple-consumer queue. I've ported the algorithm to .NET; you can find the sources on GitHub. It's very fast.

The enqueue algorithm:

public bool TryEnqueue(object item)
{
    // Note: Volatile requires .NET 4.5; on .NET 4.0 the closest equivalents
    // are Thread.VolatileRead / Thread.VolatileWrite.
    do
    {
        var buffer = _buffer; // cache the buffer reference
        var pos = _enqueuePos; // fetch the current position at which to enqueue
        var index = pos & _bufferMask; // slot index in the buffer for that position
        // acquire-read the slot's sequence; it pairs with the release-write below
        var sequence = Volatile.Read(ref buffer[index].Sequence);
        // If the slot is free for this position (sequence == pos)
        // and we manage to claim it by advancing the enqueue position
        if (sequence == pos && Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos)
        {
            // store the item first
            buffer[index].Element = item;
            // then publish it to consumers by bumping the sequence (release-write)
            Volatile.Write(ref buffer[index].Sequence, pos + 1);
            return true;
        }

        // The slot still holds an item from the previous lap: the queue is full
        if (sequence < pos)
        {
            return false;
        }

        // another producer claimed this position before us; retry
    } while (true);
}

The dequeue algorithm:

public bool TryDequeue(out object result)
{
    do
    {
        var buffer = _buffer; // cache the buffer reference
        var bufferMask = _bufferMask; // cache the buffer mask
        var pos = _dequeuePos; // fetch the current position from which to dequeue
        var index = pos & bufferMask; // slot index in the buffer for that position
        // acquire-read the slot's sequence; it pairs with the producer's release-write
        var sequence = Volatile.Read(ref buffer[index].Sequence);
        // If a producer has published an item for this position (sequence == pos + 1)
        // and we manage to claim it by advancing the dequeue position
        if (sequence == pos + 1 && Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos)
        {
            // read the item directly from the slot, then clear the reference
            result = buffer[index].Element;
            buffer[index].Element = null;
            // hand the slot back to producers for the next lap (release-write);
            // this must come last so no producer writes into the slot before it is cleared
            Volatile.Write(ref buffer[index].Sequence, pos + bufferMask + 1);
            return true;
        }

        // No item has been published for this position: the queue is empty
        if (sequence < pos + 1)
        {
            result = default(object);
            return false;
        }

        // another consumer claimed this position before us; retry
    } while (true);
}
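
Both methods depend on state that is not shown here. The following is a minimal sketch of what that state might look like, inferred from how the fields are used above and from Vyukov's description of the algorithm. The class name MpmcBoundedQueue, the int field types, and the Cell layout are assumptions, not the published source. Two things matter: the capacity must be a power of two (so that pos & _bufferMask acts as a modulo), and each cell's sequence must start out equal to its own index.

```csharp
using System;

// Hypothetical container for the TryEnqueue/TryDequeue methods shown above.
public sealed class MpmcBoundedQueue
{
    // One slot of the ring buffer. The fields are mutable on purpose:
    // the algorithm passes them by ref to Volatile and Interlocked calls.
    private struct Cell
    {
        public int Sequence;
        public object Element;

        public Cell(int sequence, object element)
        {
            Sequence = sequence;
            Element = element;
        }
    }

    private readonly Cell[] _buffer;
    private readonly int _bufferMask;
    private int _enqueuePos; // next position a producer will try to claim
    private int _dequeuePos; // next position a consumer will try to claim

    public MpmcBoundedQueue(int capacity)
    {
        // The index calculation relies on a power-of-two size, and the
        // sequence scheme needs at least two slots to tell "free" from "full".
        if (capacity < 2 || (capacity & (capacity - 1)) != 0)
            throw new ArgumentException("Capacity must be a power of two and at least 2.", "capacity");

        _buffer = new Cell[capacity];
        _bufferMask = capacity - 1;

        // Slot i starts out free for position i.
        for (var i = 0; i < capacity; i++)
            _buffer[i] = new Cell(i, null);
    }

    // TryEnqueue and TryDequeue from above go here.
}
```

With this initialization, the first lap of producers sees Sequence == pos for every slot, and a consumer sees Sequence == pos + 1 only after a producer has published into that slot.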


Why do you think you need a lock-free queue? Have you tried using ConcurrentQueue<T>, possibly wrapped in a BlockingCollection<T>?

Writing multi-threaded code is hard. Writing lock-free code is even harder, and you shouldn't do it yourself unless you really have to.
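
For reference, here is a rough sketch of the ConcurrentQueue<T> plus BlockingCollection<T> combination with two producers and one consumer. Both types ship with .NET 4.0. The class name BlockingQueueDemo, the capacity of 100, and the item counts are arbitrary choices for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class BlockingQueueDemo
{
    // Runs two producers and one consumer over a bounded queue
    // and returns the sum of everything the consumer received.
    public static long Run(int itemsPerProducer)
    {
        // BlockingCollection<T> uses a ConcurrentQueue<T> by default;
        // passing one explicitly just makes that visible.
        using (var queue = new BlockingCollection<int>(new ConcurrentQueue<int>(), 100))
        {
            Task<long> consumer = Task.Factory.StartNew(() =>
            {
                long sum = 0;
                // Blocks while the queue is empty; ends once CompleteAdding
                // has been called and the queue has been drained.
                foreach (var item in queue.GetConsumingEnumerable())
                    sum += item;
                return sum;
            });

            Action produce = () =>
            {
                for (var i = 1; i <= itemsPerProducer; i++)
                    queue.Add(i); // blocks while the queue already holds 100 items
            };
            Task.WaitAll(Task.Factory.StartNew(produce), Task.Factory.StartNew(produce));

            queue.CompleteAdding(); // signal the consumer that no more items will arrive
            return consumer.Result;
        }
    }

    public static void Main()
    {
        // Two producers each add 1..1000, so this prints 1001000.
        Console.WriteLine(Run(1000));
    }
}
```

The collection handles blocking, bounding, and shutdown, which are the parts that tend to go wrong in a hand-written queue.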


My first choice would be ConcurrentQueue<T>, but you can abstract your data store behind an interface so that you can easily change implementations. Then benchmark typical scenarios and see where you run into problems. Remember: premature optimization is the root of all evil. Design your system so that it is tied to a contract rather than to an implementation; then you can optimize your implementations all you want.
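
One way to read that advice is sketched below. The names IWorkQueue<T> and ConcurrentWorkQueue<T> are made up for this example; only ConcurrentQueue<T> is a real framework type.

```csharp
using System.Collections.Concurrent;

// Hypothetical contract: callers depend on this, not on a concrete queue type.
public interface IWorkQueue<T>
{
    bool TryEnqueue(T item);
    bool TryDequeue(out T item);
}

// Default implementation backed by ConcurrentQueue<T>.
// That queue is unbounded, so TryEnqueue always succeeds here.
public sealed class ConcurrentWorkQueue<T> : IWorkQueue<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();

    public bool TryEnqueue(T item)
    {
        _queue.Enqueue(item);
        return true;
    }

    public bool TryDequeue(out T item)
    {
        return _queue.TryDequeue(out item);
    }
}
```

If benchmarks later show the queue to be a bottleneck, a bounded lock-free implementation can implement the same interface and be swapped in without touching the callers.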

I had a look at ConcurrentQueue<T> with ILSpy, and at first glance it seems to be a lock-free implementation, so there is a good chance it's exactly what you are looking for.
