Reactive Extensions clean up

If you have a long chain of Rx calls such as:

var responses = collectionOfHttpRequests.ToObservable()
.FromAsyncPattern(req.BeginGetResponse, req.EndGetResponse)
.Select(res => res.GetResponseBodyString()) // Extension method to get the body of the request
.Subscribe();

and then, before the operation completes, you dispose the subscription, will the HTTP requests be cancelled, closed, and disposed of properly, or do I have to somehow pull the HttpWebRequests out of the method chain and dispose of them individually?

I have a scenario where several HTTP requests can be in flight at once, and I need to be able to cancel (not just ignore) some or all of them to save network traffic.


The Rx operator chain cleans itself up when the sequence completes, errors, or the subscription is disposed. However, each operator only cleans up what it is responsible for. For example, FromEvent unsubscribes from the event.

In your case, cancellation is not supported by the Begin/End asynchronous pattern, so there is nothing for Rx to cancel. You can, however, use Finally to call HttpWebRequest.Abort.

var observableRequests = collectionOfHttpRequests.ToObservable();

var responses = observableRequests
    .SelectMany(req => 
        Observable.FromAsyncPattern(req.BeginGetResponse, req.EndGetResponse)()
    )
    .Select(resp => resp.GetResponseBodyString())
    .Finally(() =>
    {
        observableRequests
            .Subscribe(req => req.Abort());
    })
    .Subscribe();
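
One way to scope the abort to a single request, instead of aborting the whole collection, is to tie Abort to that request's own subscription. This is only a minimal sketch, assuming the current System.Reactive package (Observable.Create and Disposable.Create) rather than the pre-release API used elsewhere in this thread. GetResponseObservable is a hypothetical helper name, not part of Rx.

```csharp
using System;
using System.Net;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;

public static class WebRequestObservable
{
    // Hypothetical helper: wraps one request so that disposing the
    // subscription before the response arrives aborts that request only.
    public static IObservable<WebResponse> GetResponseObservable(this WebRequest request)
    {
        return Observable.Create<WebResponse>(observer =>
        {
            var finished = 0; // 0 = still pending, 1 = response received or aborted

            request.BeginGetResponse(ar =>
            {
                try
                {
                    var response = request.EndGetResponse(ar);
                    Interlocked.Exchange(ref finished, 1);
                    observer.OnNext(response);
                    observer.OnCompleted();
                }
                catch (Exception ex)
                {
                    // An aborted request fails here; if the subscription was
                    // already disposed, Rx does not forward this notification.
                    observer.OnError(ex);
                }
            }, null);

            // Runs on unsubscribe (and after termination); only aborts
            // a request that has not produced its response yet.
            return Disposable.Create(() =>
            {
                if (Interlocked.CompareExchange(ref finished, 1, 0) == 0)
                {
                    request.Abort();
                }
            });
        });
    }
}
```

With this, collectionOfHttpRequests.ToObservable().SelectMany(req => req.GetResponseObservable()) would abort only the requests that are still pending when the subscription is disposed. The sketch does not close a response that arrives at the same moment the subscription is disposed, so real code would need to handle that case.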


I can't accept Richard Szalay's solution. If you start 100 requests and the second one fails (with a server-unavailable error, for example), the remaining 98 requests will be aborted. The second problem is how the UI should react to such an observable.

Keeping chapter 4.3 of the Rx Design Guidelines in mind, I wanted to express the WebRequest observable via the Observable.Using() operator. But WebRequest is not disposable, so I defined DisposableWebRequest:

public class DisposableWebRequest : WebRequest, IDisposable
{
    private static int _Counter = 0;

    private readonly WebRequest _request;
    private readonly int _index;

    private volatile bool _disposed = false;

    public DisposableWebRequest (string url)
    {
        this._request = WebRequest.Create(url);
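        // Interlocked keeps the counter consistent if requests are created on several threads.
        this._index = Interlocked.Increment(ref DisposableWebRequest._Counter);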
    }

    public override IAsyncResult BeginGetResponse(AsyncCallback callback, object state)
    {
        return this._request.BeginGetResponse(callback, state);
    }

    public override WebResponse EndGetResponse(IAsyncResult asyncResult)
    {
        Trace.WriteLine(string.Format("EndGetResponse index {0} in thread {1}", this._index, Thread.CurrentThread.ManagedThreadId));
        Trace.Flush();
        if (!this._disposed)
        {
            return this._request.EndGetResponse(asyncResult);
        }
        else
        {
            return null;
        }
    }

    public override WebResponse GetResponse()
    {
        return this._request.GetResponse();
    }

    public override void Abort()
    {
        this._request.Abort();
    }

    public void Dispose()
    {
        if(!this._disposed)
        {
            this._disposed = true;

            Trace.WriteLine(string.Format("Disposed index {0} in thread {1}", this._index, Thread.CurrentThread.ManagedThreadId ));
            Trace.Flush();
            this.Abort();
        }
    }
}

Then I created a WPF window and put two buttons on it (Start and Stop).

Now let's build the observable of requests. First, define a function that creates the observable of URLs:

        Func<IObservable<string>> createUrlObservable = () =>
            Observable
                .Return("http://yahoo.com")
                .Repeat(100)
                .OnStartup(() =>
                {
                    this._failed = 0;
                    this._successed = 0;
                });

For every URL we need to create a web request observable, so:

        Func<string, IObservable<WebResponse>> createRequestObservable = 
            url => 
            Observable.Using(
                () => new DisposableWebRequest(url),
                r =>
                {
                    Trace.WriteLine("Queued " + url);
                    Trace.Flush();
                    return Observable
                        .FromAsyncPattern<WebResponse>(r.BeginGetResponse, r.EndGetResponse)();
                });
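
To make the lifetime rule behind Observable.Using concrete: the resource is disposed when the inner sequence terminates or when the subscriber unsubscribes, whichever happens first. The following is only an illustrative sketch, assuming the current System.Reactive package. UsingLifetimeDemo is a hypothetical name, and a flag-setting disposable stands in for DisposableWebRequest.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class UsingLifetimeDemo
{
    // Returns true if the resource created by Using was disposed
    // as a direct result of disposing the subscription.
    public static bool ResourceReleasedOnUnsubscribe()
    {
        var released = false;

        var source = Observable.Using(
            // Stand-in for "new DisposableWebRequest(url)".
            () => Disposable.Create(() => released = true),
            // Stand-in for a request that never finishes.
            _ => Observable.Never<int>());

        var subscription = source.Subscribe(_ => { });
        subscription.Dispose(); // unsubscribing releases the resource

        return released;
    }
}
```

This is why TakeUntil(stopMouseDown) is enough to abort a pending request in the code above: when the stop signal fires, TakeUntil unsubscribes from the Using sequence, which disposes the DisposableWebRequest and so calls Abort.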

In addition, define two event observables that react to clicks on the "Start" and "Stop" buttons:

        var startMouseDown = Observable.FromEvent<RoutedEventArgs>(this.StartButton, "Click");
        var stopMouseDown = Observable.FromEvent<RoutedEventArgs>(this.StopButton, "Click");

Now that the building blocks are ready, it's time to compose them (I do it in the view constructor, just after InitializeComponent()):

        startMouseDown
            .SelectMany(down =>
                createUrlObservable()
                    .SelectMany(url => createRequestObservable(url)
                        .TakeUntil(stopMouseDown)
                        .Select(r => r.GetResponseStream())
                        .Do(s =>
                            {
                                using (var sr = new StreamReader(s))
                                {
                                    Trace.WriteLine(sr.ReadLine());
                                    Trace.Flush();
                                }

                            })
                        .Do(r => this._successed++)
                        .HandleError(e => this._failed++))
                    .ObserveOnDispatcher()
                    .Do(_ => this.RefresLabels(),
                        e => { },
                        () => this.RefresLabels()))
            .Subscribe();

You may wonder about the HandleError() function. If an exception occurs in the EndGetResponse() call (I turned off the network connection to reproduce it) and is not caught in the request observable, it will crash the startMouseDown observable. HandleError catches the exception silently, performs the provided action and, instead of calling OnError on the next observer, calls OnCompleted:

public static class ObservableExtensions
{
    public static IObservable<TSource> HandleError<TSource>(this IObservable<TSource> source, Action<Exception> errorHandler)
    {
        return Observable.CreateWithDisposable<TSource>(observer =>
            {
                return source.Subscribe(observer.OnNext, 
                    e => 
                    { 
                        errorHandler(e);
                        //observer.OnError(e);
                        observer.OnCompleted();
                    },
                    observer.OnCompleted);
            });
    }
}
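
For readers on a current Rx release, the same "turn an error into a completion" behaviour can probably be expressed with the built-in Catch operator instead of a hand-written observable. This is only a sketch, assuming the System.Reactive package. SwallowError is a hypothetical name chosen here to avoid clashing with HandleError above.

```csharp
using System;
using System.Reactive.Linq;

public static class ErrorHandlingExtensions
{
    // Runs the handler when the source fails, then completes the
    // sequence instead of passing the error on to the observer.
    public static IObservable<T> SwallowError<T>(
        this IObservable<T> source, Action<Exception> errorHandler)
    {
        return source.Catch<T, Exception>(ex =>
        {
            errorHandler(ex);
            // Continue with an empty sequence, so the observer only sees OnCompleted.
            return Observable.Empty<T>();
        });
    }
}
```

Because the failure is swallowed per request, one failed request inside SelectMany ends only its own inner sequence and the remaining requests keep running.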

The last unexplained piece is the RefresLabels method, which updates the UI controls:

    private void RefresLabels()
    {
        this.SuccessedLabel.Content = string.Format("Successed {0}", this._successed);
        this.FailedLabel.Content = string.Format("Failed {0}", this._failed);
    }