Rx - can/should I replace .NET events with Observables?

Given the benefits of composable events as offered by the Reactive Extensions (Rx) framework, I'm wondering whether my classes should stop pushing .NET events, and instead expose Rx observables.

For instance, take the following class using standard .NET events:

public class Foo
{
   private int progress;
   public event EventHandler ProgressChanged;

   public int Progress
   {
      get { return this.progress; }
      set
      {
         if (this.progress != value)
         {
            this.progress = value;

            // Raise the event while checking for no subscribers and preventing unsubscription race condition.
            var progressChanged = this.ProgressChanged;
            if (progressChanged != null)
            {
                progressChanged(this, EventArgs.Empty);
            }
         }
      }
   }
}

That's a lot of monotonous plumbing.
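For comparison, assuming C# 6 or later is available, the raise step above is usually written with the null-conditional operator. This is only a sketch of the same class. It shortens the event plumbing but adds none of the composition that Rx offers:

```csharp
using System;

public class Foo
{
    private int progress;
    public event EventHandler ProgressChanged;

    public int Progress
    {
        get { return this.progress; }
        set
        {
            if (this.progress == value)
            {
                return;
            }

            this.progress = value;

            // ?.Invoke reads the delegate field once, so it covers both the
            // "no subscribers" check and the unsubscription race.
            this.ProgressChanged?.Invoke(this, EventArgs.Empty);
        }
    }
}
```

The event only fires when the value actually changes, exactly as in the longer version.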

This class could instead use some sort of observable to replace this functionality:

public class Foo
{
   public Foo()
   {
       this.Progress = some new observable;
   }

   public IObservable<int> Progress { get; private set; }
}

Far less plumbing. Intention is no longer obscured by plumbing details. This seems beneficial.

My questions for you fine StackOverflow folks are:

  1. Would it be good/worthwhile to replace standard .NET events with IObservable<T> values?
  2. If I were to use an observable, what kind would I use here? Obviously I need to push values to it (e.g. Progress.UpdateValue(...) or something).


For #2, the most straightforward way is via a Subject:

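// The subject must be created before anything can subscribe or push to it.
private readonly Subject<int> _Progress = new Subject<int>();
public IObservable<int> Progress {
    get { return _Progress; }
}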

private void setProgress(int new_value) {
    _Progress.OnNext(new_value);
}

private void wereDoneWithWorking() {
    _Progress.OnCompleted();
}

private void somethingBadHappened(Exception ex) {
    _Progress.OnError(ex);
}

With this, your "Progress" can signal not only when the progress has changed, but also when the operation has completed and whether it succeeded. Keep in mind, though, that once an IObservable has terminated via either OnCompleted or OnError, it's "dead": you can't post anything further to it.
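To make the "dead" part concrete, here is a minimal sketch. It assumes the System.Reactive NuGet package is referenced, and TerminalDemo is just an illustrative name. It pushes a value after OnCompleted and records what the subscriber actually sees:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects; // from the System.Reactive package (assumed to be referenced)

public static class TerminalDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var progress = new Subject<int>();

        using (progress.Subscribe(
            v => log.Add("next " + v),
            ex => log.Add("error " + ex.Message),
            () => log.Add("completed")))
        {
            progress.OnNext(50);
            progress.OnCompleted();

            // The subject has already terminated, so this value is dropped
            // without reaching the subscriber.
            progress.OnNext(100);
        }

        return log;
    }
}
```

Run() returns the two entries "next 50" and "completed". The late OnNext(100) never reaches the subscriber, so an object that needs to report progress for several operations would need a fresh subject per operation.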


I don't recommend managing your own subscriber list when there are built-in subjects that can do that for you. Using one also removes the need to carry your own mutable copy of T.

Below is my (commentless) version of your solution:

public class Observable<T> : IObservable<T>, INotifyPropertyChanged 
{ 
    private readonly BehaviorSubject<T> values; 

    private PropertyChangedEventHandler propertyChanged; 

    public Observable() : this(default(T))
    {
    } 

    public Observable(T initialValue) 
    { 
        this.values = new BehaviorSubject<T>(initialValue);

        values.DistinctUntilChanged().Subscribe(FirePropertyChanged);
    }

    public T Value 
    { 
        // BehaviorSubject exposes its latest value directly, so no blocking First() call is needed.
        get { return this.values.Value; } 
        set { values.OnNext(value); } 
    }

    private void FirePropertyChanged(T value)
    {
        var handler = this.propertyChanged;

        if (handler != null)
            handler(this, new PropertyChangedEventArgs("Value"));
    }

    public override string ToString() 
    { 
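        // This class has no "value" field; read the current value through the property instead.
        var current = this.Value;
        return current != null ? current.ToString() : "Observable<" + typeof(T).Name + "> with null value."; 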
    } 

    public static implicit operator T(Observable<T> input) 
    { 
        return input.Value; 
    } 

    public IDisposable Subscribe(IObserver<T> observer) 
    { 
        return values.Subscribe(observer);
    } 

    event PropertyChangedEventHandler INotifyPropertyChanged.PropertyChanged 
    { 
        add { this.propertyChanged += value; } 
        remove { this.propertyChanged -= value; } 
    } 
}


I'll keep it short and simple:

  1. yes
  2. BehaviorSubject

:)
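To spell that out a little, here is a rough sketch of what the BehaviorSubject route might look like. It assumes the System.Reactive package is referenced; ProgressSource, CurrentProgress and Report are made-up names for illustration:

```csharp
using System;
using System.Reactive.Linq;      // AsObservable
using System.Reactive.Subjects;  // BehaviorSubject<T>

public class ProgressSource
{
    // A BehaviorSubject always holds a current value, starting with the one given here.
    private readonly BehaviorSubject<int> progress = new BehaviorSubject<int>(0);

    // AsObservable hides the subject so callers cannot cast it back and call OnNext.
    public IObservable<int> Progress
    {
        get { return this.progress.AsObservable(); }
    }

    // Synchronous read of the latest value, for callers that just want the int.
    public int CurrentProgress
    {
        get { return this.progress.Value; }
    }

    public void Report(int value)
    {
        this.progress.OnNext(value);
    }
}
```

A new subscriber immediately receives the most recent value and then every later one, which is usually what you want for a property-like observable.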


OK guys, since I think it's at least worth a shot to try this, and since Rx's Subject<T> isn't quite what I'm looking for, I've created a new observable that fits my needs:

  • Implements IObservable<T>
  • Implements INotifyPropertyChanged to work with WPF/Silverlight binding.
  • Provides easy get/set semantics.

I call the class Observable<T>.

Declaration:

/// <summary>
/// Represents a value whose changes can be observed.
/// </summary>
/// <typeparam name="T">The type of value.</typeparam>
public class Observable<T> : IObservable<T>, INotifyPropertyChanged
{
    private T value;
    private readonly List<AnonymousObserver> observers = new List<AnonymousObserver>(2);
    private PropertyChangedEventHandler propertyChanged;

    /// <summary>
    /// Constructs a new observable with a default value.
    /// </summary>
    public Observable()
    {
    }

    public Observable(T initialValue)
    {
        this.value = initialValue;
    }

    /// <summary>
    /// Gets the underlying value of the observable.
    /// </summary>
    public T Value
    {
        get { return this.value; }
        set
        {
            var valueHasChanged = !EqualityComparer<T>.Default.Equals(this.value, value);
            this.value = value;

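            // Notify the observers of the value. Iterate over a snapshot so that
            // observers can subscribe from inside OnNext without breaking the loop.
            foreach (var observer in this.observers
                .Select(o => o.Observer)
                .Where(o => o != null)
                .ToList())
            {
                observer.OnNext(value);
            }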

            // For INotifyPropertyChanged support, useful in WPF and Silverlight.
            if (valueHasChanged && propertyChanged != null)
            {
               propertyChanged(this, new PropertyChangedEventArgs("Value"));
            }
        }
    }

    /// <summary>
    /// Converts the observable to a string. If the value isn't null, this will return
    /// the value string.
    /// </summary>
    /// <returns>The value .ToString'd, or the default string value of the observable class.</returns>
    public override string ToString()
    {
        return value != null ? value.ToString() : "Observable<" + typeof(T).Name + "> with null value.";
    }

    /// <summary>
    /// Implicitly converts an Observable to its underlying value.
    /// </summary>
    /// <param name="input">The observable.</param>
    /// <returns>The observable's value.</returns>
    public static implicit operator T(Observable<T> input)
    {
        return input.Value;
    }

    /// <summary>
    /// Subscribes to changes in the observable.
    /// </summary>
    /// <param name="observer">The subscriber.</param>
    /// <returns>A disposable object. When disposed, the observer will stop receiving events.</returns>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        var disposableObserver = new AnonymousObserver(observer);
        this.observers.Add(disposableObserver);
        return disposableObserver;
    }

    event PropertyChangedEventHandler INotifyPropertyChanged.PropertyChanged
    {
        add { this.propertyChanged += value; }
        remove { this.propertyChanged -= value; }
    }

    class AnonymousObserver : IDisposable
    {
        public IObserver<T> Observer { get; private set; }

        public AnonymousObserver(IObserver<T> observer)
        {
            this.Observer = observer;
        }

        public void Dispose()
        {
            this.Observer = null;
        }
    }
}

Usage:

Consuming is nice and easy. No plumbing!

public class Foo
{
    public Foo()
    {
        Progress = new Observable<int>();
    } 

    public Observable<int> Progress { get; private set; }
}

Usage is simple:

// Getting the value works just like normal, thanks to implicit conversion.
int someValue = foo.Progress;

// Setting the value is easy, too:
foo.Progress.Value = 42;

You can databind to it in WPF or Silverlight; just bind to the Value property.

<ProgressBar Value="{Binding Progress.Value}" />

Most importantly, you can compose, filter, project, and do all the sexy things Rx lets you do with IObservables:

Filtering events:

foo.Progress
   .Where(val => val == 100)
   .Subscribe(_ => MyProgressFinishedHandler());

Automatic unsubscribe after N invocations:

foo.Progress
   .Take(1)
   .Subscribe(_ => OnProgressChangedOnce());

Composing events:

// Pretend we have an IObservable<bool> called IsClosed:
foo.Progress
   .TakeUntil(IsClosed.Where(v => v == true))
   .Subscribe(_ => ProgressChangedWithWindowOpened());

Nifty stuff!
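If you want to see the three snippets above run end to end, here is a small sketch. It assumes the System.Reactive package is referenced, and it uses plain subjects as stand-ins for foo.Progress and IsClosed rather than the Observable<T> class; CompositionDemo is an illustrative name:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;      // Where, Take, TakeUntil
using System.Reactive.Subjects;  // Subject<T>

public static class CompositionDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var progress = new Subject<int>();   // stand-in for foo.Progress
        var isClosed = new Subject<bool>();  // stand-in for IsClosed

        // Filtering: only react when progress reaches 100.
        progress.Where(val => val == 100)
                .Subscribe(_ => log.Add("finished"));

        // Automatic unsubscribe after the first notification.
        progress.Take(1)
                .Subscribe(val => log.Add("first " + val));

        // Composing: stop listening once the window reports it is closed.
        progress.TakeUntil(isClosed.Where(closed => closed))
                .Subscribe(val => log.Add("open " + val));

        progress.OnNext(10);    // seen by Take(1) and TakeUntil
        isClosed.OnNext(true);  // ends the TakeUntil subscription
        progress.OnNext(100);   // seen only by the Where filter

        return log;
    }
}
```

After Run() the log holds three entries: "first 10", "open 10" and "finished". The value 100 does not produce a second "first" or "open" entry because both of those subscriptions have already ended.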


Apart from the fact that your existing eventing code could be terser:

    public event EventHandler ProgressChanged = delegate {};

    ...
    set {
        ...
        // no need for null check anymore
        ProgressChanged(this, EventArgs.Empty);
    }

I think by switching to Observable<int> you are just moving complexity from the callee to the caller. What if I just want to read the int?

-Oisin
