Aggregating the result of a ForkJoin in Rx
Given this piece of code:
    var loadAll =
        Observable.ForkJoin(
            service1.FindBooksAsObservable().Select(s => s),
            service2.FindBooksAsObservable().Select(s => s),
            service3.FindBooksAsObservable().Select(s => s)
        );

    loadAll.Subscribe(
        result =>
        {
            var aggregatedListOfBooks = result.SelectMany(b => b);
        });
As you can see, the problem is that each FindBooksAsObservable() method returns an IObservable<IEnumerable<Book>>, so the result variable in Subscribe() is an array of IEnumerable<Book>.
Is there any other way of aggregating the result of the ForkJoin()? I was hoping to use something like Merge() along with the ForkJoin.
Assuming all three services return a list of Books, you can use SelectMany to merge the lists:
    // The projection yields one combined list, so the element type is List<Book>, not Book.
    IObservable<List<Book>> loadAll =
        Observable.ForkJoin(
            service1.FindBooksAsObservable().Select(s => s),
            service2.FindBooksAsObservable().Select(s => s),
            service3.FindBooksAsObservable().Select(s => s)
        )
        .Select(books => books.SelectMany(list => list).ToList());

    loadAll.Subscribe(
        books => { /* will be called once with a single list of all items */ });
You can remove the ToList() call if you don't require the output to be a list; the subscriber then receives an IEnumerable<Book> instead.
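Since the question mentions Merge(), here is a minimal sketch of that route. It assumes each service emits its books and then completes. Book, FindBooks and LoadAll are hypothetical stand-ins for the real types and services, not anything from the original code. Unlike ForkJoin, Merge passes through every value from every source (not only the last one), and the order of the books follows arrival order rather than service order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public class Book { public string Title { get; set; } }

public static class MergeSketch
{
    // Hypothetical stand-in for serviceN.FindBooksAsObservable().
    public static IObservable<IEnumerable<Book>> FindBooks(params string[] titles)
    {
        return Observable.Return<IEnumerable<Book>>(
            titles.Select(t => new Book { Title = t }).ToList());
    }

    // Merges the three sources, flattens each batch into single books,
    // then buffers them into one list that is emitted on completion.
    public static IObservable<IList<Book>> LoadAll()
    {
        return Observable
            .Merge(FindBooks("A", "B"), FindBooks("C"), FindBooks("D"))
            .SelectMany(list => list)
            .ToList();
    }

    public static void Main()
    {
        LoadAll().Subscribe(
            books => Console.WriteLine("Loaded {0} books", books.Count));
    }
}
```

Dropping the final ToList() gives an IObservable<Book> that pushes each book as soon as its service responds, which may be preferable if the UI can show partial results.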
Observable.ForkJoin is not in the latest stable version of Reactive Extensions (Rx) (v2.1.30214.0). As of June 2013, ForkJoin is only available in the experimental versions of Rx.
Dave Sexton suggested a workaround: http://social.msdn.microsoft.com/Forums/en-US/rx/thread/3cfccb74-9ce3-47dc-94fd-cf60270c1ed5
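For reference, one way to approximate ForkJoin with operators from the stable build is to take the last value of each source and zip them. This is only a sketch and not necessarily the approach described in the linked thread. LastOfEach, Book and FindBooks are hypothetical names introduced here for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public class Book { public string Title { get; set; } }

public static class ForkJoinSketch
{
    // Hypothetical helper (not part of Rx): emits a single list holding the
    // last value of every source, once all sources have completed.
    public static IObservable<IList<T>> LastOfEach<T>(params IObservable<T>[] sources)
    {
        return Observable.Zip(sources.Select(s => s.TakeLast(1)));
    }

    // Hypothetical stand-in for serviceN.FindBooksAsObservable().
    public static IObservable<IEnumerable<Book>> FindBooks(params string[] titles)
    {
        return Observable.Return<IEnumerable<Book>>(
            titles.Select(t => new Book { Title = t }).ToList());
    }

    public static void Main()
    {
        IObservable<List<Book>> loadAll =
            LastOfEach(FindBooks("A", "B"), FindBooks("C"), FindBooks("D"))
                .Select(lists => lists.SelectMany(list => list).ToList());

        loadAll.Subscribe(
            books => Console.WriteLine(string.Join(", ", books.Select(b => b.Title))));
    }
}
```

As with ForkJoin, nothing is emitted if any source completes without producing a value, so an empty service response should be an empty list rather than an empty sequence.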