Serial Task Executor; is this thread safe?

I have a class that I've created to allow asynchronous sequential execution of tasks, using the ThreadPool as the means of execution. The idea is that I'll have multiple instances running serial tasks in the background, but I don't want to have a separate dedicated Thread for each instance. What I'd like to check is whether this class is actually thread safe. It's fairly brief, so I thought I'd run it by the experts here, in case I'm missing something obvious. I've omitted a few of the convenience overloads for different Action types.

/// <summary>
/// This class wraps ThreadPool.QueueUserWorkItem, but providing guaranteed ordering of queued tasks for this instance.
/// Only one task in the queue will execute at a time, with the order of execution matching the order of addition.
/// This is designed as a lighter-weight alternative to using a dedicated Thread for processing of sequential tasks.
/// </summary>
public sealed class SerialAsyncTasker
{
    private readonly Queue<Action> mTasks = new Queue<Action>();
    private bool mTaskExecuting;

    /// <summary>
    /// Queue a new task for asynchronous execution on the thread pool.
    /// </summary>
    /// <param name="task">Task to execute</param>
    public void QueueTask(Action task)
    {
        if (task == null) throw new ArgumentNullException("task");

        lock (mTasks)
        {
            bool isFirstTask = (mTasks.Count == 0);
            mTasks.Enqueue(task);

            //Only start executing the task if this is the first task
            //Additional tasks will be executed normally as part of sequencing
            if (isFirstTask && !mTaskExecuting)
                RunNextTask();
        }
    }

    /// <summary>
    /// Clear all queued tasks.  Any task currently executing will continue to execute.
    /// </summary>
    public void Clear()
    {
        lock (mTasks)
        {
            mTasks.Clear();
        }
    }

    /// <summary>
    /// Wait until all currently queued tasks have completed executing.
    /// If no tasks are queued, this method will return immediately.
    /// This method does not prevent the race condition of a second thread 
    /// queueing a task while one thread is entering the wait;
    /// if this is required, it must be synchronized externally.
    /// </summary>
    public void WaitUntilAllComplete()
    {
        lock (mTasks)
        {
            while (mTasks.Count > 0 || mTaskExecuting)
                Monitor.Wait(mTasks);
        }
    }

    private void RunTask(Object state)
    {
        var task = (Action)state;
        task();
        mTaskExecuting = false;
        RunNextTask();
    }

    private void RunNextTask()
    {
        lock (mTasks)
        {
            if (mTasks.Count > 0)
            {
                mTaskExecuting = true;
                var task = mTasks.Dequeue();
                ThreadPool.QueueUserWorkItem(RunTask, task);
            }
            else
            {
                //If anybody is waiting for tasks to be complete, let them know
                Monitor.PulseAll(mTasks);
            }
        }
    }
}

UPDATE: I've revised the code to fix the main bugs kindly pointed out by Simon. This passes unit tests now, but I still welcome observations.


Don't do it. (Or at least avoid building your own stuff.)

Use the System.Threading.Tasks types (new in .NET 4.0). Create a Task[] (sized to the number of parallel tasks you want) and have each task read work items from a BlockingCollection while observing a CancellationToken. Your WaitForAll implementation would then trigger the token and call Task.WaitAll(Task[]), which blocks until all of your tasks have finished.
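A minimal sketch of that idea, under some assumptions: the class name SerialTaskQueue and its method names are hypothetical, and a single worker is used so that execution order matches queue order. One deliberate swap from the description above: draining is done with CompleteAdding() rather than by cancelling the token, because cancelling would abandon any items still queued. The token is kept only for the "give up now" case.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the BCL.
public sealed class SerialTaskQueue
{
    // BlockingCollection<T> defaults to FIFO (it wraps a ConcurrentQueue<T>).
    private readonly BlockingCollection<Action> _queue = new BlockingCollection<Action>();
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private readonly Task[] _workers;

    // workerCount = 1 gives strictly serial execution.
    public SerialTaskQueue(int workerCount = 1)
    {
        _workers = new Task[workerCount];
        for (int i = 0; i < workerCount; i++)
            _workers[i] = Task.Factory.StartNew(Consume, TaskCreationOptions.LongRunning);
    }

    public void QueueTask(Action task)
    {
        if (task == null) throw new ArgumentNullException("task");
        _queue.Add(task);
    }

    private void Consume()
    {
        try
        {
            // Blocks until an item arrives; ends when adding is completed
            // and the queue is empty, or when the token is cancelled.
            foreach (Action task in _queue.GetConsumingEnumerable(_cts.Token))
                task();
        }
        catch (OperationCanceledException)
        {
            // Cancel() was called; remaining items are abandoned.
        }
    }

    // Stop accepting work, let queued items finish, then wait for the workers.
    // An exception thrown by a queued action faults its worker and surfaces
    // here as an AggregateException.
    public void CompleteAndWait()
    {
        _queue.CompleteAdding();
        Task.WaitAll(_workers);
    }

    // Abandon anything still queued and wait for the running item to finish.
    public void Cancel()
    {
        _cts.Cancel();
        Task.WaitAll(_workers);
    }
}
```

Note that this ties up one long-running task per instance, which is close to having a dedicated thread, so it may not suit the "many instances" case in the question.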


Here's my second answer, assuming that you can't use .NET 4.0 (and want comments on your existing code).

QueueTask enqueues the first task, gets isFirstTask = true, and starts a work item on the thread pool. However, another thread may enqueue something while that first task is still running. Because the first task has already been dequeued, Count == 0 again gives isFirstTask = true, and a second work item is started in parallel with the first.

Also, WaitUntilAllComplete will hang indefinitely if a task throws an exception (which may not necessarily crash everything, depending on exception handling), because the exception skips the call to RunNextTask().

And your WaitUntilAllComplete only waits until no tasks remain in your queue. It does not wait for tasks that have already been dequeued to actually finish (they may still be running, or may just be sitting in the ThreadPool's own queue).
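One way those three points could be addressed without .NET 4.0 is sketched below. This is an illustration under assumptions, not the asker's revised code: the class name SerialAsyncTaskerSketch is hypothetical, exceptions are simply swallowed (a real implementation would log or surface them), and the "executing" flag is only ever read or written while holding the lock, whereas the posted RunTask resets it outside the lock.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical revision for illustration only.
public sealed class SerialAsyncTaskerSketch
{
    private readonly object mLock = new object();
    private readonly Queue<Action> mTasks = new Queue<Action>();
    private bool mTaskExecuting; // guarded by mLock

    public void QueueTask(Action task)
    {
        if (task == null) throw new ArgumentNullException("task");

        lock (mLock)
        {
            mTasks.Enqueue(task);

            // Start a chain of work items only if none is active.
            if (!mTaskExecuting)
            {
                mTaskExecuting = true;
                ThreadPool.QueueUserWorkItem(RunNext);
            }
        }
    }

    public void Clear()
    {
        lock (mLock) { mTasks.Clear(); }
    }

    public void WaitUntilAllComplete()
    {
        lock (mLock)
        {
            // Covers both queued tasks and a task that is still running.
            while (mTasks.Count > 0 || mTaskExecuting)
                Monitor.Wait(mLock);
        }
    }

    private void RunNext(object state)
    {
        Action task;
        lock (mLock)
        {
            if (mTasks.Count == 0)
            {
                // Chain ends here; wake anybody waiting for completion.
                mTaskExecuting = false;
                Monitor.PulseAll(mLock);
                return;
            }
            task = mTasks.Dequeue();
        }

        try
        {
            task(); // runs outside the lock
        }
        catch (Exception)
        {
            // Assumption: swallow so that one failing task cannot stall the chain.
        }

        // One pool work item per task, as in the original design.
        ThreadPool.QueueUserWorkItem(RunNext);
    }
}
```

Because only the active chain ever dequeues, at most one task runs at a time, and a throwing task no longer prevents the next one from being scheduled.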


It's built into .NET 4.0:

How to: Create a Task Scheduler That Limits the Degree of Concurrency

You can also use a custom scheduler to achieve functionality that the default scheduler does not provide, such as strict first-in, first-out (FIFO) execution order. The following example demonstrates how to create a custom task scheduler. This scheduler lets you specify the degree of concurrency.
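For the serial case in the question, the degree of concurrency would be fixed at one. The following is a rough sketch of such a scheduler, not the code from the linked how-to: the class name SerialTaskScheduler is hypothetical, and it relies only on the TaskScheduler members QueueTask, TryExecuteTask, TryExecuteTaskInline, GetScheduledTasks and MaximumConcurrencyLevel.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical scheduler: runs queued tasks one at a time, in FIFO order,
// on thread pool threads.
public sealed class SerialTaskScheduler : TaskScheduler
{
    private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // guarded by lock (_tasks)
    private bool _running; // true while a drain loop is active

    public override int MaximumConcurrencyLevel
    {
        get { return 1; }
    }

    protected override void QueueTask(Task task)
    {
        lock (_tasks)
        {
            _tasks.AddLast(task);
            if (_running) return; // the active drain loop will pick it up
            _running = true;
        }
        ThreadPool.QueueUserWorkItem(Drain);
    }

    private void Drain(object state)
    {
        while (true)
        {
            Task next;
            lock (_tasks)
            {
                if (_tasks.Count == 0)
                {
                    _running = false;
                    return;
                }
                next = _tasks.First.Value;
                _tasks.RemoveFirst();
            }
            // Exceptions are captured in the Task itself, so the loop continues.
            TryExecuteTask(next);
        }
    }

    // Refuse inlining so a waiting thread can never run a task out of order.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }

    // Used by debuggers; returns a snapshot of the pending tasks.
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        lock (_tasks)
        {
            return new List<Task>(_tasks);
        }
    }
}
```

Work would be submitted through something like `new TaskFactory(new SerialTaskScheduler()).StartNew(action)`, and Task.WaitAll on the returned tasks gives the "wait until complete" behaviour. One caveat of refusing to inline: a task that waits on another task queued to the same scheduler will deadlock.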


I see a few issues with your SerialAsyncTasker class, but it sounds like you might have a good grasp of those, so I will not go into any details on that topic (I may edit my answer with more details later). You indicated in the comments that you cannot use .NET 4.0 features, nor can you use the Reactive Extensions backport. I propose that you use the producer-consumer pattern with a single consumer on a dedicated thread. This would fit your requirement of asynchronously executing tasks sequentially.

Note: You will have to harden the code to support gracefully shutting down, handling exceptions, etc.

public class SerialAsyncTasker
{
  private BlockingCollection<Action> m_Queue = new BlockingCollection<Action>();

  public SerialAsyncTasker()
  {
    var thread = new Thread(
      () =>
      {
        while (true)
        {
          Action task = m_Queue.Take();
          task();
        }
      });
    thread.IsBackground = true;
    thread.Start();
  }

  public void QueueTask(Action task)
  {
    m_Queue.Add(task);
  }
}

Too bad you cannot use the BlockingCollection from the .NET 4.0 BCL or Reactive Extension download, but no worries. It is actually not too hard to implement one yourself. You can use Stephen Toub's blocking queue as a starting point and just rename a few things.

public class BlockingCollection<T>
{
    private Queue<T> m_Queue = new Queue<T>();

    public T Take()
    {
        lock (m_Queue)
        {
            while (m_Queue.Count <= 0) Monitor.Wait(m_Queue);
            return m_Queue.Dequeue();
        }
    }

    public void Add(T value)
    {
        lock (m_Queue)
        {
            m_Queue.Enqueue(value);
            Monitor.Pulse(m_Queue);
        }
    }
}


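// Requires: using System; using System.Collections.Concurrent;
//           using System.Threading; using System.Threading.Tasks;
public class ParallelExcecuter
{
    // Bounded collection used as a counting gate: Add blocks once
    // maxDegreeOfParallelism entries are outstanding.
    private readonly BlockingCollection<Task> _workItemHolder;

    public ParallelExcecuter(int maxDegreeOfParallelism)
    {
        _workItemHolder = new BlockingCollection<Task>(maxDegreeOfParallelism);
    }

    public void Submit(Action action)
    {
        // Note: Task.Run schedules the action before Add can block,
        // so the limit is approximate rather than strict.
        _workItemHolder.Add(Task.Run(action).ContinueWith(t =>
        {
            // Free one slot once the action has finished.
            _workItemHolder.Take();
        }));
    }

    public void WaitUntilWorkDone()
    {
        // Each finished action removes one entry, so an empty collection
        // means all work submitted so far has completed.
        SpinWait.SpinUntil(() => _workItemHolder.Count == 0);
    }
}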