开发者

Aggregating the result of a ForkJoin in Rx

Given this piece of code:

var loadAll =
   Observable.ForkJoin(
      service1.FindBooksAsObservable().Select(s => s),
      service2.FindBooksAsObservable().Select(s => s),
      service3.FindBooksAsObservable().Select(s => s)
);

loadAll.Subscribe(
   result =>
   {
      var aggregatedListOfBooks = result.SelectMany(b => b);
   });

As you开发者_StackOverflow中文版 can see, the problem is each FindBooksAsObservable() method returns an IObservable<IEnumerable<Book>>, thus the result variable in the Subscribe() is an Array of IEnumerable<Book>.

Is there any other way of aggregating the result of the ForkJoin()? I was hoping to use something like Merge() along with the ForkJoin.


Assuming all three services return a list of Books, you can use SelectMany to merge the lists:

IObservable<Book> loadAll = 
    Observable.ForkJoin(
        service1.FindBooksAsObservable().Select(s => s),
        service2.FindBooksAsObservable().Select(s => s),
        service3.FindBooksAsObservable().Select(s => s)
    )
    .Select(books => books.SelectMany(list => list).ToList());

loadAll.Subscribe(
    book => { /* will be called once with a single list of all items */ });

You can remove the ToList() call if you don't require the output to be a list.


Observable.ForkJoin is not on the latest stable version of Reactive Extensions (Rx) (v2.1.30214.0). The ForkJoin as of now June 2013, is only in the experimental versions of Rx.

Dave Sexton suggested a work around: http://social.msdn.microsoft.com/Forums/en-US/rx/thread/3cfccb74-9ce3-47dc-94fd-cf60270c1ed5

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜