How do I get an IObservable to push the newest value upon subscription?
Usually when you subscribe to the changes of a value you are also interested in knowing the initial value. I want my IObservable to cache the latest (or initial) value and push that value on subscription.
When using plain events, I often end up with code that looks like this:
x.SomeEvent += SomeEventHandler;
SomeEventHandler(x, EventArgs.Empty);
Using IObservable, I was hoping to wrap the event with something that pushes the initial value. If I have multiple subscribers, each of them should receive the newest value upon subscription. I have some code that works if I subscribe right after creating the IObservable, but not if the event fires before subscribing:
class Program
{
static void Main()
{
var s = new Source { Value = 1 };
var values = Observable.Return(s.Value).Concat(
Observable.FromEvent(
h => s.ValueChanged += h,
h => s.ValueChanged -= h)
.Select(_ => s.Value));
using (values.Subscribe(Console.WriteLine))
{
s.Value = 2; // prints 1,2 as expected
}
using (values.Subscribe(Console.WriteLine))
{
s.Value = 3; // prints 1,3 - expected 2,3
}
}
}
class Source
{
private int _value;
public int Value
{
get { return _value; }
set
{
if (_value == value)
return;
_value = value;
if (ValueChanged != null)
ValueChanged(this, EventArgs.Empty);
}
}
public event EventHandler ValueChanged;
}
How do I create an IObservable that works as expected?
The solution is to subscribe a BehaviorSubject to the observable, and to subscribe all observers to the BehaviorSubject. The BehaviorSubject will remember the last notification and notify new observers of it upon subscription.
Have a look at the Observable.Publish extension method that has an initialValue parameter. This creates an IConnectableObservable that internally uses a BehaviorSubject.
var s = new Source { Value = 1 };
var values = Observable.FromEvent(h => s.ValueChanged += h,
h => s.ValueChanged -= h)
.Select(_ => s.Value) // Source raises plain EventArgs (no NewValue), so read the current value
.Publish(s.Value);
using (values.Connect()) // subscribes subject to event
{
using (values.Subscribe(Console.WriteLine)) // subscribes to subject
{
s.Value = 2;
} // unsubscribes from subject
using (values.Subscribe(Console.WriteLine)) // subscribes to subject
{
s.Value = 3;
} // unsubscribes from subject
} // unsubscribes subject from event
(untested)
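If it helps to see why this works, here is a minimal sketch of the caching idea without any Rx dependency. It is not how Rx implements BehaviorSubject; `LatestValueSubject`, `Subscription`, `ActionObserver` and `Demo` are hypothetical names made up for illustration, and the sketch assumes single-threaded use. `Source` is the class from the question.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for Rx's BehaviorSubject<T>; single-threaded sketch only.
sealed class LatestValueSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();
    private T _latest;

    public LatestValueSubject(T initialValue) { _latest = initialValue; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        observer.OnNext(_latest); // replay the cached value right away
        return new Subscription(() => _observers.Remove(observer));
    }

    public void OnNext(T value)
    {
        _latest = value; // remember it for future subscribers
        foreach (var o in _observers.ToArray()) o.OnNext(value);
    }

    public void OnError(Exception error)
    {
        foreach (var o in _observers.ToArray()) o.OnError(error);
    }

    public void OnCompleted()
    {
        foreach (var o in _observers.ToArray()) o.OnCompleted();
    }
}

// Hypothetical helper: runs an action when the subscription is disposed.
sealed class Subscription : IDisposable
{
    private readonly Action _dispose;
    public Subscription(Action dispose) { _dispose = dispose; }
    public void Dispose() { _dispose(); }
}

// Hypothetical helper: adapts a delegate to IObserver<T>.
sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    public ActionObserver(Action<T> onNext) { _onNext = onNext; }
    public void OnNext(T value) { _onNext(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

// Same Source class as in the question.
class Source
{
    private int _value;
    public int Value
    {
        get { return _value; }
        set
        {
            if (_value == value) return;
            _value = value;
            if (ValueChanged != null) ValueChanged(this, EventArgs.Empty);
        }
    }
    public event EventHandler ValueChanged;
}

static class Demo
{
    // Returns everything the two subscribers received, in order.
    public static List<int> Run()
    {
        var received = new List<int>();
        var s = new Source { Value = 1 };
        var latest = new LatestValueSubject<int>(s.Value);
        EventHandler handler = (sender, e) => latest.OnNext(s.Value);
        s.ValueChanged += handler; // plays the role of Connect()
        using (latest.Subscribe(new ActionObserver<int>(received.Add))) { s.Value = 2; }
        using (latest.Subscribe(new ActionObserver<int>(received.Add))) { s.Value = 3; }
        s.ValueChanged -= handler;
        return received;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(",", Run())); // prints 1,2,2,3
    }
}
```

The important part is that the subject stays attached to the event for the whole time, so it sees the change to 2 even while nobody is subscribed. That is what the Observable.Return(...).Concat(...) version in the question is missing: it captures s.Value once, when the sequence is built.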