Tree Hierarchy of WebClient Requests using TPL
I am developing a WPF app in C# in which I have a Uri from which I want to download JSON data. The code deserializes the downloaded JSON into an object, and that object contains a list of Uris from which more JSON data must be requested; I would like to issue those requests in parallel. The JSON downloaded from those Uris might in turn contain more Uris to request. The code should be able to call WebClient.CancelAsync on the parent WebClient and on all of its children's WebClients when desired. I am looking at the Task Parallel Library (TPL) and finding it difficult to grasp and implement. I'm unsure whether I should use the TPL's cancellation token to invoke CancelAsync, or use CancelAsync to cancel the TPL's cancellation token. I'm also not sure whether I should use nested Tasks for the child WebClients.
If anyone has had a similar scenario and implemented it using the new TPL, would you mind sharing a snippet of the code?
Thanks.
I'd suggest using Reactive Extensions (Rx) on top of the TPL; with it this can be done quite easily, without having to manage cancellation tokens or nested tasks yourself.
Assume you have the following:
// The initial Uri
Uri initialUri = ...;
// A function to return the JSON string from a given Uri
Func<Uri, string> getJason = ...;
// Turn the JSON into the next set of Uris to fetch
Func<string, IEnumerable<Uri>> getUris = ...;
Then, using Rx, you turn these functions into functions that return observables using the Task Pool, like so:
Func<Uri, IObservable<string>> getJasonObsFunc = u =>
    Observable
        .FromAsyncPattern<Uri, string>(
            getJason.BeginInvoke,
            getJason.EndInvoke)
        .Invoke(u)
        .ObserveOn(Scheduler.TaskPool);
Func<string, IObservable<Uri>> getUrisObsFunc = j =>
    Observable
        .FromAsyncPattern<string, IEnumerable<Uri>>(
            getUris.BeginInvoke,
            getUris.EndInvoke)
        .Invoke(j)
        .ObserveOn(Scheduler.TaskPool)
        .SelectMany(xs => xs.ToObservable());
You'll need a callback to get the Uri/JSON pairs out. Something like this:
Action<Uri, string> callback = (u, j) =>
    Console.WriteLine(String.Format("{0} => {1}", u, j));
Here's the recursive LINQ query that fetches each JSON string and then follows the Uris it contains:
Func<Uri, IObservable<Uri>> getAllUris = null;
getAllUris = u =>
    Observable
        .Return<Uri>(u)
        .Merge(
            from j in getJasonObsFunc(u).Do(k => callback(u, k))
            from u1 in getUrisObsFunc(j)
            from u2 in getAllUris(u1)
            select u2);
Then you invoke all of this goodness using the following line:
var subscription = getAllUris(initialUri).Subscribe();
Now, if you want to cancel the query execution just call this:
subscription.Dispose();
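Rx handles all of the tasks for you: once the subscription is disposed, no further results are delivered and no further requests are started. A call to getJason that is already in flight is not aborted, but its result is discarded.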
I hope this helps.
Here are the links for Rx:
- Reactive Extensions Forum
- Reactive Extensions (Rx) v1.0.10621
I'd recommend that you avoid creating nested tasks, because you lose control over the number of running tasks. Instead you could use the dispatch pattern with a BlockingCollection: a limited number of worker tasks take work items from the collection and can add more work to it if necessary. When all objects have been downloaded, you call CompleteAdding to unblock all waiting tasks.
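The dispatch pattern described above could be sketched roughly as follows. This is a minimal sketch under stated assumptions, not a definitive implementation: the names Crawler, Crawl, fetchChildren and workerCount are made up for illustration, and fetchChildren stands in for "download the JSON at this Uri and return the Uris it lists". A counter of outstanding items is one way to decide when it is safe to call CompleteAdding.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class Crawler
{
    // fetchChildren is a hypothetical delegate: it downloads the JSON for a Uri
    // and returns the child Uris found in it.
    public static List<Uri> Crawl(
        Uri root,
        Func<Uri, CancellationToken, IEnumerable<Uri>> fetchChildren,
        int workerCount,
        CancellationToken token)
    {
        var queue = new BlockingCollection<Uri>();
        var visited = new ConcurrentQueue<Uri>();

        // Number of items that are queued or currently being processed.
        int pending = 1;
        queue.Add(root);

        // A fixed number of workers, so the degree of parallelism stays bounded.
        Task[] workers = Enumerable.Range(0, workerCount)
            .Select(_ => Task.Factory.StartNew(() =>
            {
                // Blocks while the queue is empty; ends after CompleteAdding,
                // throws OperationCanceledException if the token is cancelled.
                foreach (Uri uri in queue.GetConsumingEnumerable(token))
                {
                    try
                    {
                        visited.Enqueue(uri);
                        foreach (Uri child in fetchChildren(uri, token))
                        {
                            // Count the child before queueing it so that
                            // pending cannot reach zero too early.
                            Interlocked.Increment(ref pending);
                            queue.Add(child);
                        }
                    }
                    finally
                    {
                        // The last outstanding item releases all blocked workers.
                        if (Interlocked.Decrement(ref pending) == 0)
                            queue.CompleteAdding();
                    }
                }
            }, token, TaskCreationOptions.LongRunning, TaskScheduler.Default))
            .ToArray();

        // Rethrows worker failures and cancellation as an AggregateException.
        Task.WaitAll(workers);
        return visited.ToList();
    }
}
```

Cancelling the token unblocks every worker waiting on the collection. As for the original question about WebClient, one option (again an assumption, not something this sketch does) is for the fetchChildren implementation to call token.Register(client.CancelAsync) on its own WebClient, so that the cancellation token drives CancelAsync rather than the other way round.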