开发者

How to Merge two Observables so the result completes when the any of the Observables completes?

I have this code:

var s1 = new Subject<Unit>();
var s2 = new Subject<Unit>();
var ss = s1.Merge(s2).Finally(() => Console.WriteLine("Finished!"));

ss.Subscribe(_ => Console.WriteLine("Next"));

s1.OnNext(new Unit());
s2.OnNext(new Unit());
s1.OnCompleted(); // I wish ss finished here.
s2.OnComple开发者_运维技巧ted(); // Yet it does so here. =(

I've solved my problem using OnError(new OperationCanceledException()), but I'd like a better solution (there has to be a combinator right?).


Or this, which is also quite neat:

public static class Ext
{
    public static IObservable<T> MergeWithCompleteOnEither<T>(this IObservable<T> source, IObservable<T> right)
    {
        return Observable.CreateWithDisposable<T>(obs =>
        {
            var compositeDisposable = new CompositeDisposable();
            var subject = new Subject<T>();

            compositeDisposable.Add(subject.Subscribe(obs));
            compositeDisposable.Add(source.Subscribe(subject));
            compositeDisposable.Add(right.Subscribe(subject));


            return compositeDisposable;

        });     
    }
}

This uses a subject which will make sure only one OnCompleted is pushed to the observer in the CreateWithDisposable();


Instead of re-writing Merge to finish when either stream completes I would suggest converting the onCompleted events to onNext events and using var ss = s1.Merge(s2).TakeUntil(s1ors2complete) where s1ors2complete produces a value when either s1 or s2 ends. You could also just chain .TakeUntil(s1completes).TakeUntil(s2completes) instead of creating s1ors2complete. This approach provides better composition than a MergeWithCompleteOnEither extension as it can be used to modify any "complete when both complete" operator into a "complete when any completes" operator.

As for how to convert onNext events to onCompleted events, there are a few ways to do that. The CompositeDisposable method sounds like a good approach, and a bit of searching finds this interesting thread about converting between onNext, onError, and onCompleted notifications. I'd probably create an extension method called ReturnTrueOnCompleted using xs.SkipWhile(_ => true).concat(Observable.Return(True)) and your merge then becomes:

var s1ors2complete = s1.ReturnTrueOnCompleted().Amb(s2.ReturnTrueOnCompleted());
var ss = s1.Merge(s2).TakeUntil(s1ors2complete).Finally(() => Console.WriteLine("Finished!"));

You could also look at using an operator like Zip which automatically completes when one of the input streams completes.


Assuming you don't need the output of either of the streams, you can use Amb combined with some magic from Materialize:

var s1 = new Subject<Unit>();
var s2 = new Subject<Unit>();

var ss = Observable.Amb(
        s1.Materialize().Where(x => x.Kind == NotificationKind.OnCompleted), 
        s2.Materialize().Where(x => x.Kind == NotificationKind.OnCompleted)
    )
    .Finally(() => Console.WriteLine("Finished!"));

ss.Subscribe(_ => Console.WriteLine("Next"));

s1.OnNext(new Unit());
s2.OnNext(new Unit());

s1.OnCompleted(); // ss will finish here and s2 will be unsubscribed from

If you need the values, you can use Do on the two subjects.


Try this:

public static class Ext
{
    public static IObservable<T> MergeWithCompleteOnEither<T>(this IObservable<T> source, IObservable<T> right)
    {
        var completed = Observable.Throw<T>(new StreamCompletedException());

        return 
            source.Concat(completed)
            .Merge(right.Concat(completed))
            .Catch((StreamCompletedException ex) => Observable.Empty<T>());

    }

    private sealed class StreamCompletedException : Exception
    {
    }
}

What this does is concatenate an IObservable that will throw an exception when either the source or the right source completes. We can then use the Catch extension method to return an empty Observable to automatically complete the stream when either completes.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜