How can I observe values in a non-blocking way using Rx?
I'm trying to observe a timer whose handler takes longer than the interval. To do so, I want to schedule the observation on some kind of thread pool, task pool, or similar.
I tried ThreadPool, TaskPool, and NewThread, and none of them worked. Does anyone know how to do it? Example:
int count = 0;
var disposable = Observable
    .Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(100))
    .ObserveOn(Scheduler.NewThread)
    .Subscribe(x =>
    {
        count++;
        // Simulate a handler that takes longer than the timer interval.
        Thread.Sleep(TimeSpan.FromMilliseconds(1000));
    });
Thread.Sleep(TimeSpan.FromSeconds(5));
disposable.Dispose();
if (count > 10)
{
    // hurray...
}
What you're asking for is a bad idea, because you'll eventually exhaust the available resources (threads would be created faster than they finish). Instead, why not schedule a new item when the previous one has finished?
In your specific example, you need to pass an IScheduler to Observable.Timer instead of trying to use ObserveOn.
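The suggestion above, queuing the next run only once the previous one has finished, could be sketched with Rx's recursive scheduling overload. This is a minimal sketch, not the answerer's code: the names SelfRescheduling and RunRepeatedly are hypothetical, and it assumes an Rx version where NewThreadScheduler.Default is available (older releases expose the same scheduler as Scheduler.NewThread).

```csharp
using System;
using System.Reactive.Concurrency;
using System.Threading;

static class SelfRescheduling
{
    // Hypothetical helper: runs `work` repeatedly on `scheduler`, waiting
    // `pause` after each run completes before starting the next one.
    // Because the next run is queued only after `work` returns, runs never overlap.
    public static IDisposable RunRepeatedly(IScheduler scheduler, TimeSpan pause, Action work)
    {
        return scheduler.Schedule(TimeSpan.Zero, self =>
        {
            work();       // may take much longer than `pause`
            self(pause);  // only now queue the next run
        });
    }
}

static class Program
{
    static void Main()
    {
        int count = 0;

        // Assumption: NewThreadScheduler.Default (Scheduler.NewThread in older Rx).
        IDisposable loop = SelfRescheduling.RunRepeatedly(
            NewThreadScheduler.Default,
            TimeSpan.FromMilliseconds(100),
            () =>
            {
                Interlocked.Increment(ref count);
                Thread.Sleep(TimeSpan.FromMilliseconds(1000)); // slow handler
            });

        Thread.Sleep(TimeSpan.FromSeconds(5));
        loop.Dispose(); // cancels any run that has not started yet

        Console.WriteLine("runs started: " + count);
    }
}
```

With this shape the handler sets the pace: a slow handler simply means fewer runs, and no backlog of queued work or threads can build up.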
Paul is right when he says this is a bad idea. You are logically creating a situation where the queued-up actions could exhaust system resources. You could even find that it works on your computer but fails on a customer's computer. The available memory, a 32- or 64-bit processor, and so on could all affect the code.
However, it's easy to modify your code to make it do what you want.
First, though, note that the Timer method will correctly schedule timer events as long as the observer finishes before the next scheduled event. If the observer hasn't finished, the timer will wait. Remember that observable timers are "cold" observables, so for every subscribed observer there is effectively a new timer observable. It's a one-to-one relationship.
This behaviour prevents the timer from inadvertently blowing your resources.
So, as your code is currently defined, OnNext is being called every 1000 milliseconds, not every 100.
Now, to allow the code to run at the 100 millisecond schedule, do this:
Observable
    .Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(100))
    .Select(x => Scheduler.NewThread.Schedule(() =>
    {
        // Each action runs on its own thread, so increment atomically.
        Interlocked.Increment(ref count);
        Thread.Sleep(TimeSpan.FromMilliseconds(1000));
    }))
    .Subscribe(x => { });
Effectively this code is an IObservable<IDisposable> where each disposable is the scheduled action that takes 1000 milliseconds to complete.
In my tests this ran nicely and incremented the count correctly.
I did try to exhaust my resources and found that, with the timer set to run once every millisecond, I quickly got a System.OutOfMemoryException, but the code ran if I changed the setting to every two milliseconds. This did, however, use over 500 MB of RAM while the code ran and created around 500 new threads. Not nice at all.
Proceed with caution!
If you are genuinely, constantly producing values faster than you can consume them, then, as pointed out, you are heading for trouble. If you can't slow down the rate of production, then you need to look at how to consume them faster. Perhaps you want to multithread the observer to use multiple cores?
If you multithread the observer, you may need to be careful of processing events out of order. You'll be handling multiple notifications at the same time, and all bets are off as to which processing gets done first (or gets to some race condition critical state first).
If you don't have to process every event in the stream, take a look at the couple of different implementations of ObserveLatestOn that are floating around. There are threads discussing it here and here.
ObserveLatestOn will drop all but the latest notification that occurs while the observer is handling a previous notification. When the observer finishes handling the previous notification, it will then receive the latest notification and miss all the notifications that occurred in between.
The advantage of this is that it prevents the buildup of pressure from a producer that's faster than its consumer. If the consumer is slower because of load, then it's only going to get worse by handling more notifications. Dropping unneeded notifications could allow load to recede to the point where the consumer can keep up.
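To make the dropping behaviour concrete, here is a minimal sketch of how an operator of this kind could be written. ObserveLatestOn is not part of Rx itself, and this is not necessarily identical to any of the implementations referred to above; the class name LatestExtensions and the local variable names are illustrative assumptions.

```csharp
using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class LatestExtensions
{
    // Hypothetical operator (not part of Rx): delivers notifications on
    // `scheduler`, keeping only the newest one that arrives while the
    // observer is still busy with an earlier one.
    public static IObservable<T> ObserveLatestOn<T>(
        this IObservable<T> source, IScheduler scheduler)
    {
        return Observable.Create<T>(observer =>
        {
            var gate = new object();
            Notification<T> pending = null; // newest undelivered notification
            bool draining = false;          // true while a delivery loop is queued or running
            var scheduled = new MultipleAssignmentDisposable();

            var subscription = source.Materialize().Subscribe(notification =>
            {
                bool startDraining;
                lock (gate)
                {
                    // Overwrite whatever was waiting. Caveat: a terminal
                    // OnCompleted/OnError can replace a pending OnNext too.
                    pending = notification;
                    startDraining = !draining;
                    draining = true;
                }
                if (!startDraining) return;

                scheduled.Disposable = scheduler.Schedule(self =>
                {
                    Notification<T> next;
                    lock (gate) { next = pending; pending = null; }

                    next.Accept(observer); // the (possibly slow) observer runs here

                    bool more;
                    lock (gate) { more = pending != null; draining = more; }
                    if (more) self();      // deliver what arrived in the meantime
                });
            });

            return new CompositeDisposable(subscription, scheduled);
        });
    }
}
```

It would be used in place of ObserveOn, for example source.ObserveLatestOn(scheduler).Subscribe(handler). A handler that takes 1000 milliseconds against a 100 millisecond timer would then see roughly one value per second, each being the most recent tick available when the handler became free.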