Silverlight Task.WaitAll using Rx
I'd like to call two webservices simultaneously and process the responses when both are done. I am calling the webservices using Rx’s Observable.FromAsyncPattern method. What is the correct method to simultaneously subscribe to multiple IObservables?
I’ve tried using Zip, but it does not appear to start both simultaneously, only starting the second after the first result is received.
EDIT: Here's a demonstration in which Zip and some of the other proposed solutions do not solve my problem:
class Program
{
    static void Main(string[] args)
    {
        // Each source blocks inside its subscribe delegate before emitting a single value.
        var observable1 = Observable.Create<int>(i =>
        {
            Console.WriteLine("starting 1");
            System.Threading.Thread.Sleep(2000);
            Console.WriteLine("done sleeping 1");
            i.OnNext(1);
            i.OnCompleted();
            return () => { };
        });
        var observable2 = Observable.Create<int>(i =>
        {
            Console.WriteLine("starting 2");
            System.Threading.Thread.Sleep(4000);
            Console.WriteLine("done sleeping 2");
            i.OnNext(1);
            i.OnCompleted();
            return () => { };
        });
        // The three proposed ways of combining the two sources.
        var m = observable1.Zip(observable2, (a, b) => new { a, b });
        var n = Observable.Merge(Scheduler.ThreadPool,
            observable1, observable2);
        var o = Observable.When(observable1.And(observable2).Then((a, b) => new { a, b }));
        m.Subscribe(
            (i) => Console.WriteLine(i),
            () => Console.WriteLine("finished"));
        Console.Read();
    }
}
Results:
starting 1
done sleeping 1
starting 2
done sleeping 2
{ a = 1, b = 1 }
finished
Desired Results:
starting 1
starting 2
done sleeping 1
done sleeping 2
{ a = 1, b = 1 }
finished
Using the Zip extension method is the simple answer here.
If you have a couple of typical async calls (assuming single parameter in):
Func<X1, IObservable<X2>> callX = Observable.FromAsyncPattern<X1, X2>(...);
Func<Y1, IObservable<Y2>> callY = Observable.FromAsyncPattern<Y1, Y2>(...);
Then you can call both and handle their return values once both are completed, like so:
callX(x1).Zip(callY(y1), (x2, y2) =>
{
    ...
});
Rx Joins provide a solution.
Observable.And
Matches when both observable sequences have an available value.
A demo:
var xsL = Observable.Return(1);
var xsR = Observable.Return(2);
IObservable<int> both = Observable.When(xsL.And(xsR).Then((a, b) => a + b));
Zip was the right answer here:
Observable.Zip(someWebService(), otherWebService(), (some, other) => new { some, other })
    .Subscribe(someAndOther => {
        Console.WriteLine("Some is {0}, and Other is {1}", someAndOther.some, someAndOther.other);
    });
Assuming someWebService is a method whose signature looks like:
IObservable<SomeClass> someWebService()
If Zip isn't doing what you want it to, it's a problem with how you're invoking the web service...
Also, for the general case, if you want to know when a bunch of Observables are all terminated, you can fake it using Aggregate:
Observable.Merge(
        observable1.Select(_ => Unit.Default),
        observable2.Select(_ => Unit.Default),
        observable3.Select(_ => Unit.Default),
        observable4.Select(_ => Unit.Default))
    .Aggregate(Unit.Default, (acc, _) => acc)
    .Subscribe(_ => Console.WriteLine("They all finished!"));
Then, you can throw a First() on the end instead of Subscribe if you want to block.
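As a rough sketch of that blocking variant, assuming a recent System.Reactive package rather than the Rx 1.0 build this thread was written against: there the blocking First() is, as far as I know, marked obsolete, and Wait() plays the same role. WaitAllSketch, WaitAll and Run are hypothetical names, and the two timers merely stand in for the web service calls.

```csharp
using System;
using System.Diagnostics;
using System.Reactive;
using System.Reactive.Linq;

public static class WaitAllSketch
{
    // Hypothetical helper: blocks the caller until every source has completed.
    public static void WaitAll(params IObservable<Unit>[] sources)
    {
        Observable.Merge(sources)
                  // The seed guarantees exactly one value, even if no source emits.
                  .Aggregate(Unit.Default, (acc, _) => acc)
                  // Blocks until the aggregated sequence completes.
                  .Wait();
    }

    // Returns how long the caller was blocked, in milliseconds.
    public static long Run()
    {
        // Stand-ins for two asynchronous calls of different durations.
        var fast = Observable.Timer(TimeSpan.FromMilliseconds(200)).Select(_ => Unit.Default);
        var slow = Observable.Timer(TimeSpan.FromMilliseconds(400)).Select(_ => Unit.Default);

        var clock = Stopwatch.StartNew();
        WaitAll(fast, slow);
        return clock.ElapsedMilliseconds;
    }
}
```

Because both timers run at the same time, the caller should be blocked for roughly the duration of the slower source, not the sum of the two.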
Observable.Create is not a correct model for Observable.FromAsyncPattern. The proposed Zip, Merge, and When solutions will work for Observable.FromAsyncPattern.
Observable.FromAsyncPattern adds concurrency (either from the underlying BeginXXX call or by using AsyncSubject(Scheduler.ThreadPool)), while Observable.Create will, by default, schedule on the current thread.
A better model for Observable.FromAsyncPattern would be:
var observable1 = Observable.Create<int>(i =>
{
    return Scheduler.ThreadPool.Schedule(() =>
    {
        Console.WriteLine("starting 1");
        System.Threading.Thread.Sleep(4000);
        Console.WriteLine("done sleeping 1");
        i.OnNext(1);
        i.OnCompleted();
    });
});
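An alternative sketch, if you would rather keep the blocking Create delegates from the question unchanged: SubscribeOn moves each subscription onto a scheduler, so both delegates can start without waiting for each other. This assumes a recent System.Reactive package, where TaskPoolScheduler.Default stands in for Scheduler.ThreadPool; ConcurrentZipSketch, Blocking and Run are hypothetical names, and the sleeps are shortened.

```csharp
using System;
using System.Collections.Concurrent;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class ConcurrentZipSketch
{
    // The question's blocking source: logs, sleeps, emits one value, completes.
    static IObservable<int> Blocking(string name, int sleepMs, ConcurrentQueue<string> log) =>
        Observable.Create<int>(observer =>
        {
            log.Enqueue("starting " + name);
            Thread.Sleep(sleepMs);
            log.Enqueue("done sleeping " + name);
            observer.OnNext(1);
            observer.OnCompleted();
            return () => { };
        });

    // Returns the log entries in the order they were recorded.
    public static string[] Run()
    {
        var log = new ConcurrentQueue<string>();

        // SubscribeOn runs each subscribe delegate on a pool thread
        // instead of on the thread that calls Subscribe.
        var first = Blocking("1", 500, log).SubscribeOn(TaskPoolScheduler.Default);
        var second = Blocking("2", 1500, log).SubscribeOn(TaskPoolScheduler.Default);

        // Wait() blocks until the zipped sequence has completed.
        first.Zip(second, (a, b) => new { a, b }).Wait();
        return log.ToArray();
    }
}
```

Both "starting" entries should now appear before either "done sleeping" entry, although the order of the two "starting" entries relative to each other is not guaranteed.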
I stumbled across this question when I was looking for something else. There seems to be some confusion as to why the subscription to observable2 does not happen simultaneously. The answer is simple: Rx is not multi-threaded by default. It is up to you to manage the threading/scheduling, and Rx will help you with the concurrency. @Enigmativity alludes to this, but I thought it warranted a deeper explanation.
To be specific, we have the following (summarised) code:
var observable1 = Observable.Create<int>(i =>
{
    System.Threading.Thread.Sleep(2000);
    i.OnNext(1);
    i.OnCompleted();
    return () => { };
});
var observable2 = Observable.Create<int>(i =>
{
    System.Threading.Thread.Sleep(4000);
    i.OnNext(1);
    i.OnCompleted();
    return () => { };
});
var m = observable1.Zip(observable2, (a, b) => new { a, b });
m.Subscribe(Console.WriteLine);
If we go through this step by step it becomes obvious what the problem is.
- We declare 2 observable sequences with the Create factory method
- We compose the 2 sequences with the Zip method
- We subscribe to the composite sequence (m)
- This then invokes the delegate provided to the Create method for observable1.
- We step into the delegate and immediately Sleep for 2 seconds. Note that at no point have we changed threads here. The code posted is single-threaded.
- We continue in the delegate and OnNext a value of 1, then complete the sequence.
- Still on the same thread (because this is single-threaded), we then subscribe to observable2 and step into its delegate.
- We sleep for 4 seconds.
- We OnNext 1. This gets pushed to the Zip operator, which has been waiting for the second sequence to produce a value.
- The Zip resultSelector function is called, and an anonymous type with a=1, b=1 is created and pushed to the Console (via the Subscribe method).
- The sequence completes.
OK, so that clearly can never work. But luckily this is just an example that @foson is using to describe their question. Ironically, if they had used FromAsyncPattern, it would have introduced some concurrency to their code and it would have worked.
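The walk-through above can be checked by recording which thread each Create delegate runs on. A minimal sketch, assuming a recent System.Reactive package; SingleThreadTrace, Blocking and Run are hypothetical names, and the sleeps are shortened:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;

public static class SingleThreadTrace
{
    // Returns the managed thread id of the caller, followed by the id
    // seen inside each Create delegate, in the order the delegates ran.
    public static List<int> Run()
    {
        var threadIds = new List<int> { Thread.CurrentThread.ManagedThreadId };

        // Same shape as the question's sources: block, emit one value, complete.
        IObservable<int> Blocking(int sleepMs) => Observable.Create<int>(observer =>
        {
            threadIds.Add(Thread.CurrentThread.ManagedThreadId);
            Thread.Sleep(sleepMs);
            observer.OnNext(1);
            observer.OnCompleted();
            return () => { };
        });

        // No scheduler is involved, so Subscribe only returns
        // once both delegates have run to completion.
        Blocking(200).Zip(Blocking(400), (a, b) => new { a, b })
                     .Subscribe(pair => Console.WriteLine(pair));

        return threadIds;
    }
}
```

All three recorded ids should be identical: the subscribing thread, then that same thread again for each delegate.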
The correct way to demo Zip working with delays of 2s and 4s is to become concurrent. You can do this either by scheduling the OnNext on the current thread, or by using another thread/timer to do so.
var observable1 = Observable.Timer(TimeSpan.FromSeconds(2));
var observable2 = Observable.Timer(TimeSpan.FromSeconds(4));
var zipped = observable1.Zip(observable2, (a, b) => new { a, b });
zipped.Subscribe(Console.WriteLine);
Here we use the handy Observable.Timer factory. It creates a sequence that publishes the value 0 once the given time has elapsed since subscription, and then completes. If you have a preference for how the timer should be scheduled, you can also provide an optional scheduler, e.g.:
var observable1 = Observable.Timer(TimeSpan.FromSeconds(2), Scheduler.ThreadPool);
var observable2 = Observable.Timer(TimeSpan.FromSeconds(4), Scheduler.TaskPool);
The default scheduler (at time of writing v.1.0.10621.0 for the .NET 4.0 library) is Scheduler.ThreadPool.
You can find out more about scheduling in my Intro to Rx series:
http://leecampbell.blogspot.com/2010/08/reactive-extensions-for-net.html
Specifically, see the Scheduling and Threading post:
http://leecampbell.blogspot.com/2010/06/rx-part-6-scheduling-and-threading.html
I hope that clarifies the problem with the original post.