Get previous element in IObservable without re-evaluating the sequence
In an IObservable sequence (in Reactive Extensions for .NET), I'd like to get the values of the previous and current elements so that I can compare them. I found an example online, similar to the one below, which accomplishes the task:
sequence.Zip(sequence.Skip(1), (prev, cur) => new { Previous = prev, Current = cur })
It works fine except that it evaluates the sequence twice, which I would like to avoid. You can see that it is being evaluated twice with this code:
var debugSequence = sequence.Do(item => Debug.WriteLine("Retrieved an element from sequence"));
debugSequence.Zip(debugSequence.Skip(1), (prev, cur) => new { Previous = prev, Current = cur }).Subscribe();
The output shows twice as many of the debug lines as there are elements in the sequence.
I understand why this happens, but so far I haven't found an alternative that doesn't evaluate the sequence twice. How can I combine the previous and current with only one sequence evaluation?
I think there's a better solution to this, one that uses Observable.Scan and avoids the double subscription. Note that the first pair it emits has default(TSource) as its previous value:
public static IObservable<Tuple<TSource, TSource>>
    PairWithPrevious<TSource>(this IObservable<TSource> source)
{
    return source.Scan(
        Tuple.Create(default(TSource), default(TSource)),
        (acc, current) => Tuple.Create(acc.Item2, current));
}
I've written this up on my blog here: http://www.zerobugbuild.com/?p=213
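As a quick check of the Scan approach, here is a minimal usage sketch. It assumes the System.Reactive package is referenced; the names PairingExtensions, ScanDemo and Evaluations are made up for illustration. Because Scan emits one pair per source element, the first pair carries default(T) as its previous value, so the sketch drops it with Skip(1).

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class PairingExtensions
{
    // Same operator as in the answer, repeated so the sketch compiles on its own.
    public static IObservable<Tuple<T, T>> PairWithPrevious<T>(this IObservable<T> source)
    {
        return source.Scan(
            Tuple.Create(default(T), default(T)),
            (acc, current) => Tuple.Create(acc.Item2, current));
    }
}

public static class ScanDemo
{
    // Hypothetical counter: how many elements the source produced in the last Run.
    public static int Evaluations;

    public static List<Tuple<int, int>> Run()
    {
        Evaluations = 0;

        // Cold source with a side effect, standing in for the Debug.WriteLine probe.
        var sequence = Observable.Range(1, 4).Do(_ => Evaluations++);

        var pairs = new List<Tuple<int, int>>();
        sequence.PairWithPrevious()
                .Skip(1) // drop the (default, first) pair
                .Subscribe(p => pairs.Add(p));
        return pairs;
    }

    public static void Main()
    {
        foreach (var pair in Run())
            Console.WriteLine(pair.Item1 + " -> " + pair.Item2);
        Console.WriteLine("Source evaluations: " + Evaluations);
    }
}
```

With the four-element range this should print the pairs 1 -> 2, 2 -> 3 and 3 -> 4, with the source evaluated four times rather than eight.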
Addendum
A further modification allows you to work with arbitrary types more cleanly by using a result selector:
public static IObservable<TResult> CombineWithPrevious<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, TSource, TResult> resultSelector)
{
    return source.Scan(
        Tuple.Create(default(TSource), default(TSource)),
        (previous, current) => Tuple.Create(previous.Item2, current))
        .Select(t => resultSelector(t.Item1, t.Item2));
}
@James World's addendum looks great to me, if not for Tuple<>, which I almost always dislike: "Was .Item1 the previous? Or was it the current one? I can't remember. And what's the first argument to the selector, was it the previous item?".
For that part I liked @dcstraw's definition of a dedicated ItemWithPrevious<T>. So there you go, putting the two together (hopefully I did not mix up previous with current) with some renaming and facilities:
public static class ObservableExtensions
{
    public static IObservable<SortedPair<TSource>> CombineWithPrevious<TSource>(
        this IObservable<TSource> source,
        TSource initialValue = default(TSource))
    {
        var seed = SortedPair.Create(initialValue, initialValue);
        return source.Scan(seed,
            (acc, current) => SortedPair.Create(current, acc.Current));
    }

    public static IObservable<TResult> CombineWithPrevious<TSource, TResult>(
        this IObservable<TSource> source,
        Func<SortedPair<TSource>, TResult> resultSelector,
        TSource initialValue = default(TSource))
    {
        var seed = SortedPair.Create(initialValue, initialValue);
        return source
            .Scan(seed,
                (acc, current) => SortedPair.Create(current, acc.Current))
            .Select(p => resultSelector(p));
    }
}
public class SortedPair<T>
{
    public SortedPair(T current, T previous)
    {
        Current = current;
        Previous = previous;
    }

    public SortedPair(T current) : this(current, default(T)) { }

    public SortedPair() : this(default(T), default(T)) { }

    public T Current;
    public T Previous;
}

// Factory helpers only, so the class is static.
public static class SortedPair
{
    public static SortedPair<T> Create<T>(T current, T previous)
    {
        return new SortedPair<T>(current, previous);
    }

    public static SortedPair<T> Create<T>(T current)
    {
        return new SortedPair<T>(current);
    }

    public static SortedPair<T> Create<T>()
    {
        return new SortedPair<T>();
    }
}
Evaluating twice is an indicator of a cold observable. You can turn it into a hot one by using .Publish():
var pub = sequence.Publish();
pub.Zip(pub.Skip(1), (...
pub.Connect();
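For comparison, Rx also has a Publish overload that takes a selector; it shares a single subscription to the source inside the selector and handles the connection itself. The following is a minimal sketch of that variant, not a completion of the snippet above. It assumes the System.Reactive package; PublishDemo and Evaluations are illustrative names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class PublishDemo
{
    // Hypothetical counter: how many elements the source produced in the last Run.
    public static int Evaluations;

    public static List<string> Run()
    {
        Evaluations = 0;

        // Cold source with a side effect so double evaluation would be visible.
        var sequence = Observable.Range(1, 4).Do(_ => Evaluations++);

        var results = new List<string>();
        sequence
            // Both uses of "shared" observe the same single subscription.
            .Publish(shared => shared.Zip(
                shared.Skip(1),
                (prev, cur) => prev + " -> " + cur))
            .Subscribe(s => results.Add(s));
        return results;
    }

    public static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
        Console.WriteLine("Source evaluations: " + Evaluations);
    }
}
```

Here the Zip and the Skip(1) both read from the shared sequence, so the Do side effect should fire four times for four elements.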
If you only need to access the previous element during subscription, this is probably the simplest thing that will work. (I'm sure there's a better way, maybe a buffer operator on IObservable? The documentation is pretty sparse at the moment, so I can't really tell you.)
EventArgs prev = null;
sequence.Subscribe(curr =>
{
    if (prev != null)
    {
        // Previous and current element available here
    }
    prev = curr;
});
EventArgs is just a stand-in for the type of your event's argument.
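On the buffer idea mentioned above: Rx does have a Buffer(count, skip) operator, and with a count of 2 and a skip of 1 it yields overlapping two-element windows. Below is a minimal sketch under the assumption that the System.Reactive package is available; BufferDemo is an illustrative name, and the Where filter is there to ignore any shorter buffer flushed when the source completes.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class BufferDemo
{
    public static List<string> Run()
    {
        var results = new List<string>();
        Observable.Range(1, 4)
            // Windows of 2 elements, advancing by 1 element each time.
            .Buffer(2, 1)
            // Keep only full windows: [previous, current].
            .Where(window => window.Count == 2)
            .Subscribe(window => results.Add(window[0] + " -> " + window[1]));
        return results;
    }

    public static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
    }
}
```

Each window holds the previous element at index 0 and the current one at index 1, and the source is subscribed to only once.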
It turns out you can use a variable to hold the previous value, then refer to it and reassign it within the chain of IObservable extensions. This even works within a helper method. With the code below I can now call CombineWithPrevious() on my IObservable to get a reference to the previous value, without re-evaluating the sequence.
public class ItemWithPrevious<T>
{
    public T Previous;
    public T Current;
}

public static class MyExtensions
{
    public static IObservable<ItemWithPrevious<T>> CombineWithPrevious<T>(this IObservable<T> source)
    {
        var previous = default(T);
        return source
            .Select(t => new ItemWithPrevious<T> { Previous = previous, Current = t })
            .Do(items => previous = items.Current);
    }
}
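One thing to watch with the captured variable: it is created once per call to CombineWithPrevious, so every subscription to the returned observable shares it, and a second subscriber can start with the last value seen by the first. A possible way around this, sketched here as an assumption rather than as part of the original answer, is to wrap the chain in Observable.Defer so each subscription gets its own variable. The names CombineWithPreviousDeferred, DeferredExtensions and DeferDemo are hypothetical, and System.Reactive is assumed.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Same shape as the class in the answer, repeated so the sketch compiles on its own.
public class ItemWithPrevious<T>
{
    public T Previous;
    public T Current;
}

public static class DeferredExtensions
{
    public static IObservable<ItemWithPrevious<T>> CombineWithPreviousDeferred<T>(
        this IObservable<T> source)
    {
        // Defer runs the factory once per subscription, so "previous"
        // is no longer shared between subscribers.
        return Observable.Defer(() =>
        {
            var previous = default(T);
            return source
                .Select(t => new ItemWithPrevious<T> { Previous = previous, Current = t })
                .Do(item => previous = item.Current);
        });
    }
}

public static class DeferDemo
{
    // Collects the Previous values seen by two consecutive subscriptions.
    public static List<int> Run()
    {
        var combined = Observable.Range(1, 3).CombineWithPreviousDeferred();
        var seen = new List<int>();
        combined.Subscribe(item => seen.Add(item.Previous));
        combined.Subscribe(item => seen.Add(item.Previous));
        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

With the deferred version, both subscriptions should see 0, 1, 2 as their previous values; without Defer, the second subscription would begin with the 3 left over from the first.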