Reactive: Converting merged IObservable's into one stream that acts like BehaviorSubject
Here is the sample code I have...
var rootSubject = new Subject<Types>();
var firstSubject = rootSubject.Where(x => x == Types.First);
var secondSubject = rootSubject.Where(x => x == Types.Second);
var thirdSubject = rootSubject.Where(x => x == Types.Third);
var forthSubject = rootSubject.Where(x => x == Types.Forth);
var mergedSubject = Observable.Merge(firstSubject, secondSubject, thirdSubject, forthSubject)
.Timeout(TimeSpan.FromSeconds(2), Observable.Return(Types.Error))
.Replay()
.RefCount();
rootS开发者_高级运维ubject.OnNext(Types.Second);
var result = mergedSubject.First();
Console.WriteLine(String.Format("result - {0}", result));
For some reason it always just times out and return the Error type. Any idea whats going on here?
What I am trying to do is create a merged Iobservable's that is a stream that acts like BehaviorSubject, so that if an .OnNext(...) is called before .First(), first will have a value.
I believe the problem is that you're essentially not connecting the replay sequence - or possibly doing so too late. (I don't know the details of RefCount
, but my suspicion is that it only connects when something subscribes to it.)
Here's an alternative which works:
var mergedSubject = Observable
.Merge(firstSubject, secondSubject, thirdSubject, forthSubject)
.Timeout(TimeSpan.FromSeconds(2), Observable.Return(Types.Error))
.Replay();
mergedSubject.Connect();
rootSubject.OnNext(Types.Second);
var result = mergedSubject.First();
I don't know whether that satisfies everything you need, but it does at least print the right result for your test code :)
Jon is correct in his suspicion that RefCount only connects once something has subscribed to it. So, if you really need RefCount here, you can also do this:
var rootSubject = new Subject<Types>();
var firstSubject = rootSubject.Where(x => x == Types.First);
var secondSubject = rootSubject.Where(x => x == Types.Second);
var thirdSubject = rootSubject.Where(x => x == Types.Third);
var forthSubject = rootSubject.Where(x => x == Types.Fourth);
var mergedSubject =
Observable.Merge(firstSubject, secondSubject, thirdSubject, forthSubject)
.Timeout(TimeSpan.FromSeconds(2), Observable.Return(Types.Error))
.Replay().RefCount();
//added
mergedSubject.Subscribe();
rootSubject.OnNext(Types.Second);
var result = mergedSubject.First();
Console.WriteLine(String.Format("result - {0}", result));
精彩评论