开发者

How to process queued items serially on a low priority thread using Parallel Extensions

I want to know what the best way to process the results of a long-running process serially but on a low-priority thread using the .NET 4.0 Parallel Extensions.

I have the following class that both does the execution and provides the results:

public class Step 
{
  public string Name { get; set; }
  public TimeSpan Duration { get; set; }
  public bool Completed { get; set; }

  public void Execute() 
  {
     DateTime before = DateTime.Now;
     RemotedService.DoSomeStuff();
     Duration = DateTime.Now - before;
     Completed = true;
  }
}

How can I process these steps and also save them to a file, in order, after they are processed? I would like to save them to a file while RemotedService.DoSomeStuff() is waiting for a response from the server.

Writing to the file would be like this:

using (StreamWriter w = File.AppendText("myFile.t开发者_StackOverflowxt"))
{
  var completedStep = completedSteps.Dequeue();
  w.WriteLine(string.Format("Completed {0} with result: {1} and duration {2}", 
                            completedStep.Name, 
                            completedStep.Completed, 
                            completedStep.Duration));
}

One option that comes to mind is to add them to a Queue and have a Timer that processes them. But this doesn't take advantage of the downtime of the remoted calls.

Another option that comes to mind is to asynchronously write each result to the file using a System.Threading.Tasks.Task per Step, but that doesn't guarantee that they will be saved in order and may also introduce contention with writing to the file.


I would suggest creating a BlockingCollection<Step> (see System.Collections.Concurrent namespace). As each step is completed, it's added to that collection. The default behavior of BlockingCollection is to function as a queue, so you'll get the FIFO behavior you're looking for.

A second thread services the queue, removing each item and writing it to a log file.

So, if you added the queue:

static private BlockingCollection<Step> LogQueue = new BlockingCollection<Step>();

You would add this to your Execute method, after the item is completed:

LogQueue.Add(this);

And your logging thread, which you would start in the static constructor:

static void LoggingThread()
{
    using (var w = new StreamWriter(...))
    {
        while (!LogQueue.IsCompleted())
        {
            Step item;
            if (LogQueue.TryTake(out item))
            {
                w.WriteLine(....);
            }
        }
    }
}

The logging thread as I've written it uses a System.Threading thread. There might be an easier or better way to do it with TPL. I'm not yet terribly familiar with TPL, so I couldn't say.


One approach is to create a custom Task Scheduler (see http://msdn.microsoft.com/en-us/library/ee789351.aspx). Your custom task scheduler can limit concurrency to one-at-a-time and enforce strict in-order execution.

Creating a task scheduler also allows you to control the thread priority, or the ApartmentState which may be desirable in some situations.


You're literally describing the use case for Workflow Foundation - it does all of this stuff for you :)

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜