Zipping Rx IObservable with infinite number set
I have a开发者_如何学运维 IObservable [named rows in the sample below] from Reactive extensions framework and I want to add index numbers to each object it observes.
I've tried to implement this using Zip function:
rows.Zip(Enumerable.Range(1, int.MaxValue), (row, index) =>
new { Row = row, Index = index })
.Subscribe(a => ProcessRow(a.Row, a.Index), () => Completed());
.. but unfortunately this throws
ArgumentOutOfRangeException: Specified argument was out of the range of valid values. Parameter name: disposables
Am I understanding the Zip function wrong or is there a problem with my code?
The Range part of the code doesn't seem to be the problem and the IObservable isn't yet receiving any events.
.Select has an overload to include the index:
rows.Select((row, index) => new { row, index });
Apparently, Zip extension methods converts the original custom IObservable to an anonymous observable and Subscribing to it creates an System.Collections.Generic.AnonymousObserver, which doesn't implement IDisposable. Thus, you cannot implement the Subscribe method the normal way (atleast the way I've seen it used), which is
public IDisposable Subscribe(IObserver<T> observer) {
// ..add to observer list..
return observer as IDisposable
}
More likely the correct answer would be:
return Disposable.Create(() => Observers.Remove(observer));
You should though note that the collction will probably be modified durin Completed-method, so create a copy of the list before processing them:
public void Completed()
{
foreach (var observer in Observers.ToList())
{
observer.OnCompleted();
}
}
I am not sure what your problem is, does this work for you (and what's missing here that you are doing?):
static void Main(string[] args)
{
var rows = new List<int> { 4,5,1,2,5 }.ToObservable();
rows.Zip(Enumerable.Range(1, int.MaxValue), (row, index) =>
new { Row = row, Index = index })
.Subscribe(a => ProcessRow(a.Row, a.Index), () => Completed());
Console.ReadLine();
}
static void ProcessRow(int row, int index) {
Console.WriteLine("Row {0}, Index {1}", row, index);
}
static void Completed() {
}
精彩评论