Rx Extensions: How do I make a subscription dependent on another subscription?

I have a class that takes an observable in its constructor, then subscribes to it and does some stuff, sets properties etc. The class itself is observable.

I want to subscribe to my source observable only if someone is subscribed to my class, but I can't figure out how to do it.

public class MyClass : IObservable<MyResult>
{
    private readonly Subject<MyResult> _subject = new Subject<MyResult>();
    private readonly IConnectableObservable<MySource> _source;

    public MyClass(IObservable<MySource> source)
    {
         _source = source
             //All my logic to set properties and such
             //goes here as a side effect, instead of in a subscription...
             .Do(...)
             //I hope that by publishing, side effects will happen only once...
             .Publish();
    }

    public IDisposable Subscribe(IObserver<MyResult> observer)
    {
        return new CompositeDisposable(
             _source.Subscribe(/* 
                  don't have anything to do here,
                  just subscribing to make sure I'm subscribed to source...
                  (this can't be the right way to do it)
             */),
             _subject.Subscribe(observer));
    }
}

UPDATE

@Scott: I can see why implementing IObservable would be an anti-pattern. MyClass needs to consume a single observable and exposes three as properties (originally the most commonly used observable was going to be returned by MyClass itself, but I think that having it as a property might be better).

What I'm trying to write is an observable ICommand. I know some exist, but this is more of a way to learn Rx...

public class ObservableCommand<T> : ICommand
{
    private readonly ISubject<T> _executeRequests = new Subject<T>();
    private readonly ISubject<T> _canExecuteRequests = new Subject<T>();

    public IObservable<bool> CanExecuteChanges { get; private set; }
    public IObservable<T> CanExecuteRequests { get; private set; }
    public IObservable<T> ExecuteRequests { get; private set; }

    public ObservableCommand(IObservable<bool> canExecute)
    {
        var source = canExecute.DistinctUntilChanged();

        //How do I dispose of subscription later?
        //I have this fear that I'm going to have a chain of references, 
        //and my entire app will never get GC'd!
        var subscription = source.Subscribe(
            o => {
                //Remember the latest value so CanExecute has something to return
                _latestCanExecute = o;
                if (CanExecuteChanged != null)
                    CanExecuteChanged(this, EventArgs.Empty);
            });

        CanExecuteChanges = source;

        CanExecuteRequests = _canExecuteRequests.AsObservable();

        ExecuteRequests = _executeRequests.AsObservable();
    }

    #region ICommand Members

    //Assumption: CanExecute reports the most recent value seen from canExecute
    private bool _latestCanExecute;

    public bool CanExecute(object parameter)
    {
        _canExecuteRequests.OnNext(parameter is T ? (T)parameter : default(T));
        return _latestCanExecute;
    }

    public event EventHandler CanExecuteChanged;

    public void Execute(object parameter)
    {
        _executeRequests.OnNext(parameter is T ? (T)parameter : default(T));
    }

    #endregion
}


How about just not Doing or Publishing in the constructor, but rather in the Subscribe method?

It should be said, explicitly implementing IObservable<T> is something of an Rx anti-pattern.

You can make subscriptions depend on other subscribers with Defer and Create, something like:

IObservable<MySource> source;
IObservable<MySource> sourceWithSubSideEffect =  Observable.Defer(() =>
{
   // Do something interesting on Subscription
   // ....
   return source;
});
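
If the goal is a single upstream subscription that exists only while someone is subscribed, one related option (not what the snippet above does, and not confirmed as the answerer's intent) is Publish followed by RefCount. The sketch below assumes System.Reactive is referenced; the Subject standing in for the source and the names RefCountSketch and Run are hypothetical.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class RefCountSketch
{
    // Returns how many times the Do side effect ran
    // for one value pushed to two subscribers.
    public static int Run()
    {
        // Hypothetical stand-in for the real source observable.
        var source = new Subject<int>();
        int sideEffects = 0;

        // Publish().RefCount() connects to the upstream on the first
        // subscriber and disconnects when the last one unsubscribes.
        IObservable<int> shared = source
            .Do(_ => sideEffects++)
            .Publish()
            .RefCount();

        bool before = source.HasObservers;   // nobody has subscribed yet
        IDisposable a = shared.Subscribe(_ => { });
        IDisposable b = shared.Subscribe(_ => { });
        bool during = source.HasObservers;   // upstream is now connected

        source.OnNext(1);

        a.Dispose();
        b.Dispose();
        bool after = source.HasObservers;    // upstream released again

        Console.WriteLine("{0} {1} {2}", before, during, after);
        return sideEffects;
    }

    public static void Main()
    {
        Console.WriteLine(Run());
    }
}
```

Running it should print `False True False` followed by `1`: the source is only subscribed to while `shared` has subscribers, and the side effect runs once per value rather than once per subscriber.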


I've prepared a snippet for you. MyClass implements IObservable<T> and also has the methods of IObserver<T>, but they are all private. With the additional OnInitialize and OnSubscribe you should be able to do whatever you want on any event you want to respond to.

If you want to make this snippet reusable, you could declare all of these methods as partial, since they all return void. Then you could supply whatever definitions you want.

public class MyClass<T> : IObservable<T>
{
    private readonly IObservable<T> m_Source;

    public MyClass(IObservable<T> source)
    {
        if (source == null) throw new ArgumentNullException("source");
        m_Source = source.Do(OnNext, OnError, OnCompleted);
        OnInitialize();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        OnSubscribe();
        return m_Source.Subscribe(observer);
    }

    private void OnInitialize()
    {
        Console.WriteLine("OnInitialize");
    }
    private void OnSubscribe()
    {
        Console.WriteLine("OnSubscribe");
    }
    private void OnNext(T value)
    {
        Console.WriteLine("OnNext: {0}", value);
    }
    private void OnError(Exception error)
    {
        Console.WriteLine("OnError: {0}", error.Message);
    }
    private void OnCompleted()
    {
        Console.WriteLine("OnCompleted");
    }    
}
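
One thing to keep in mind with this shape: Do is applied per subscription, so with a cold source every subscriber triggers the callbacks again. The sketch below illustrates that with a counter in place of the Console output; it assumes System.Reactive is referenced, and the names PerSubscriberSketch and CountCalls are hypothetical.

```csharp
using System;
using System.Reactive.Linq;

public static class PerSubscriberSketch
{
    // Counts how often the Do callback runs when a cold
    // three-element source is subscribed to twice.
    public static int CountCalls()
    {
        int calls = 0;

        // Same wiring as m_Source in the class above,
        // with only the OnNext callback and a counter.
        IObservable<int> withSideEffects =
            Observable.Range(1, 3).Do(_ => calls++);

        // Each Subscribe replays the range, so Do runs again.
        withSideEffects.Subscribe(_ => { });
        withSideEffects.Subscribe(_ => { });

        return calls;
    }

    public static void Main()
    {
        Console.WriteLine(CountCalls()); // 6
    }
}
```

If the side effects must happen only once per value regardless of how many observers are attached, the source would need to be shared (for example with Publish) before it is handed to subscribers.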