Reactive: Converting merged IObservables into one stream that acts like a BehaviorSubject

Here is the sample code I have...

var rootSubject = new Subject<Types>();

var firstSubject = rootSubject.Where(x => x == Types.First);
var secondSubject = rootSubject.Where(x => x == Types.Second);
var thirdSubject = rootSubject.Where(x => x == Types.Third);
var forthSubject = rootSubject.Where(x => x == Types.Forth);

var mergedSubject = Observable.Merge(firstSubject, secondSubject, thirdSubject, forthSubject)
                        .Timeout(TimeSpan.FromSeconds(2), Observable.Return(Types.Error))
                        .Replay()
                        .RefCount();

rootSubject.OnNext(Types.Second);

var result = mergedSubject.First();

Console.WriteLine(String.Format("result - {0}", result));

For some reason it always times out and returns the Error type. Any idea what's going on here?

What I am trying to do is merge the IObservables into a single stream that acts like a BehaviorSubject, so that if .OnNext(...) is called before .First(), First() will still return a value.


I believe the problem is that you're essentially not connecting the replay sequence - or possibly doing so too late. (I don't know the details of RefCount, but my suspicion is that it only connects when something subscribes to it.)

Here's an alternative which works:

var mergedSubject = Observable
    .Merge(firstSubject, secondSubject, thirdSubject, forthSubject)
    .Timeout(TimeSpan.FromSeconds(2), Observable.Return(Types.Error))
    .Replay();

mergedSubject.Connect();
rootSubject.OnNext(Types.Second);

var result = mergedSubject.First();

I don't know whether that satisfies everything you need, but it does at least print the right result for your test code :)
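
To see the timing issue in isolation, here is a minimal sketch (not the original code; the int subject and the names source, shared and seen are made up for illustration) showing that a value pushed before the first subscription never reaches a Replay().RefCount() sequence:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class Program
{
    static void Main()
    {
        // Hypothetical stand-in for rootSubject, using int to keep it short.
        var source = new Subject<int>();
        var shared = source.Replay().RefCount();

        // No subscriber yet: RefCount has not connected Replay to the source,
        // so nothing is listening and this value is lost.
        source.OnNext(1);

        var seen = new List<int>();

        // The first subscription is what makes RefCount connect.
        using (shared.Subscribe(x => seen.Add(x)))
        {
            source.OnNext(2); // pushed after connecting, so it is delivered
        }

        Console.WriteLine(string.Join(",", seen)); // prints "2"
    }
}
```

In the question's code the same thing happens to Types.Second: First() is the first subscriber, nothing further arrives within two seconds, and the Timeout fallback produces Types.Error.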


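Jon is correct in his suspicion that RefCount only connects once something has subscribed to it. In the question's code the first subscriber is First(), which runs after OnNext, so the value is pushed before the replay is connected and is lost. So, if you really need RefCount here, you can also do this: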

var rootSubject = new Subject<Types>();
var firstSubject = rootSubject.Where(x => x == Types.First);
var secondSubject = rootSubject.Where(x => x == Types.Second);
var thirdSubject = rootSubject.Where(x => x == Types.Third);
var forthSubject = rootSubject.Where(x => x == Types.Forth);
var mergedSubject = 
    Observable.Merge(firstSubject, secondSubject, thirdSubject, forthSubject)  
    .Timeout(TimeSpan.FromSeconds(2), Observable.Return(Types.Error))
    .Replay().RefCount();

// Added: subscribing here makes RefCount connect before OnNext is called
mergedSubject.Subscribe();

rootSubject.OnNext(Types.Second);

var result = mergedSubject.First();

Console.WriteLine(String.Format("result - {0}", result));
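
If the goal is specifically "latest value only", as with a BehaviorSubject, one possible variation is Replay(1) with an explicit Connect. The following is a sketch under assumptions rather than a drop-in answer: the Types enum is guessed from the question (five members, spelled as the question spells them), the four merged filters are collapsed into a single Where, and FirstAsync().Wait() stands in for the blocking First(), which I believe newer Rx releases flag as obsolete. Unlike a real BehaviorSubject there is no initial value, so a subscriber that arrives before any OnNext still waits (and eventually hits the timeout).

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Assumed shape of the enum; the question does not show its definition.
enum Types { First, Second, Third, Forth, Error }

static class Program
{
    static void Main()
    {
        var rootSubject = new Subject<Types>();

        // With the assumed enum, this single filter passes the same values
        // as merging the four per-value Where streams.
        var latest = rootSubject
            .Where(x => x != Types.Error)
            .Timeout(TimeSpan.FromSeconds(2), Observable.Return(Types.Error))
            .Replay(1); // buffer only the most recent value

        // Connect before pushing values; disposing the result disconnects.
        using (latest.Connect())
        {
            rootSubject.OnNext(Types.First);
            rootSubject.OnNext(Types.Second);

            // A late subscriber receives only the buffered latest value.
            var result = latest.FirstAsync().Wait();
            Console.WriteLine(String.Format("result - {0}", result)); // prints "result - Second"
        }
    }
}
```

Holding on to the IDisposable returned by Connect gives explicit control over the lifetime of the underlying subscription, which the bare Subscribe() call in the RefCount version does not, since that subscription is never disposed.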