Rx: Are observables "repeatable" like IEnumerable, and if not, how does this code work?
Yesterday I watched the screencast Writing your first Rx Application (on Channel 9) where Wes Dyer shows how to implement Drag 'n' Drop using Reactive Extensions (Rx). Something that I still don't understand:
Towards the end of the screencast, Wes Dyer types in the following:
var q = from start in mouseDown
        from delta in mouseMove.StartsWith(start).Until(mouseUp)
                               .Let(mm => mm.Zip(mm.Skip(1), (prev, cur) =>
                                   new { X = cur.X - prev.X, Y = cur.Y - prev.Y }))
        select delta;
Briefly, q is an observable that pushes the mouse move coordinate deltas to its subscribers.
What I don't understand is how the mm.Zip(mm.Skip(1), ...) can possibly work!?
As far as I know, IObservable is not enumerable in the sense that IEnumerable is. Thanks to the "pull" nature of IEnumerable, it can be iterated over again and again, always yielding the same items. (At least this should be the case for all well-behaved enumerables.) IObservable works differently. Items are pushed to the subscribers once, and that's it. In the above example, mouse moves are single incidents which cannot be repeated unless they have been recorded in memory.
So, how can the combination of .Zip with .Skip(1) possibly work, since the mouse events they're working on are single, non-repeatable incidents? Doesn't this operation require that mm is "looked at" independently twice?
For reference, here's the method signature of Observable.Zip:
public static IObservable<TResult> Zip<TLeft, TRight, TResult>
(
    this IObservable<TLeft> leftSource,    // = mm
    IObservable<TRight> rightSource,       // = mm.Skip(1)
    Func<TLeft, TRight, TResult> selector
)
P.S.: I just saw that there's another screencast on the Zip operator which is quite insightful.
Doesn't this operation require that mm is "looked at" independently twice?
That's in fact the answer to your question: you can subscribe to the same IObservable sequence multiple times.
mm.Skip(1) subscribes to mm and hides the first value from its own subscribers.
Zip is a subscriber of both mm.Skip(1) and mm. Because mm has always yielded one more value than mm.Skip(1), Zip internally buffers the last mousemove event from mm all the time in order to zip it with the next future mousemove event. The selector function can then select the delta between both.
Another thing you should notice (and this is the real answer to the title of your question) is that the IObservable created by Observable.FromEvent is a hot observable and therefore not repeatable. But there are cold observables which are in fact repeatable, like Observable.Range(0,10). In the latter case each subscriber will receive the same 10 events, because they are generated independently for each subscriber. For mousemove events this is not the case (you won't get mouse move events from the past). But because Zip subscribes to the left and right sequences at the same time, both subscriptions effectively see the same events in this case.
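To make the hot/cold distinction concrete, here is a small sketch that deliberately avoids the Rx library and uses only plain .NET. MouseLikeSource, SubscribeColdRange and Demo are made-up names for illustration; a plain event stands in for the hot FromEvent observable, and a loop that runs once per subscriber stands in for a cold Observable.Range(0, 3).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a mouse: a plain .NET event behaves like a hot source.
class MouseLikeSource
{
    public event Action<int> Moved;
    public void Raise(int x) => Moved?.Invoke(x);
}

static class HotColdSketch
{
    // Cold, in the spirit of Observable.Range(0, 3): the values are produced
    // from scratch for every subscriber.
    public static void SubscribeColdRange(Action<int> onNext)
    {
        for (int i = 0; i < 3; i++) onNext(i);
    }

    public static string[] Demo()
    {
        var coldFirst = new List<int>();
        var coldSecond = new List<int>();
        SubscribeColdRange(coldFirst.Add);
        SubscribeColdRange(coldSecond.Add);   // gets 0, 1, 2 again

        var mouse = new MouseLikeSource();
        var early = new List<int>();
        var late = new List<int>();
        mouse.Moved += early.Add;
        mouse.Raise(1);
        mouse.Moved += late.Add;              // too late for 1; it is never replayed
        mouse.Raise(2);

        return new[]
        {
            string.Join(",", coldFirst),      // "0,1,2"
            string.Join(",", coldSecond),     // "0,1,2"
            string.Join(",", early),          // "1,2"
            string.Join(",", late)            // "2"
        };
    }
}
```

The late handler never sees the value 1, which is the "you won't get mouse move events from the past" behaviour; two handlers attached before the first event would see identical values.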
P.S.: You can also create a hot / not repeatable IEnumerable: it does not need to return the same values for each enumerator. You could for instance create an IEnumerable which waits until a mousemove event occurs and then yields the event. In this case the enumerator would always block (bad design), but it would be possible. ;)
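As a rough illustration of such a non-repeatable IEnumerable, the sketch below uses BlockingCollection<T>.GetConsumingEnumerable() from System.Collections.Concurrent, whose enumerator removes items as it yields them and blocks while waiting for more. The integers are only stand-ins for mouse events, and HotEnumerableSketch and Demo are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

static class HotEnumerableSketch
{
    public static int[][] Demo()
    {
        var events = new BlockingCollection<int>();
        foreach (var x in new[] { 1, 2, 3 }) events.Add(x);

        // Without this call the enumeration below would block, waiting for more items.
        events.CompleteAdding();

        IEnumerable<int> consuming = events.GetConsumingEnumerable();
        int[] firstPass = consuming.ToArray();    // takes 1, 2, 3 out of the collection
        int[] secondPass = consuming.ToArray();   // nothing is left to yield
        return new[] { firstPass, secondPass };
    }
}
```

Enumerating the same IEnumerable twice gives different results here, so "iterate again and get the same items" is a convention of well-behaved enumerables rather than a guarantee of the interface.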
Aha! The Zip screencast that I mentioned in the P.S. gave me a vital clue: Zip "remembers" items to account for the fact that items may arrive from one observable sooner than from the other. I'll attempt an answer to my own question; I hope someone can correct me if I'm wrong.
Zip pairs up inputs from two observable sequences like this (letters and digits are "events"):
mm                       ----A---------B-------C------D-----------E----->
                             |         |       |      |           |
                             |         |       |      |           |
mm.Skip(1)               ----+---------1-------2------3-----------4----->
                             |         |       |      |           |
                             |         |       |      |           |
mm.Zip(mm.Skip(1), ...)  ----+--------A,1-----B,2----C,3---------D,4---->
And it indeed has to do internal buffering. In the code that I posted, mm is the real, "live" observable. mm.Skip(1) is something like a state machine derived from it. Alex Paven's answer briefly explains how this works.
So, mm.Zip(mm.Skip(1), ...) does indeed look at mm twice, once directly, and once through the Skip(n) filter. And because observables aren't repeatable sequences, it does internal buffering to account for the fact that one sequence will yield items sooner than the other.
(I quickly glanced at the Rx source with .NET Reflector and indeed, Zip involves a Queue.)
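The buffering can be sketched in a few lines of plain C#. This is not the Rx implementation, only a toy model under simplifying assumptions: HotSource<T>, ZipSketch and Demo are invented names, subscriptions are made eagerly rather than lazily as in Rx, and completion, errors and unsubscription are left out.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a hot observable: each value is pushed once,
// to whoever is subscribed at that moment.
class HotSource<T>
{
    private readonly List<Action<T>> subscribers = new List<Action<T>>();
    public void Subscribe(Action<T> onNext) => subscribers.Add(onNext);
    public void Push(T value)
    {
        foreach (var s in subscribers.ToArray()) s(value);
    }
}

static class ZipSketch
{
    // Skip: its own subscription to the source plus a counter (a tiny state machine).
    public static HotSource<T> Skip<T>(HotSource<T> source, int count)
    {
        var result = new HotSource<T>();
        int remaining = count;
        source.Subscribe(v => { if (remaining > 0) remaining--; else result.Push(v); });
        return result;
    }

    // Zip: subscribes to both inputs and queues values until a partner arrives.
    public static HotSource<TResult> Zip<TLeft, TRight, TResult>(
        HotSource<TLeft> left, HotSource<TRight> right,
        Func<TLeft, TRight, TResult> selector)
    {
        var result = new HotSource<TResult>();
        var leftQueue = new Queue<TLeft>();
        var rightQueue = new Queue<TRight>();
        Action drain = () =>
        {
            while (leftQueue.Count > 0 && rightQueue.Count > 0)
                result.Push(selector(leftQueue.Dequeue(), rightQueue.Dequeue()));
        };
        left.Subscribe(v => { leftQueue.Enqueue(v); drain(); });
        right.Subscribe(v => { rightQueue.Enqueue(v); drain(); });
        return result;
    }

    public static List<string> Demo()
    {
        var mm = new HotSource<string>();
        var pairs = new List<string>();
        Zip(mm, Skip(mm, 1), (prev, cur) => prev + "," + cur).Subscribe(pairs.Add);
        foreach (var e in new[] { "A", "B", "C", "D", "E" }) mm.Push(e);
        return pairs;   // A,B  B,C  C,D  D,E
    }
}
```

Each event is pushed exactly once, yet both subscriptions see it; the left queue simply holds the most recent event until its successor arrives through the Skip branch.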
Items are pushed to the subscribers once, and that was it.
Yes, one item is pushed once, but the item is one of a 'sequence' of events. The sequence is still a sequence. That's why Skip works - it skips one item and then, when the next one comes, processes it (doesn't skip it).