How to create a parallel prefetch for a foreach

Given the numerous new ways of performing asynchronous operations in C# (TPL, Parallel Extensions, Async CTP, Reactive Extensions), I was wondering what the simplest way would be to parallelize the fetching and processing portions of the following:

foreach(string url in urls)
{
   var file = FetchFile(url);
   ProcessFile(file);
}

The proviso is that while files can be fetched at any time, ProcessFile can only handle one file at a time and must be called sequentially.

In short, what is the simplest way to get FetchFile and ProcessFile to behave in a pipelined way, i.e. to run concurrently?


Here's the RX way. This extension will transform a stream of URIs into a stream of streams:

    public static IObservable<Stream> RequestToStream(this IObservable<string> source, 
    TimeSpan timeout)
    {
        return
            from wc in source.Select(WebRequest.Create)
            from s in Observable
                .FromAsyncPattern<WebResponse>(wc.BeginGetResponse,
                    wc.EndGetResponse)()
                .Timeout(timeout, Observable.Empty<WebResponse>())
                .Catch(Observable.Empty<WebResponse>())
            select s.GetResponseStream();
    }

Usage:

new [] { "http://myuri.net/file1.dat", "http://myuri.net/file2.dat" }
   .ToObservable()
   .RequestToStream(TimeSpan.FromSeconds(5))
   .Do(stream => ProcessStream(stream))
   .Subscribe();

Edit: oops, I hadn't noticed the file-write serialization requirement. This part can be done with .Concat, which is essentially an RX queue (another one is .Zip).

Let's have a .StreamToFile extension:

    public static IObservable<Unit> StreamToFile(this Tuple<Stream, string> source)
    {
        return Observable.Defer(() =>
            source.Item1.AsyncRead().WriteTo(File.Create(source.Item2)));
    }

Now you can run the web requests in parallel but serialize the file writes that come from them:

        new[] { "http://myuri.net/file1.dat", "http://myuri.net/file2.dat" }
            .ToObservable()
            .RequestToStream(TimeSpan.FromSeconds(5))
            .Select((stream, i) => Tuple.Create(stream, i.ToString() + ".dat"))
            .Select(x => x.StreamToFile())
            .Concat()
            .Subscribe();


Given the constraint on ProcessFile, I would say you should fetch the data asynchronously using TPL and then enqueue a token which references the preloaded data. You can then have a background thread that pulls items off the queue and hands them to ProcessFile one by one. This is a producer/consumer pattern.

For the queue, you can take a look at BlockingCollection, which provides a thread-safe queue and also has the nice effect of letting you throttle the workload.
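
A minimal sketch of the producer/consumer idea described above, assuming .NET 4.5 or later (for Task.Run). The names PrefetchPipeline, Run and maxPrefetched are hypothetical, and fetchFile / processFile stand in for the question's FetchFile and ProcessFile. The queue holds the fetch tasks themselves as the "tokens", so files are processed in URL order, and the bounded capacity limits how far fetching can run ahead. Error handling is left out.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class PrefetchPipeline
{
    // Fetches run concurrently on the thread pool; processFile is called on the
    // calling thread, one file at a time, in the same order as the URLs.
    public static void Run<T>(
        IEnumerable<string> urls,
        Func<string, T> fetchFile,
        Action<T> processFile,
        int maxPrefetched = 4)
    {
        using (var queue = new BlockingCollection<Task<T>>(maxPrefetched))
        {
            // Producer: start a fetch per URL and enqueue the task as a token.
            var producer = Task.Run(() =>
            {
                try
                {
                    foreach (var url in urls)
                    {
                        // Add blocks while the queue is full, which throttles
                        // how far fetching runs ahead of processing.
                        queue.Add(Task.Run(() => fetchFile(url)));
                    }
                }
                finally
                {
                    // Lets the consuming loop below terminate.
                    queue.CompleteAdding();
                }
            });

            // Consumer: take tokens in order and process them sequentially.
            foreach (var pending in queue.GetConsumingEnumerable())
            {
                processFile(pending.Result); // waits until this fetch is done
            }

            producer.Wait();
        }
    }
}
```

With the question's methods this could be called as PrefetchPipeline.Run(urls, url => FetchFile(url), file => ProcessFile(file)). If processing order does not matter, enqueueing the finished files instead of the tasks would let whichever download completes first be processed first.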


Since I don't know all the fancy mechanisms, I'd probably do it the old-fashioned way, although I doubt it would classify as "simple":

var q = new Queue<MyFile>();
var ev = new ManualResetEvent(false);

new System.Threading.Thread(() =>
{
    while ( true )
    {
        ev.WaitOne();
        MyFile item;
        lock (q)
        {
            item = q.Dequeue();
            if ( q.Count == 0 )
                ev.Reset();
        }
        if ( item == null )
            break;
        ProcessFile(item);
    }
}).Start();
foreach(string url in urls)
{
    var file = FetchFile(url);
    lock (q)
    {
        q.Enqueue(file);
        ev.Set();
    }
}
lock (q)
{
    q.Enqueue(null);
    ev.Set();
}


Asynchronous does not actually denote parallel. It simply means that you will not block waiting for another operation. But you can take advantage of async I/O to avoid blocking threads as you download the URLs, i.e. you don't need as many threads as URLs to download them in parallel if you do this:

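var syncLock = new object();
TaskEx.WhenAll(urls.Select(url => {
  // WebClient does not support concurrent operations, so use one per download.
  var client = new WebClient();
  return client.DownloadDataTaskAsync(url).ContinueWith((t) => {
    client.Dispose();
    lock(syncLock) {
      ProcessFile(t.Result);
    }
  });
}));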

Basically, we create an async download task per URL, and as each task completes we invoke a continuation that uses a plain object as our sync lock to make sure ProcessFile happens sequentially. The task returned by WhenAll won't complete until the last ProcessFile continuation is done.

You could avoid the explicit lock with RX's ReplaySubject (but of course it will lock internally):

var pipeline = new ReplaySubject<byte[]>();
var files = pipeline.ToEnumerable();
TaskEx.WhenAll(urls
        // WebClient does not support concurrent operations, so use one per download.
        .Select(url => new WebClient().DownloadDataTaskAsync(url)
            .ContinueWith(t => pipeline.OnNext(t.Result))
        )
    ).ContinueWith(task => pipeline.OnCompleted());
foreach(var file in files) {
    ProcessFile(file);
}

Here we use a ReplaySubject as our pipeline of file downloads. Each download finishes asynchronously and publishes its results into the pipeline which the foreach blocks on (i.e. happens sequentially). When all tasks complete, we complete the observable, which exits the foreach.
