AutoResetEvent not blocking properly

I have a thread, which creates a variable number of worker threads and distributes tasks between them. This is solved by passing the threads a TaskQueue object, whose implementation you will see below.

These worker threads simply iterate over the TaskQueue object they were given, executing each task.

private class TaskQueue : IEnumerable<Task>
{
    public int Count
    {
        get
        {
            lock(this.tasks)
            {
                return this.tasks.Count;
            }
        }
    }

    private readonly Queue<Task> tasks = new Queue<Task>();
    private readonly AutoResetEvent taskWaitHandle = new AutoResetEvent(false);

    private bool isFinishing = false;
    private bool isFinished = false;

    public void Enqueue(Task task)
    {
        Log.Trace("Entering Enqueue, lock...");
        lock(this.tasks)
        {
            Log.Trace("Adding task, current count = {0}...", Count);
            this.tasks.Enqueue(task);

            if (Count == 1)
            {
                Log.Trace("Count = 1, so setting the wait handle...");
                this.taskWaitHandle.Set();
            }
        }
        Log.Trace("Exiting enqueue...");
    }

    public Task Dequeue()
    {
        Log.Trace("Entering Dequeue...");
        if (Count == 0)
        {
            if (this.isFinishing)
            {
                Log.Trace("Finishing (before waiting) - isCompleted set, returning empty task.");
                this.isFinished = true;
                return new Task();
            }

            Log.Trace("Count = 0, lets wait for a task...");
            this.taskWaitHandle.WaitOne();
            Log.Trace("Wait handle let us through, Count = {0}, IsFinishing = {1}", Count, this.isFinishing);

            if(this.isFinishing)
            {
                Log.Trace("Finishing - isCompleted set, returning empty task.");
                this.isFinished = true;
                return new Task();
            }
        }

        Log.Trace("Entering task lock...");
        lock(this.tasks)
        {
            Log.Trace("Entered task lock, about to dequeue next item, Count = {0}", Count);
            return this.tasks.Dequeue();
        }
    }

    public void Finish()
    {
        Log.Trace("Setting TaskQueue state to isFinishing = true and setting wait handle...");
        this.isFinishing = true;

        if (Count == 0)
        {
            this.taskWaitHandle.Set();
        }
    }

    public IEnumerator<Task> GetEnumerator()
    {
        while(true)
        {
            Task t = Dequeue();
            if(this.isFinished)
            {
                yield break;
            }

            yield return t;
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}

As you can see, I'm using an AutoResetEvent object to make sure that the worker threads don't exit prematurely, i.e. before getting any tasks.

In a nutshell:

  • the main thread assigns a task to a thread by Enqueue-ing a task to its TaskQueue
  • the main thread notifies the thread that there are no more tasks to execute by calling the TaskQueue's Finish() method
  • the worker thread retrieves the next task assigned to it by calling the TaskQueue's Dequeue() method

The problem is that the Dequeue() method often throws an InvalidOperationException, saying that the Queue is empty. As you can see I added some logging, and it turns out, that the AutoResetEvent doesn't block the Dequeue(), even though there were no calls to its Set() method.

As I understand it, calling AutoResetEvent.Set() allows one thread that is waiting in AutoResetEvent.WaitOne() to proceed, and then automatically calls AutoResetEvent.Reset(), blocking the next waiter.

So what could be wrong? Did I misunderstand something? Do I have an error somewhere? I've been staring at this for 3 hours now, but I cannot figure out what's wrong. Please help me!

Thank you very much!


Your dequeue code is incorrect. You check the Count under lock, then release the lock and fly by the seat of your pants, expecting tasks to still have something in it. You cannot retain assumptions after you release the lock :). Your Count check and tasks.Dequeue must occur under the same lock:

bool TryDequeue(out Task task)
{
  task = null;
  lock (this.tasks) {
    // Check and dequeue in one critical section, so no other thread
    // can empty the queue between the two steps.
    if (0 < tasks.Count) {
      task = tasks.Dequeue();
    }
  }
  // Log only after the lock has been released.
  if (null == task) {
    Log.Trace ("Queue was empty");
  }
  return null != task;
}

Your Enqueue() code is similarly riddled with problems. Your Enqueue/Dequeue don't ensure progress (you will have dequeue threads blocked waiting even though there are items in the queue). Your signature of Enqueue() is wrong. Overall, the code in your post is very poor. Frankly, I think you're trying to bite off more than you can chew here... Oh, and never log under lock.
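To see why WaitOne() can appear not to block, here is a minimal sketch of how an AutoResetEvent behaves when Set() is called while nobody is waiting. The class and method names (AutoResetEventDemo, CountPassedWaits) are made up for illustration. The likely sequence in the posted code is: Enqueue() calls Set() while the worker is not waiting, the worker sees Count == 1 and skips the wait, and the signal stays behind. The next time the queue is empty, WaitOne() returns immediately on that stale signal and tasks.Dequeue() throws.

```csharp
using System;
using System.Threading;

public static class AutoResetEventDemo
{
    // Calls Set() setCalls times while no thread is waiting, then counts
    // how many non-blocking WaitOne(0) calls are let through.
    public static int CountPassedWaits(int setCalls, int waitCalls)
    {
        using (var handle = new AutoResetEvent(false))
        {
            for (int i = 0; i < setCalls; i++)
            {
                // With no waiter, the event simply stays signaled.
                // Repeated calls do not accumulate.
                handle.Set();
            }

            int passed = 0;
            for (int i = 0; i < waitCalls; i++)
            {
                // WaitOne(0) returns true (and resets the event) only if it is signaled.
                if (handle.WaitOne(0))
                {
                    passed++;
                }
            }
            return passed;
        }
    }
}
```

CountPassedWaits(2, 2) returns 1: the two Set() calls collapse into a single signal, which the first wait consumes. The signal is remembered until some wait consumes it, no matter how much later that wait happens.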

I strongly suggest you just use ConcurrentQueue.

If you don't have access to .NET 4.0, here is an implementation to get you started:

public class ConcurrentQueue<T>:IEnumerable<T>
{
    volatile bool fFinished = false;
    ManualResetEvent eventAdded = new ManualResetEvent(false);
    private Queue<T> queue = new Queue<T>();
    private object syncRoot = new object();

    public void SetFinished()
    {
        lock (syncRoot)
        {
            fFinished = true;
            eventAdded.Set();
        }
    }

    public void Enqueue(T t)
    {
        Debug.Assert (false == fFinished);
        lock (syncRoot)
        {
            queue.Enqueue(t);
            eventAdded.Set();
        }
    }

    private bool Dequeue(out T t)
    {
        do
        {
            lock (syncRoot)
            {
                if (0 < queue.Count)
                {
                    t = queue.Dequeue();
                    return true;
                }
                if (false == fFinished)
                {
                    eventAdded.Reset ();
                }
            }
            if (false == fFinished)
            {
                eventAdded.WaitOne();
            }
            else
            {
                break;
            }
        } while (true);
        t = default(T);
        return false;
    }


    public IEnumerator<T> GetEnumerator()
    {
        T t;
        while (Dequeue(out t))
        {
            yield return t;
        }
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}


A more detailed answer from me is pending, but I just want to point out something very important.

If you're using .NET 3.5, you can still use the ConcurrentQueue<T> class: a backport is included in the Reactive Extensions (Rx) library, which is available for .NET 3.5.

Since you want blocking behavior, you would need to wrap a ConcurrentQueue<T> in a BlockingCollection<T> (also available as part of Rx).
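As a rough sketch of what that wrapping looks like, the snippet below hands items to a single worker thread through a BlockingCollection<T> backed by a ConcurrentQueue<T>. It is written against the .NET 4.0 System.Collections.Concurrent namespace; I am assuming the backport exposes the same types. BlockingCollectionDemo and ProcessAll are hypothetical names, and strings stand in for your Task type. CompleteAdding() plays the role of your Finish() method.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class BlockingCollectionDemo
{
    // Hands items to one worker thread through a blocking queue and returns
    // how many items the worker processed before the queue was completed.
    public static int ProcessAll(string[] items)
    {
        using (var tasks = new BlockingCollection<string>(new ConcurrentQueue<string>()))
        {
            int processed = 0;

            var worker = new Thread(() =>
            {
                // Blocks while the queue is empty and ends the loop once
                // CompleteAdding() has been called and the queue is drained.
                foreach (string task in tasks.GetConsumingEnumerable())
                {
                    processed++;
                }
            });
            worker.Start();

            foreach (string item in items)
            {
                tasks.Add(item);
            }

            // Equivalent of Finish(): no more items will be added.
            tasks.CompleteAdding();
            worker.Join();
            return processed;
        }
    }
}
```

The worker needs no event handling of its own: the enumeration blocks when there is nothing to do and terminates cleanly after completion.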


It looks like you are trying to replicate a blocking queue. One already exists in the .NET 4.0 BCL as BlockingCollection<T>. If .NET 4.0 is not an option for you, then you can use this code. It uses the Monitor.Wait and Monitor.Pulse methods instead of an AutoResetEvent.

public class BlockingCollection<T>
{
    private Queue<T> m_Queue = new Queue<T>();

    public T Take() // Dequeue
    {
        lock (m_Queue)
        {
            while (m_Queue.Count <= 0)
            {
                Monitor.Wait(m_Queue);
            }
            return m_Queue.Dequeue();
        }
    }

    public void Add(T data) // Enqueue
    {
        lock (m_Queue)
        {
            m_Queue.Enqueue(data);
            Monitor.Pulse(m_Queue);
        }
    }
}
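The class above has no counterpart to the Finish() method from the question, so a consumer in Take() would wait forever once the producer stops. One way to add that, sketched here under my own assumptions, is a flag checked inside the same wait loop and a Monitor.PulseAll to wake every waiting consumer. FinishableQueue, Finish and TryTake are names invented for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public class FinishableQueue<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private bool finished; // only read or written while holding the lock

    public void Add(T item)
    {
        lock (queue)
        {
            if (finished)
            {
                throw new InvalidOperationException("The queue has been finished.");
            }
            queue.Enqueue(item);
            Monitor.Pulse(queue); // wake one waiting consumer
        }
    }

    public void Finish()
    {
        lock (queue)
        {
            finished = true;
            Monitor.PulseAll(queue); // wake all consumers so they can exit
        }
    }

    // Returns true with the next item, or false once the queue is
    // finished and every remaining item has been taken.
    public bool TryTake(out T item)
    {
        lock (queue)
        {
            // Re-check the condition after every wake-up.
            while (queue.Count == 0 && !finished)
            {
                Monitor.Wait(queue);
            }
            if (queue.Count > 0)
            {
                item = queue.Dequeue();
                return true;
            }
            item = default(T);
            return false;
        }
    }
}
```

A worker loop then becomes `T item; while (q.TryTake(out item)) { ... }`. Because the emptiness check, the finished check and the dequeue all happen under one lock, there is no window in which another thread can invalidate the result of the check.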

Update:

I am fairly certain that it is not possible to implement a producer-consumer queue using AutoResetEvent if you want it to be thread-safe for multiple producers and multiple consumers (I am prepared to be proven wrong if someone can come up with a counterexample). Sure, you will see examples on the internet, but they are all wrong. In fact, one such attempt by Microsoft is flawed in that the queue can get live-locked.
