
Enabling Queue<T> with concurrency

I asked a previous question, to which I posted my own solution; however, I don't have access to ConcurrentQueue<T> because I am on .NET 3.5, so I need Queue<T> to support concurrent access. I read this question, and it seems there is a problem if the queue is empty when the threaded method tries to dequeue an item.

My task now is to determine whether I can derive my own concurrent Queue class. This is what I came up with:

public sealed class ConcurrentQueue : Queue<DataTable>
{
    public event EventHandler<TableQueuedEventArgs> TableQueued;
    private ICollection que;

    new public void Enqueue(DataTable Table)
    {
        lock (que.SyncRoot)
        {
            base.Enqueue(Table);
        }

        OnTableQueued(new TableQueuedEventArgs(Dequeue()));
    }

    //  this is where I think I will have a problem...
    new public DataTable Dequeue()
    {
        DataTable table;

        lock (que.SyncRoot)
        {
            table = base.Dequeue();
        }

        return table;
    }

    public void OnTableQueued(TableQueuedEventArgs table)
    {
        EventHandler<TableQueuedEventArgs> handler = TableQueued;

        if (handler != null)
        {
            handler(this, table);
        }
    }
}

So, when a DataTable is queued, the EventArgs will pass a dequeued table to the event subscriber. Will this implementation provide me with a thread-safe Queue?


A quick trip to my favorite search engine revealed that my memory was correct: you can get the Task Parallel Library even on .NET 3.5. Also see the PFX team blog post on the subject, and the Reactive Extensions, which you download in order to get the required System.Threading.dll.
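
As a rough sketch of what that would look like, assuming the backported System.Threading.dll exposes the same System.Collections.Concurrent.ConcurrentQueue<T> type as .NET 4 (an assumption worth checking against the version you download), an empty queue is reported through the return value of TryDequeue rather than through an exception. The Demo class name is only for illustration:

using System;
using System.Collections.Concurrent;

internal static class Demo
{
    private static void Main()
    {
        ConcurrentQueue<string> queue = new ConcurrentQueue<string>();
        queue.Enqueue("first");

        string item;

        // The queue holds one item, so this call succeeds.
        if (queue.TryDequeue(out item))
        {
            Console.WriteLine(item);
        }

        // The queue is now empty: TryDequeue returns false instead of throwing.
        bool got = queue.TryDequeue(out item);
        Console.WriteLine(got);
    }
}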


The fact you need to use new to hide methods from the base class is usually an indication that you should use composition rather than inheritance...

Here's a simple synchronized queue, which doesn't use inheritance but still relies on the behavior of the standard Queue<T>:

public class ConcurrentQueue<T> : ICollection, IEnumerable<T>
{
    private readonly Queue<T> _queue;

    public ConcurrentQueue()
    {
        _queue = new Queue<T>();
    }

    public IEnumerator<T> GetEnumerator()
    {
        lock (SyncRoot)
        {
            foreach (var item in _queue)
            {
                yield return item;
            }
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    public void CopyTo(Array array, int index)
    {
        lock (SyncRoot)
        {
            ((ICollection)_queue).CopyTo(array, index);
        }
    }

    public int Count
    {
        get
        { 
            // Assumed to be atomic, so locking is unnecessary
            return _queue.Count;
        }
    }

    public object SyncRoot
    {
        get { return ((ICollection)_queue).SyncRoot; }
    }

    public bool IsSynchronized
    {
        get { return true; }
    }

    public void Enqueue(T item)
    {
        lock (SyncRoot)
        {
            _queue.Enqueue(item);
        }
    }

    public T Dequeue()
    {
        lock(SyncRoot)
        {
            return _queue.Dequeue();
        }
    }

    public T Peek()
    {
        lock (SyncRoot)
        {
            return _queue.Peek();
        }
    }

    public void Clear()
    {
        lock (SyncRoot)
        {
            _queue.Clear();
        }
    }
}


You're dequeueing your items as you enqueue them.
You need to raise the event using your parameter.

Whether it's actually thread-safe depends on how you use it.
If you ever check the Count or check for emptiness, it's not threadsafe and cannot easily be made threadsafe.
If you don't, you can probably use something simpler than a queue.
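
To illustrate both points, here is a minimal sketch rather than a definitive implementation. NotifyingQueue<T>, ItemQueuedEventArgs<T>, ItemQueued and _gate are hypothetical names that do not appear in the question. Enqueue raises the event with its own parameter instead of dequeuing, and TryDequeue performs the emptiness check and the removal under a single lock, so callers never need a separate Count check:

using System;
using System.Collections.Generic;

// Hypothetical event args carrying the item that was just enqueued.
public sealed class ItemQueuedEventArgs<T> : EventArgs
{
    public readonly T Item;

    public ItemQueuedEventArgs(T item)
    {
        Item = item;
    }
}

// Hypothetical wrapper built by composition around Queue<T>.
public sealed class NotifyingQueue<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly object _gate = new object();

    public event EventHandler<ItemQueuedEventArgs<T>> ItemQueued;

    public void Enqueue(T item)
    {
        lock (_gate)
        {
            _queue.Enqueue(item);
        }

        // Raise the event outside the lock, passing the parameter itself.
        // Nothing is removed from the queue here.
        EventHandler<ItemQueuedEventArgs<T>> handler = ItemQueued;
        if (handler != null)
        {
            handler(this, new ItemQueuedEventArgs<T>(item));
        }
    }

    public bool TryDequeue(out T item)
    {
        lock (_gate)
        {
            // The emptiness check and the removal happen under the same lock,
            // so no other thread can empty the queue in between.
            if (_queue.Count == 0)
            {
                item = default(T);
                return false;
            }

            item = _queue.Dequeue();
            return true;
        }
    }
}

Raising the event outside the lock keeps subscriber code from running while the queue is locked, at the cost that a subscriber may find the item already taken by another consumer.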


This is some time after the initial question, I know (it came up as "related" to the right of another question), but I've gone with the following in similar cases. It is not as good for CPU-cache use as it could be, but it is simple, lock-free, and thread-safe. CPU-cache use often isn't that important if there are large gaps between operations, and when there aren't, the closeness of the allocations might reduce the impact:

internal sealed class LockFreeQueue<T>
{
  private sealed class Node
  {
    public readonly T Item;
    public Node Next;
    public Node(T item)
    {
      Item = item;
    }
  }
  private volatile Node _head;
  private volatile Node _tail;
  public LockFreeQueue()
  {
    _head = _tail = new Node(default(T));
  }
#pragma warning disable 420 // volatile semantics not lost as only by-ref calls are interlocked
  public void Enqueue(T item)
  {
    Node newNode = new Node(item);
    for(;;)
    {
      Node curTail = _tail;
      if (Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null)   //append to the tail if it is indeed the tail.
      {
        Interlocked.CompareExchange(ref _tail, newNode, curTail);   //CAS in case we were assisted by an obstructed thread.
        return;
      }
      else
      {
        Interlocked.CompareExchange(ref _tail, curTail.Next, curTail);  //assist obstructing thread.
      }
    }
  }    
  public bool TryDequeue(out T item)
  {
    for(;;)
    {
      Node curHead = _head;
      Node curTail = _tail;
      Node curHeadNext = curHead.Next;
      if (curHead == curTail)
      {
        if (curHeadNext == null)
        {
          item = default(T);
          return false;
        }
        else
          Interlocked.CompareExchange(ref _tail, curHeadNext, curTail);   // assist obstructing thread
      }
      else
      {
        item = curHeadNext.Item;
        if (Interlocked.CompareExchange(ref _head, curHeadNext, curHead) == curHead)
        {
          return true;
        }
      }
    }
  }
#pragma warning restore 420
}
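
A usage sketch, assuming the LockFreeQueue<T> class above is compiled into the same assembly (LockFreeQueueDemo is a hypothetical name). Several threads enqueue concurrently, and the main thread drains the queue once they have finished:

using System;
using System.Threading;

internal static class LockFreeQueueDemo
{
    private static void Main()
    {
        LockFreeQueue<int> queue = new LockFreeQueue<int>();
        Thread[] producers = new Thread[4];

        // Each of the 4 producers enqueues 1000 items without any locks.
        for (int p = 0; p < producers.Length; p++)
        {
            producers[p] = new Thread(delegate()
            {
                for (int i = 0; i < 1000; i++)
                {
                    queue.Enqueue(i);
                }
            });
            producers[p].Start();
        }

        // Wait for every producer to finish before draining.
        foreach (Thread producer in producers)
        {
            producer.Join();
        }

        // TryDequeue returns false once the queue is empty, so no Count check is needed.
        int count = 0;
        int value;
        while (queue.TryDequeue(out value))
        {
            count++;
        }

        Console.WriteLine(count);
    }
}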


In your Enqueue method, the line OnTableQueued(new TableQueuedEventArgs(Dequeue())); removes a table from the queue as soon as one is added. Use Peek instead of Dequeue.

It should be:

OnTableQueued(new TableQueuedEventArgs(base.Peek()));
