
Parallel Producer / Consumer with fault tolerance?

I have a requirement to split large CSV files into several separate DB inserts using SqlBulkCopy. I intend to do this with two separate Tasks: one for batching up the CSV file and another for inserting into the database. As an example, here is what I'm thinking of:

    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;

    public class UberTask
    {
        private readonly BlockingCollection<Tuple<string, int>> _store = new BlockingCollection<Tuple<string, int>>();

        public void PerformTask()
        {
            // UINotifier is an application-specific helper (not shown here).
            var notifier = new UINotifier();
            Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < 10; i++)
                {
                    string description = string.Format("Scenario {0}", i);

                    notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format("Reading '{0}' from file", description)));

                    // represents reading the CSV file.
                    Thread.Sleep(500);
                    notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format("Enqueuing '{0}'", description)));
                    _store.Add(new Tuple<string, int>(description, i));
                }
                _store.CompleteAdding();
            });

            var consumer = Task.Factory.StartNew(() =>
            {
                foreach (var item in _store.GetConsumingEnumerable())
                {
                    var poppedItem = item;
                    notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format("Sending '{0}' to the database", poppedItem.Item1)));
                    // represents sending stuff to the database.
                    Thread.Sleep(1000);
                }
            });
            consumer.Wait();
            Console.WriteLine("complete");
        }
    }

Is this a good way of pairing two related tasks? What the above code does not handle (but needs to):

  • If the Task that represents the CSV reading faults, the other task needs to stop (even if there are still items in _store).
  • If the Task that represents the DB inserts faults, the other task can simply stop processing.
  • If either of the paired tasks faults, I will need to perform some action to roll back the DB updates. I'm not worried about how I will roll back; my question is how to code "a fault happened in one of the paired tasks, so I need to do some tidy-up".

Any help on the above would be greatly appreciated!


You can use exception handling and cancellation tokens to do this. When a pipeline stage detects an error, it catches it and cancels the shared CancellationTokenSource, which signals the other stages to stop. The finally block makes sure that CompleteAdding() is always called. This matters because the receiving stage may be blocked waiting on the collection; unless that wait observes the token, it will not notice the cancellation until it is unblocked.
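To illustrate the last point: a consumer that calls `GetConsumingEnumerable()` with no arguments only wakes up when an item arrives or `CompleteAdding()` is called. If you pass the token instead (`GetConsumingEnumerable(token)`), cancellation also wakes it, by throwing `OperationCanceledException`. A minimal sketch, where `CancellationDemo.Consume` is a hypothetical helper that just counts the items it takes:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class CancellationDemo
{
    // Hypothetical consumer: counts items until the collection is completed
    // or the token is cancelled, and reports which of the two happened.
    public static (int Count, bool WasCancelled) Consume(
        BlockingCollection<int> input, CancellationToken token)
    {
        int count = 0;
        try
        {
            // Because the token is passed in, a consumer blocked on an empty
            // queue throws OperationCanceledException as soon as the token is
            // cancelled, even if nobody ever calls CompleteAdding().
            foreach (var item in input.GetConsumingEnumerable(token))
                count++;
            return (count, false);
        }
        catch (OperationCanceledException)
        {
            return (count, true);
        }
    }
}
```

Without the token argument, the same loop would stay blocked forever if the producer died before reaching its `CompleteAdding()` call, which is why the `finally` block in the stage below matters.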

You also want to dispose of any unprocessed objects left in your collections or, in your case, clean up your DB connections when the pipeline stage completes (in the finally block) and/or as the whole pipeline shuts down.

Here's an example of a pipeline stage that does this:

    static void LoadPipelinedImages(IEnumerable<string> fileNames, 
                                    string sourceDir, 
                                    BlockingCollection<ImageInfo> original,
                                    CancellationTokenSource cts)
    {
        // ...
        var token = cts.Token;
        ImageInfo info = null;
        try
        {
            foreach (var fileName in fileNames)
            {
                if (token.IsCancellationRequested)
                    break;
                info = LoadImage(fileName, ...);
                original.Add(info, token);
                info = null;
            }                
        }
        catch (Exception e)
        {
            // in case of exception, signal shutdown to other pipeline tasks
            cts.Cancel();
            if (!(e is OperationCanceledException))
                throw;
        }
        finally
        {
            original.CompleteAdding();
            if (info != null) info.Dispose();
        }
    }
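Applied to the two tasks in the question, the same shape might look like the sketch below. It is an illustration under stated assumptions, not code from the sample: `PairedLoad.Run` is a hypothetical name, `readBatch` stands in for reading a CSV chunk, `insertBatch` for the `SqlBulkCopy` call, and `rollback` for whatever tidy-up you need. Either stage cancels the shared `CancellationTokenSource` when it fails, the consumer observes the token so it stops even if items remain in the queue, and the caller catches the `AggregateException` from `Task.WaitAll` before running the rollback.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class PairedLoad
{
    // Hypothetical helper: returns true if both stages completed normally,
    // false if a fault (or external cancellation) triggered the rollback.
    public static bool Run(int batchCount,
                           Func<int, string> readBatch,  // stands in for reading a CSV chunk
                           Action<string> insertBatch,   // stands in for the SqlBulkCopy insert
                           Action rollback,              // your tidy-up / rollback logic
                           CancellationTokenSource cts)
    {
        using var store = new BlockingCollection<string>(boundedCapacity: 4);
        var token = cts.Token;

        var producer = Task.Factory.StartNew(() =>
        {
            try
            {
                for (int i = 0; i < batchCount && !token.IsCancellationRequested; i++)
                    store.Add(readBatch(i), token);
            }
            catch (Exception e)
            {
                cts.Cancel();                      // tell the consumer to stop
                if (!(e is OperationCanceledException)) throw;
            }
            finally
            {
                store.CompleteAdding();            // never leave the consumer waiting
            }
        }, TaskCreationOptions.LongRunning);

        var consumer = Task.Factory.StartNew(() =>
        {
            try
            {
                // Passing the token makes the consumer stop even if items remain queued.
                foreach (var batch in store.GetConsumingEnumerable(token))
                    insertBatch(batch);
            }
            catch (Exception e)
            {
                cts.Cancel();                      // tell the producer to stop
                if (!(e is OperationCanceledException)) throw;
            }
        }, TaskCreationOptions.LongRunning);

        try
        {
            Task.WaitAll(producer, consumer);
        }
        catch (AggregateException ae)
        {
            // At least one stage faulted; log the real cause(s) before rolling back.
            foreach (var inner in ae.Flatten().InnerExceptions)
                Console.Error.WriteLine("Stage failed: " + inner.Message);
        }

        if (token.IsCancellationRequested)
        {
            rollback();
            return false;
        }
        return true;
    }
}
```

`Task.WaitAll` only throws after every task has finished, so the rollback never runs while the other stage is still writing. Checking `token.IsCancellationRequested` rather than only catching the exception also covers the case where the UI cancels the source: the stages swallow the `OperationCanceledException`, nothing faults, but the load is still incomplete and gets rolled back.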

The overall pipeline code looks like this. It also supports cancelling the pipeline externally (for example, from the UI) by calling Cancel() on the CancellationTokenSource.

    static void RunPipelined(IEnumerable<string> fileNames, 
                             string sourceDir, 
                             int queueLength, 
                             Action<ImageInfo> displayFn,
                             CancellationTokenSource cts)
    {
        // Data pipes 
        var originalImages = new BlockingCollection<ImageInfo>(queueLength);
        var thumbnailImages = new BlockingCollection<ImageInfo>(queueLength);
        var filteredImages = new BlockingCollection<ImageInfo>(queueLength);
        try
        {
            var f = new TaskFactory(TaskCreationOptions.LongRunning,
                                    TaskContinuationOptions.None);
            // ...

            // Start pipelined tasks
            var loadTask = f.StartNew(() =>
                  LoadPipelinedImages(fileNames, sourceDir, 
                                      originalImages, cts));

            var scaleTask = f.StartNew(() =>
                  ScalePipelinedImages(originalImages, 
                                       thumbnailImages, cts));

            var filterTask = f.StartNew(() =>
                  FilterPipelinedImages(thumbnailImages, 
                                        filteredImages, cts));

            var displayTask = f.StartNew(() =>
                  DisplayPipelinedImages(filteredImages.GetConsumingEnumerable(), 
                       ... cts));

            Task.WaitAll(loadTask, scaleTask, filterTask, displayTask);
        }
        finally
        {
            // in case of exception or cancellation, there might be bitmaps
            // that need to be disposed.
            DisposeImagesInQueue(originalImages);
            DisposeImagesInQueue(thumbnailImages);
            DisposeImagesInQueue(filteredImages);                
        }
    }
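`DisposeImagesInQueue` is not shown above. A minimal sketch of such a helper, assuming the queued items implement `IDisposable` (`QueueCleanup.DisposeRemaining` is a hypothetical name, not the sample's code): because every stage calls `CompleteAdding()` in its `finally` and `Task.WaitAll` has already waited for all stages, nothing is adding to the queues any more, so a non-blocking `TryTake` loop is enough to empty them.

```csharp
using System;
using System.Collections.Concurrent;

public static class QueueCleanup
{
    // Hypothetical helper: disposes whatever a pipeline stage left behind.
    // Only call it once every producer for this queue has finished.
    public static int DisposeRemaining<T>(BlockingCollection<T> queue)
        where T : IDisposable
    {
        int disposed = 0;
        // TryTake with no timeout returns false immediately when the queue is empty.
        while (queue.TryTake(out T item))
        {
            item.Dispose();
            disposed++;
        }
        return disposed;
    }
}
```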

For a full sample see the Pipeline example in the download here:

http://parallelpatterns.codeplex.com/releases/view/50473

Discussed here:

http://msdn.microsoft.com/en-us/library/ff963548.aspx
