How (and if) to write a single-consumer queue using the TPL?

I've heard a bunch of podcasts recently about the TPL in .NET 4.0. Most of them describe background activities like downloading images or doing a computation, using tasks so that the work doesn't interfere with a GUI thread.

Most of the code I work on has more of a multiple-producer / single-consumer flavor, where work items from multiple sources must be queued and then processed in order. One example would be logging, where log lines from multiple threads are serialized into a single queue for eventual writing to a file or database. All the records from any single source must remain in order, and records from the same moment in time should be "close" to each other in the eventual output.

So multiple threads or tasks or whatever are all invoking a queuer:

lock( _queue ) // or use a lock-free queue!
{
   _queue.Enqueue( some_work );
   _queueSemaphore.Release();
}

And a dedicated worker thread processes the queue:

while( _queueSemaphore.WaitOne() )
{
   lock( _queue )
   {
      some_work = _queue.Dequeue();
   }
   deal_with( some_work );
}

It's always seemed reasonable to dedicate a worker thread for the consumer side of these tasks. Should I write future programs using some construct from the TPL instead? Which one? Why?


You can use a long-running Task to process items from a BlockingCollection, as suggested by Wilka. Here's an example which pretty much meets your application's requirements. You'll see output something like this:

Log from task B
Log from task A
Log from task B1
Log from task D
Log from task C

Note that the outputs from A, B, C and D appear in a random order because they depend on the start time of the threads, but B always appears before B1.

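using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class LogItem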
{
    public string Message { get; private set; }

    public LogItem (string message)
    {
        Message = message;
    }
}

public void Example()
{
    BlockingCollection<LogItem> _queue = new BlockingCollection<LogItem>();

    // Start queue listener...
    CancellationTokenSource canceller = new CancellationTokenSource();
    Task listener = Task.Factory.StartNew(() =>
        {
            while (!canceller.Token.IsCancellationRequested)
            {
                LogItem item;
                // Wait up to 100 ms for an item instead of spinning on an empty queue.
                if (_queue.TryTake(out item, 100))
                    Console.WriteLine(item.Message);
            }
        },
    canceller.Token, 
    TaskCreationOptions.LongRunning,
    TaskScheduler.Default);

    // Add some log messages in parallel...
    Parallel.Invoke(
        () => { _queue.Add(new LogItem("Log from task A")); },
        () => { 
            _queue.Add(new LogItem("Log from task B")); 
            _queue.Add(new LogItem("Log from task B1")); 
        },
        () => { _queue.Add(new LogItem("Log from task C")); },
        () => { _queue.Add(new LogItem("Log from task D")); });

    // Pretend to do other things...
    Thread.Sleep(1000);

    // Shut down the listener...
    canceller.Cancel();
    listener.Wait();
}


I know this answer is about a year late, but take a look at MSDN, which shows how to create a LimitedConcurrencyLevelTaskScheduler derived from the TaskScheduler class. Limiting the concurrency to a single task should then process your tasks in order as they are queued via:

LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(1);
TaskFactory factory = new TaskFactory(lcts);

factory.StartNew(()=> 
{
   // your code
});
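
If you are on .NET 4.5 or later (an assumption, since the question mentions 4.0), the built-in ConcurrentExclusiveSchedulerPair offers a similar one-at-a-time scheduler without copying the sample class. The following is only a sketch: the class name ExclusiveSchedulerSketch and the Run method are made up for illustration, and while I would expect tasks to run in the order they were queued, that ordering is something to verify rather than rely on.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ExclusiveSchedulerSketch
{
    // Queues `count` small tasks on an exclusive scheduler and returns
    // the values in the order the tasks actually ran.
    public static List<int> Run(int count)
    {
        var pair = new ConcurrentExclusiveSchedulerPair();

        // Tasks started through this factory run at most one at a time.
        var factory = new TaskFactory(pair.ExclusiveScheduler);

        var seen = new List<int>();
        var tasks = new Task[count];
        for (int i = 0; i < count; i++)
        {
            int n = i; // copy the loop variable for the lambda
            // No lock around the list: the exclusive scheduler never runs
            // two of these tasks concurrently.
            tasks[i] = factory.StartNew(() => seen.Add(n));
        }

        Task.WaitAll(tasks);
        return seen;
    }
}
```

Calling ExclusiveSchedulerSketch.Run(10) returns a list of ten values, one per task.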


I'm not sure that TPL is adequate in your use case. From my understanding the main use case for TPL is to split one huge task into several smaller tasks that can be run side by side. For example if you have a big list and you want to apply the same transformation on each element. In this case you can have several tasks applying the transformation on a subset of the list.

The case you describe doesn't seem to fit this picture. In your case you don't have several tasks that do the same thing in parallel. You have several different tasks that each do their own job (the producers) and one task that consumes. Perhaps TPL could be used for the consumer part if you want to have multiple consumers, because in that case each consumer does the same job (assuming you find a way to enforce the temporal consistency you are looking for).

Well, this of course is just my personal view on the subject.

Live long and prosper


It sounds like BlockingCollection would be handy for you. So for your code above, you could use something like (assuming _queue is a BlockingCollection instance):

// for your producers 
_queue.Add(some_work);

A dedicated worker thread processing the queue:

foreach (var some_work in _queue.GetConsumingEnumerable())
{
    deal_with(some_work);
}

Note: when all your producers have finished producing stuff, you'll need to call CompleteAdding() on _queue; otherwise your consumer will be stuck waiting for more work.
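
To put the producer side, the consumer loop and the CompleteAdding() call together, here is a minimal sketch. The class name SingleConsumerSketch, the Run method and the "producer:index" item format are all invented for illustration, and the list stands in for whatever deal_with does in your code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class SingleConsumerSketch
{
    // Runs several producers against one consumer and returns the items
    // in the order the consumer processed them.
    public static List<string> Run(int producerCount, int itemsPerProducer)
    {
        // Backed by a ConcurrentQueue by default, so items come out FIFO.
        var queue = new BlockingCollection<string>();
        var processed = new List<string>();

        // Single consumer: blocks while the queue is empty and exits once
        // CompleteAdding() has been called and the queue has drained.
        Task consumer = Task.Factory.StartNew(() =>
        {
            foreach (string item in queue.GetConsumingEnumerable())
            {
                processed.Add(item); // stand-in for deal_with(some_work)
            }
        }, TaskCreationOptions.LongRunning);

        // Multiple producers, each adding its own items in sequence.
        var producers = new Task[producerCount];
        for (int p = 0; p < producerCount; p++)
        {
            int id = p; // copy the loop variable for the lambda
            producers[p] = Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < itemsPerProducer; i++)
                {
                    queue.Add(id + ":" + i);
                }
            });
        }

        Task.WaitAll(producers);
        queue.CompleteAdding(); // no more work; lets the consumer loop end
        consumer.Wait();
        return processed;
    }
}
```

Items from different producers interleave unpredictably, but items from any one producer keep their relative order, which is the property the question asks for.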
