Rx Extensions: How do I make a subscription dependent on another subscription?
I have a class that takes an observable in its constructor, then subscribes to it and does some stuff, sets properties etc. The class itself is observable.
I want to subscribe to my source observable only if someone is subscribed to my class, but I can't figure out how to do it.
public class MyClass : IObservable<MyResult>
{
private readonly Subject<MyResult> _subject = new Subject<MyResult>();
private readonly IConnectableObservable<MySource> _source;
public MyClass(IObservable<MySource> source)
{
_source = source
//All my logic to set properties and such
//goes here as a side effect, instead of in a subscription...
.Do(...)
//I hope that by publishing, side effects will happen only once...
.Publish();
}
public IDisposable Subscribe(IObserver<MyResult> observer)
{
return new CompositeDisposable(
_source.Subscribe(/*
don't have anything to do here,
just subscribing to make sure I'm subscribed to source...
(this can't be the right way to do it)
*/),
_subject.Subscribe(observer));
}
}
UPDATE
@Scott: I can see why implementing IObservable would be an anti-pattern. My class needs to consume a single observable and exposes three as properties (originally the most commonly used observable was going to be returned by MyClass itself, but I think that having it as a property might be better).
What I'm trying to write is an observable ICommand. I know some exist, but this is more of a way to learn Rx...
public class ObservableCommand<T> : ICommand
{
private readonly ISubject<T> _executeRequests = new Subject<T>();
private readonly ISubject<T> _canExecuteRequests = new Subject<T>();
public IObservable<bool> CanExecuteChanges { get; private set; }
public IObservable<T> CanExecuteRequests { get; private set; }
public IObservable<T> ExecuteRequests { get; private set; }
public ObservableCommand(IObservable<bool> canExecute)
{
var source = canExecute.DistinctUntilChanged();
//How do I dispose of subscription later?
//I have this fear that I'm going to have a chain of references,
//and my entire app will never get GC'd!
var subscription = source.Subscribe(
o => {
if (CanExecuteChanged != null)
CanExecuteChanged(this, EventArgs.Empty);
});
CanExecuteChanges = source;
CanExecuteRequests = _canExecuteRequests.AsObservable();
ExecuteRequests = _executeRequests.AsObservable();
}
#region ICommand Members
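public bool CanExecute(object parameter)
{
_canExecuteRequests.OnNext(parameter is T ? (T)parameter : default(T));
//Assumption: the original omitted a return value. A real implementation
//would return the latest value seen from the canExecute observable.
return true;
}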
public event EventHandler CanExecuteChanged;
public void Execute(object parameter)
{
_executeRequests.OnNext(parameter is T ? (T)parameter : default(T));
}
#endregion
}
How about not calling Do or Publish in the constructor, but rather in the Subscribe method?
It should be said that explicitly implementing IObservable<T> is something of an Rx anti-pattern.
You can make subscriptions dependent on other subscribers with Defer and Create, something like:
IObservable<MySource> source;
IObservable<MySource> sourceWithSubSideEffect = Observable.Defer(() =>
{
// Do something interesting on Subscription
// ....
return source;
});
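To make the "subscribe to the source only while someone is subscribed to me" behaviour concrete, here is a minimal sketch. It assumes the System.Reactive package is referenced; DeferDemo, Run, and the log list are hypothetical names used only for illustration. Defer runs its factory when an observer subscribes, and Publish().RefCount() shares a single upstream subscription that exists only while at least one observer is attached.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class DeferDemo
{
    // Returns a log of what happened, in order (hypothetical helper).
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new Subject<int>();

        IObservable<int> shared = Observable
            .Defer(() =>
            {
                // Runs when RefCount connects, i.e. on the first subscriber.
                log.Add("subscribed to source");
                return source.Do(x => log.Add("side effect " + x));
            })
            .Publish()   // share one upstream subscription
            .RefCount(); // connect on first subscriber, disconnect on last

        source.OnNext(1); // nobody is subscribed yet, so nothing is logged

        IDisposable first = shared.Subscribe(x => log.Add("first got " + x));
        IDisposable second = shared.Subscribe(x => log.Add("second got " + x));
        source.OnNext(2); // side effect runs once, both observers are notified

        first.Dispose();
        second.Dispose();
        source.OnNext(3); // upstream subscription was released, nothing is logged

        return log;
    }
}
```

With this arrangement the side effect in Do should run once per value regardless of how many observers are attached, and the source is left alone before the first subscription and after the last one is disposed.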
I've prepared a snippet for you. MyClass implements IObservable<T> and also has the methods of IObserver<T>, but they are all private. With the additional OnInitialize and OnSubscribe you should be able to do whatever you want on any event you want to respond to.
If you want to make this snippet reusable, you could define all the methods as partial, as they all return void. Then you could supply whatever definitions you want.
public class MyClass<T> : IObservable<T>
{
private readonly IObservable<T> m_Source;
public MyClass(IObservable<T> source)
{
if (source == null) throw new ArgumentNullException("source");
m_Source = source.Do(OnNext, OnError, OnCompleted);
OnInitialize();
}
public IDisposable Subscribe(IObserver<T> observer)
{
OnSubscribe();
return m_Source.Subscribe(observer);
}
private void OnInitialize()
{
Console.WriteLine("OnInitialize");
}
private void OnSubscribe()
{
Console.WriteLine("OnSubscribe");
}
private void OnNext(T value)
{
Console.WriteLine("OnNext: {0}", value);
}
private void OnError(Exception error)
{
Console.WriteLine("OnError: {0}", error.Message);
}
private void OnCompleted()
{
Console.WriteLine("OnCompleted");
}
}