开发者

Is this use of a static queue thread-safe?

The msdn documentation states that a static generic Queue is thread-safe. Does this mean that the following code is thread-safe? In other words, is there a problem when a thread Enqueues an int and another thread Dequeues an int at the same time? Do I have to lock the Enqueue and Dequeue operations for thread-safety?

class Test {
    public static Queue<int> queue = new Queue<int>(10000);

    Thread putIntThread;
    Thread takeIntThread;

    public Test() {
        for(int i = 0; i < 5000; ++i) {
            queue.Enqueu开发者_Python百科e(0);
        }
        putIntThread = new Thread(this.PutInt);
        takeIntThread = new Thread(this.TakeInt);
        putIntThread.Start();
        takeIntThread.Start();
    }

    void PutInt() {
        while(true)
        {
            if(queue.Count < 10000) {//no need to lock here as only itself can change this condition
                queue.Enqueue(0);
            }
        }
    }

    void TakeInt() {
        while(true) {
            if(queue.Count > 0) {//no need to lock here as only itself can change this condition
                queue.Dequeue();
            }
        }
    }

}

Edit: I have to use .NET 3.5


This is absolutely not thread-safe. From the docs of Queue<T>.

Public static (Shared in Visual Basic) members of this type are thread safe. Any instance members are not guaranteed to be thread safe.

A Queue<T> can support multiple readers concurrently, as long as the collection is not modified. Even so, enumerating through a collection is intrinsically not a thread-safe procedure. To guarantee thread safety during enumeration, you can lock the collection during the entire enumeration. To allow the collection to be accessed by multiple threads for reading and writing, you must implement your own synchronization.

Rereading your question, you seem to be confused about the phrase "static members of this type" - it's not talking about "a static Queue" as there's no such thing. An object isn't static or not - a member is. When it talks about static members it's talking about things like Encoding.GetEncoding (Queue<T> doesn't actually have any static members). Instance members are things like Enqueue and Dequeue - members which relate to an instance of the type rather than the type itself.

So either you need to use a lock for each action, or if you're using .NET 4, use ConcurrentQueue<T>.


Yes, as said here already an instance member of a static instance is not the same as a static member, and it's only the latter for which thread-safety is guaranteed, so you have to lock on the enqueue and dequeue operations.

If the locking were proving to be a bottleneck, queues are one of the simpler collections to write in a lock-free manner, as long as one doesn't also need the full ICollection<T> implementation provided by Queue<T>:

internal sealed class LockFreeQueue<T>
{
  private sealed class Node
  {
    public readonly T Item;
    public Node Next;
    public Node(T item)
    {
      Item = item;
    }
  }
  private volatile Node _head;
  private volatile Node _tail;
  public LockFreeQueue()
  {
    _head = _tail = new Node(default(T));
  }
#pragma warning disable 420 // volatile semantics not lost as only by-ref calls are interlocked
  public void Enqueue(T item)
  {
    Node newNode = new Node(item);
    for(;;)
    {
      Node curTail = _tail;
      if (Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null)   //append to the tail if it is indeed the tail.
      {
        Interlocked.CompareExchange(ref _tail, newNode, curTail);   //CAS in case we were assisted by an obstructed thread.
        return;
      }
      else
      {
        Interlocked.CompareExchange(ref _tail, curTail.Next, curTail);  //assist obstructing thread.
      }
    }
  }    
  public bool TryDequeue(out T item)
  {
    for(;;)
    {
      Node curHead = _head;
      Node curTail = _tail;
      Node curHeadNext = curHead.Next;
      if (curHead == curTail)
      {
        if (curHeadNext == null)
        {
          item = default(T);
          return false;
        }
        else
          Interlocked.CompareExchange(ref _tail, curHeadNext, curTail);   // assist obstructing thread
      }
      else
      {
        item = curHeadNext.Item;
        if (Interlocked.CompareExchange(ref _head, curHeadNext, curHead) == curHead)
        {
          return true;
        }
      }
    }
  }
#pragma warning restore 420
}

This queue only has an Enqueue and TryDequeue (returns false if the queue was empty) methods. Adding a Count property with use of interlocked increments and decrements is trivial (make sure count field is read volatilely in the actual property), but beyond that it gets pretty tricky to add anything that can't be written as delegating to one of the members already defined, or as happening during construction (in which case you will only have a single thread using it at that point, unless you do something really weird).

The implementation is also wait-free, as if the actions of one thread will not prevent another from making progress (if a thread is half-way through the enqueue procedure when a second thread tries to do so, the second thread will complete the first thread's work).

Still, I'd wait until locking had actually proven a bottleneck (unless you're just experimenting; play with the exotic, work with the familiar). Indeed, in many situations this will prove more expensive than locking on a Queue<T>, particularly since it is less good at keeping the items near each other in memory, so you could find that lots of operations in close succession was less performant for that reason. Locking is normally pretty cheap, as long as there isn't frequent lock-contention.

Edit:

I've time now to add notes on how the above works. I wrote this by reading someone else's version of the same idea, writing this for myself to copy the idea, and then comparing with the version I'd read afterwards, and found it a very informative exercise to do so.

Let's start with a non lock free implementation. It's a singly linked list.

internal sealed class NotLockFreeYetQueue<T>
{
  private sealed class Node
  {
    public readonly T Item;
    public Node Next{get;set;}
    public Node(T item)
    {
      Item = item;
    }
  }
  private Node _head;
  private Node _tail;
  public NotLockFreeYetQueue()
  {
    _head = _tail = new Node(default(T));
  }
  public void Enqueue(T item)
  {
    Node newNode = new Node(item);
    _tail.Next = newNode;
    _tail = newNode;
  }
  public bool TryDequeue(out T item)
  {
      if (_head == _tail)
      {
          item = default(T);
          return false;
      }
      else
      {
        item = _head.Next.Item;
        _head = _head.Next;
        return true;
      }
  }
}

A few notes on the implementation so far.

Item and Next can reasonably be either fields or properties. Since it's a simple inner class and one must be readonly while the other a "dumb" read-write (no logic in the getter or setter) there really isn't much to choose between here. I've made Next a property here purely because that isn't going to work later on, and I want to talk about that when we get there.

Having _head and _tail start as pointing to a sentinel rather than null simplifies things by not having to have a special case for an empty queue.

So, enqueuing will create a new node and set it as _tail's Next property before becoming the new tail. Dequeuing will check for emptiness and if it isn't empty, obtain the value from the head node and set head to be the node that was the old head's Next property.

Another thing to notice at this point, is that since new nodes are created as needed, rather than in a pre-allocated array, it will have less good performance in normal use than Queue<T>. This isn't going to get any better, and indeed everything we're going to do now will make single-thread performance worse. Again, it's only in heavy contention that this will beat a locked Queue<T>.

Let's make enqueue lock-free. We'll use Interlocked.CompareExchange(). This compares the first parameter with the third parameter, and sets the first parameter to be the second parameter if they are equal. In any case it returns the old value (whether it was over-written or not). The compare and exchange is done as an atomic operation, so is in itself threadsafe, but we need a bit more work to make combinations of such operations also threadsafe.

CompareExchange and equivalents in other languages are sometimes abbreviated to CAS (for Compare-And-Swap).

A common way to use them are in loops, where we first obtain the value we will over-write through a normal read (remember that .NET reads of 32bit values, smaller values, and reference types are always atomic) and try to overwrite it if it hasn't changed, looping until we succeed:

private sealed class Node
{
  public readonly T Item;
  public Node Next;
  public Node(T item)
  {
    Item = item;
  }
}
/* ... */
private volatile Node _tail;
/* ... */
public void Enqueue(T item)
{
  Node newNode = new Node(item);
  for(;;)
  {
    Node curTail = _tail;
    if(Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null)
    {
      _tail = newNode;
      return;
    }
  }
}

We want to add to the tail's Next only if it's null - if not another thread as written to it. So, we do a CAS that will only succeed if this is the case. If it is, we set _tail to be that new node, otherwise we try again.

Next had to be changed to be a field for this to work, we can't do it with properties. We also make _tail volatile so that _tail will be fresh in all CPU caches (CompareExchange has volatile semantics so it won't be broken by the lack of volatility, but it may spin more often than necessary, and we'll be doing more with _tail also).

This is lock-free, but not wait-free. If a thread got as far as the CAS, but had not yet written to _tail, and then didn't have any CPU time for a while, all other threads trying to enqueue would keep looping until it was scheduled and managed to do so. If the thread was aborted or suspended for a long time, this would cause a sort of permanent livelock.

So, if we are in the condition where the CAS has failed, we are in such a situation. We can fix this by doing the other thread's work for it:

  for(;;)
  {
    Node curTail = _tail;
    if(Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null)
    {
      Interlocked.CompareExchange(ref _tail, newNode, curTail);   //CAS in case we were assisted by an obstructed thread.

      return;
    }
    else
    {
      Interlocked.CompareExchange(ref _tail, curTail.Next, curTail);  //assist obstructing thread.
    }
  }

Now, in most cases the thread that wrote to curTail.Next will assign the new node to _tail - but through a CAS in case it's already been done. However, another thread fails to write to curtail.Next it can try to assign curTail.Next to _tail to do the first thread's work and get on to it's own.

So, a lock-free, wait-free enqueue. Time to work on dequeuing. First let's consider the case where we don't suspect the queue of being empty. Just as with enqueuing, we will first get local copies of the nodes we are interested in; _head, _tail, and _head.Next (again not using a null head or tail for empty queues makes life easier; it means it is safe to read _head.Next in any state). Also like with enqueuing, we will depend upon volatility, this time not just of _tail, but of _head, so we change it to:

private volatile Node _head;

And we change TryDequeue to:

  public bool TryDequeue(out T item)
  {
      Node curHead = _head;
      Node curTail = _tail;
      Node curHeadNext = curHead.Next;
      if (_head == _tail)
      {
          item = default(T);
          return false;
      }
      else
      {
        item = curHeadNext.Item;
        if (Interlocked.CompareExchange(ref _head, curHeadNext, curHead) == curHead)
          return true;
      }
  }

The empty-queue case is now incorrect, but we'll come back to that. It's safe to set item to curHeadNext.Item as if we don't complete the operation we will overwrite it again, but we must make the operation writing to _head atomic and guaranteed to happen only if _head hasn't changed. If it hasn't, then _head has been updated by another thread and we can loop again (no need to work for that thread, it's already done everything that'll effect us).

Now consider what happens if _head == _tail. Possibly it is empty, but possibly _tail.Next (which will be the same as curHeadNext) was written to by an enqueue. In such a case what we more likely want is not a result of an empty quque, but a result of our dequeuing that partly-enqueued item. So, we assist that thread and continue the loop again:

if (curHead == curTail)
{
    if (curHeadNext == null)
    {
        item = default(T);
        return false;
    }
    else
        Interlocked.CompareExchange(ref _tail, curHeadNext, curTail);
}

Finally, the only issue left is that we keep getting 420 warnings because we are passing volatile fields to byref methods. This often stops volatile semantics (hence the warning) but doesn't with CompareExchange (hence our doing so). We can disable the warning, includin a comment to explain why we did so (I try never to disable a warning without a justifying comment) and we've the code I gave earlier.

Note that it's important for this that we are doing this in a GC-supporting framework. If we had to handle deallocation as well, it would get much more complicated.


What MSDN states is that static methods of Queue are thread-safe, not that instance methods of a static instance are thread-safe.


Yes, you have to lock just as MSDN says

To allow the collection to be accessed by multiple threads for reading and writing, you must implement your own synchronization.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜