Window Operator does not work with itself?
I have a bit of code that I would expect to work in a way, and it doesn't, I am wondering what I am doing wrong :
class Program
{
static void Main(string[] args)
{
var ints = Observable.Interval(TimeSpan.FromMilliseconds(1000));
var windowed = ints.Window(() => ints.Select(i => i / 3).DistinctUntilChanged());
windowed.Subscribe(HandleNewWindow);
Console.ReadLine();
开发者_JAVA技巧 }
public static void HandleNewWindow(IObservable<long> ints)
{
Console.WriteLine("New sequence received");
ints.Subscribe(Console.WriteLine);
}
}
Output for this should be :
New sequence received
0 1 2 New sequence received 3 4 5 New sequence received 6 7 8 ...but it is :
New sequence received
0 New sequence received 1 New sequence received 2 New sequence received 3 New sequence received 4 New sequence received 5 New sequence received 6 ...Note if I use a different line to define my window, such as :
var windowed = ints.Window(() => Observable.Interval(TimeSpan.FromMilliseconds(3000)));
then it all works fine.
Does Window have a problem with using window closings that are derived from the Observable it is windowing, or am I missing something important here ?
You need to use the Publish
operator to create an observable who's subscriptions to the source can be shared. It looks like every time the window is closed it internally sets up a new subscription to the source. Using publish ensures you are not starting a new interval every time
You also need to change your window close selector to only fire when you want the window to be closed.
class Program
{
static void Main(string[] args)
{
var ints = Observable.Interval(TimeSpan.FromMilliseconds(1000))
.Publish(new Subject<long>());
var closeOnValues = ints.Where(ShouldClose);
var windowed = ints.Window(() => closeOnValues);
windowed.Subscribe(HandleNewWindow);
Console.ReadLine();
}
public static void HandleNewWindow(IObservable<long> ints)
{
Console.WriteLine("New sequence received");
ints.Subscribe(Console.WriteLine);
}
public static bool ShouldClose(long index)
{
var notZero = index != 0;
var countIsMultipleOfThree = (index + 1) % 3 == 0;
return notZero && countIsMultipleOfThree;
}
}
I have something that looks more like my original code and produces the expected values. I still don't understand why this code works and not the other one, but I think James Hay nailed it when he said some kind of re-subscription happens behind the scenes.
class Program
{
static void Main(string[] args)
{
var ints = Observable.Interval(TimeSpan.FromMilliseconds(1000));
var windowClosings = ints
.Select(i => i / 3)
.DistinctUntilChanged()
.SkipWhile((i) => i == 0)
.Publish(new Subject<long>());
var windowed = ints.Window(() => windowClosings);
windowed.Subscribe(HandleNewWindow);
Console.ReadLine();
}
public static void HandleNewWindow(IObservable<long> ints)
{
Console.WriteLine("New sequence received");
ints.Subscribe(Console.WriteLine);
}
}
Main differences here, apart from the SkipWhile which only removes the first windowClosing, is that I had to publish the windowClosings (and not the original Observable).
Still not 100% sure why I had to do that.
精彩评论