Reactive Extensions Subject<IEnumerable<Obj>> to Subject<IEnumerable<AggregatedObj>>
I'm having trouble taking a subject of
public Subject<IEnumerable<Person>> PersonDataSubject;
And converting it to:
public Subject<IEnumerable<BornInYear>> BornInYearSubject;
... using some linq aggregation.
The example below gives more context. Where I'm struggling is working out how to get an IEnumerable into BornInYearSubject from a subscription to PersonDataSubject.
Whatever I try, I end up with an IObservable<BornInYear>, not an IObservable<IEnumerable<BornInYear>>.
The goal is for clients of the class to be able to subscribe to both subjects and get an IEnumerable of the respective type on each 'next' notification.
public class ReactiveTest
{
public class Person
{
public string name;
public DateTime dob;
};
public class BornInYear
{
public int Year;
public int Count;
}
public Subject<IEnumerable<Person>> PersonDataSubject = new Subject<IEnumerable<Person>>();
public Subject<IEnumerable<BornInYear>> BornInYearSubject = new Subject<IEnumerable<BornInYear>>();
public void LoadData()
{
// Go to a hypothetical web service and get a batch of people.
IEnumerable<Person> people = WebService.Fetch();
// Notify subscribers we have a fresh batch of data.
PersonDataSubject.OnNext(people);
}
public ReactiveTest()
{
// Hookup BornInYearSubject to listen to PersonDataSubject and publish the summarised data.
PersonDataSubject.Subscribe(pd => pd.GroupBy(p => p.dob.Year)
.Select(ps => new BornInYear { Year = ps.Key, Count = ps.Count()})
.AsParallel()
);
// How do I get the results of this out and published onto BornInYearSubject?
}
}
Now I know I could achieve this by using Task.Factory.StartNew(...) as my OnNext handler when subscribing to PersonDataSubject, but surely it must be possible to do this in a more Reactive way?
How about:
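PersonDataSubject
.Select(people => people
.GroupBy(p => p.dob.Year)
// Fold the groups of one batch into a single list, so each notification carries a whole collection.
.Aggregate(new List<BornInYear>(), (acc, g) => { acc.Add(new BornInYear { Year = g.Key, Count = g.Count() }); return acc; }))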
Ok this works. Thanks for the ideas guys - the answer seems strikingly obvious in hindsight!
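using System;
using System.Collections.Generic;
using System.Linq;
// With current System.Reactive packages, Select on IObservable and Subject<T> live in these namespaces:
using System.Reactive.Linq;
using System.Reactive.Subjects;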
namespace TestReactive
{
public class ReactiveTest
{
public class Person
{
public string name;
public DateTime dob;
};
public class BornInYear
{
public int Year;
public int Count;
}
public Subject<IEnumerable<Person>> PersonDataSubject = new Subject<IEnumerable<Person>>();
public Subject<IEnumerable<BornInYear>> BornInYearSubject = new Subject<IEnumerable<BornInYear>>();
public void LoadData()
{
IEnumerable<Person> people = new List<Person> {
new Person() {name = "Bill", dob = DateTime.Now.AddYears(-10)},
new Person() {name = "Pete", dob = DateTime.Now.AddYears(-5)},
new Person() {name = "Judy", dob = DateTime.Now.AddYears(-1)},
new Person() {name = "Mike", dob = DateTime.Now.AddYears(-5)},
new Person() {name = "Jake", dob = DateTime.Now.AddYears(-5)},
new Person() {name = "Fred", dob = DateTime.Now.AddYears(-13)},
};
// Notify subscribers we have a fresh batch of data.
PersonDataSubject.OnNext(people);
}
public ReactiveTest()
{
var subj = PersonDataSubject.Select(pds => pds.GroupBy(pd => pd.dob.Year)
.Select(p => new BornInYear {
Year = p.Key, Count = p.Count()
}).AsParallel());
subj.Subscribe(BornInYearSubject);
BornInYearSubject.Subscribe( x=> Console.WriteLine("{0}", x.Count()));
LoadData();
}
}
class Program
{
static void Main(string[] args)
{
ReactiveTest rt = new ReactiveTest();
}
}
}
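One thing worth noting about the code above: the projected value is a lazy query, so every subscriber that enumerates it re-runs the grouping, and the trailing AsParallel() does not change that. A minimal sketch of one way around this, assuming you are happy to materialize each batch with ToList(): pull the aggregation into a plain helper. The names BornInYearSummary and Summarise, the OrderBy, and the fixed sample dates are made up for illustration and are not part of the original code. The sketch deliberately uses only LINQ to Objects, so it runs without the Rx package.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class Person
{
    public string name;
    public DateTime dob;
}

public class BornInYear
{
    public int Year;
    public int Count;
}

public static class BornInYearSummary
{
    // Hypothetical helper: turns one batch of people into one
    // materialized batch of per-year counts.
    public static List<BornInYear> Summarise(IEnumerable<Person> people)
    {
        return people
            .GroupBy(p => p.dob.Year)
            .Select(g => new BornInYear { Year = g.Key, Count = g.Count() })
            .OrderBy(b => b.Year)   // assumption: sorted output is wanted
            .ToList();              // run the query once, here
    }
}

public static class Demo
{
    public static void Main()
    {
        // Fixed dates (illustrative only) so the result is reproducible.
        var people = new List<Person>
        {
            new Person { name = "Bill", dob = new DateTime(2000, 1, 1) },
            new Person { name = "Pete", dob = new DateTime(2005, 1, 1) },
            new Person { name = "Judy", dob = new DateTime(2009, 1, 1) },
            new Person { name = "Mike", dob = new DateTime(2005, 6, 1) },
            new Person { name = "Jake", dob = new DateTime(2005, 9, 1) },
            new Person { name = "Fred", dob = new DateTime(1997, 1, 1) },
        };

        foreach (var b in BornInYearSummary.Summarise(people))
        {
            Console.WriteLine("{0}: {1}", b.Year, b.Count);
        }
        // Prints:
        // 1997: 1
        // 2000: 1
        // 2005: 3
        // 2009: 1
    }
}
```

In the Rx pipeline this would then be used along the lines of PersonDataSubject.Select(people => BornInYearSummary.Summarise(people)).Subscribe(BornInYearSubject), so each 'next' notification carries an already-computed list rather than a deferred query.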