开发者

Why does Rx Observable.Subscribe block my thread?

Hello there' I've tried out one of the 101 Rx examples:

    static IEnumerable<int> GenerateAlternatingFastAndSlowEvents()
    {
        int i = 0;

        while (true)
        {
            if (i > 1000)
            {
                yield break;
            }
            yield return i;
            Thread.Sleep(i++ % 10 < 5 ? 500 : 1000);
        }
    }

    private static void Main()
    {
        var observable = GenerateAlternatingFastAndSlowEvents().ToObservable().Timestamp();
        var throttled = observable.Throttle(TimeSpan.FromMilliseconds(750));

        using (throttled.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
        {
            Console.WriteLine("Press any key to unsubscribe");
            Console.ReadKey();
        }

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }

I don't understand why the line "Press any key to unsubscribe"开发者_如何转开发 never shows. My understanding was subscribing is asynchronous, you subscribe and it immedietly returns. What am I missing that causes my main thread to block?


The blocking is caused by a combination of your enumerable looping on while (true) and IEnumerable<T>.ToObservable() extension methods defaulting to CurrentThreadScheduler.

If you supply Scheduler.TaskPool (or Scheduler.ThreadPool in pre-.NET 4) to an overload of ToObservable, you should see the behavior you're expecting (though it won't call your subscriber on the main thread, FYI).

Having said that, I think you'll find your combination of Thread.Sleep and Throttle will work as you expect. You're probably better off creating a custom observable that uses a scheduler to schedule your delays.


I agree with Richard.

The implementation for .ToObservable() looks like this:

public static IObservable<TSource> ToObservable<TSource>(
    this IEnumerable<TSource> source)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    return source.ToObservable<TSource>(Scheduler.CurrentThread);
}

It's calling the .ToObservable(IScheduler) overload with Scheduler.CurrentThread and since you're using .Sleep(...) to cause the delays the observable has to complete before the code can go beyond the .Subscribe(...) method. Just think in terms of what would this code behave like if it all ran on a single thread (which it is.)

To get around this you can use the task pool or thread pool schedulers as Richard suggests, but I think you have a more fundamental issue with your code. And that is that you're using "old school" thread sleeping and not relying on Rx methods instead.

Try this to generate your observable:

var observable =
    Observable
        .GenerateWithTime(0, i => i <= 1000, i => i + 1,
            i => i, i => TimeSpan.FromMilliseconds(i % 10 < 5 ? 500 : 1000))
        .Timestamp();

GenerateWithTime(...) does everything that your GenerateAlternatingFastAndSlowEvents method did, but it creates the observable directly and does it using the Scheduler.ThreadPool under the hood so you don't need to specify any schedulers.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜