How do you chain together two Asynchronous Operations with the Reactive Framework?
All I really want to do is complete two async operations, one right after the other. E.g.
Download website X. Once that's completed, download website Y.
Use SelectMany over the two observables (use ToAsync to turn the operations into IObservables), either as a direct LINQ SelectMany call or with the query-syntax sugar:
var result =
    from x in X
    from y in Y
    select new { x, y };
There are other options but it depends on your specific scenario.
Maybe something along the following lines:
static IObservable<DownloadProgressChangedEventArgs> CreateDownloadFileObservable(string url, string fileName)
{
    IObservable<DownloadProgressChangedEventArgs> observable =
        Observable.CreateWithDisposable<DownloadProgressChangedEventArgs>(o =>
        {
            var cancellationTokenSource = new CancellationTokenSource();
            Scheduler.TaskPool.Schedule
            (
                () =>
                {
                    // Artificial delay so there is time to cancel before the download starts.
                    Thread.Sleep(3000);
                    if (!cancellationTokenSource.Token.IsCancellationRequested)
                    {
                        WebClient client = new WebClient();
                        DownloadProgressChangedEventHandler prgChangedHandler = null;
                        prgChangedHandler = (s, e) =>
                        {
                            o.OnNext(e);
                        };
                        AsyncCompletedEventHandler handler = null;
                        handler = (s, e) =>
                        {
                            // Detach both handlers from the client once the download ends.
                            client.DownloadProgressChanged -= prgChangedHandler;
                            client.DownloadFileCompleted -= handler;
                            // Signal either an error or completion, never both.
                            if (e.Error != null)
                            {
                                o.OnError(e.Error);
                            }
                            else
                            {
                                o.OnCompleted();
                            }
                        };
                        client.DownloadFileCompleted += handler;
                        client.DownloadProgressChanged += prgChangedHandler;
                        // Start the download only after the handlers are attached.
                        // fileName is also passed as the user token, so it shows up in UserState.
                        client.DownloadFileAsync(new Uri(url), fileName, fileName);
                    }
                    else
                    {
                        Console.WriteLine("Cancelling download of {0}", fileName);
                    }
                }
            );
            // Note: disposing a CancellationTokenSource does not by itself request cancellation.
            return cancellationTokenSource;
        }
        );
    return observable;
}
static void Main(string[] args)
{
    var obs1 = CreateDownloadFileObservable("http://www.cnn.com", "cnn.htm");
    var obs2 = CreateDownloadFileObservable("http://www.bing.com", "bing.htm");
    // Concat subscribes to obs2 only after obs1 has completed.
    var result = obs1.Concat(obs2);
    var subscription = result.Subscribe(
        a => Console.WriteLine("{0} -- {1}% complete ", a.UserState, a.ProgressPercentage),
        e => Console.WriteLine(e.Message),
        () => Console.WriteLine("Completed"));
    Console.ReadKey();
    subscription.Dispose();
    Console.WriteLine("Press a key to exit");
    Console.ReadKey();
}
You could turn CreateDownloadFileObservable into an extension method on WebClient and remove the WebClient client = new WebClient();
line from the method definition. The calling code would then look like this:
WebClient c = new WebClient();
c.CreateDownloadFileObservable("http://www.bing.com", "bing.htm");
I don't like suggestions like this one myself, but...
If you need to perform two operations sequentially, do them sequentially (you can still do them on a thread different from main).
If you still want to separate the tasks in code, then the Task construct from the Task Parallel Library (System.Threading.Tasks) would be appropriate:
var task1 = Task.Factory.StartNew (() => FirstTask());
var task2 = task1.ContinueWith (frst => SecondTask ());
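For reference, here is a slightly fuller sketch of the same continuation idea. FirstTask and SecondTask are hypothetical stand-ins for the two downloads (they only record that they ran), and the SequentialTasks class name is made up for this example. It assumes you want the second step skipped if the first one fails, which is why it passes OnlyOnRanToCompletion; by default ContinueWith runs the continuation even when the first task faults.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class SequentialTasks
{
    // Hypothetical stand-in for "download website X".
    static string FirstTask(List<string> log)
    {
        Thread.Sleep(100); // simulate slow work
        log.Add("first");
        return "X";
    }

    // Hypothetical stand-in for "download website Y"; it receives the first result.
    static string SecondTask(List<string> log, string previous)
    {
        log.Add("second after " + previous);
        return "Y";
    }

    // Runs the two steps one after the other on thread-pool threads
    // and returns the order in which they executed.
    public static List<string> Run()
    {
        var log = new List<string>();

        Task<string> task1 = Task.Factory.StartNew(() => FirstTask(log));

        // The continuation is scheduled only after task1 has finished,
        // and (with this option) only if it finished successfully.
        Task<string> task2 = task1.ContinueWith(
            first => SecondTask(log, first.Result),
            TaskContinuationOptions.OnlyOnRanToCompletion);

        task2.Wait(); // block until both steps are done
        return log;
    }
}
```

Calling SequentialTasks.Run() returns a list whose entries are "first" followed by "second after X", since the second step cannot start before the first has produced its result.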
And if this is a way to grok System.Reactive, try Observable.GenerateInSequence, though it will surely be overkill. Remember that an observable is the counterpart of an enumerable, and it is best used in a similar way (iterating over data). It is not a tool for running async operations.
Edit: I want to admit that I was wrong and Richard was right. At the time of my reply I wasn't fully comfortable with Rx. Now I think that Rx is the most natural way to start asynchronous operations. Still, Richard's answer is a bit skimpy; it should be:
var result =
    from x in XDownload.ToAsync()()
    from y in YDownload.ToAsync()()
    select y;