Rx ArgumentException with WebClient
I am working with the Reactive Extensions (Rx) to make it easier to download web pages with WebClient on Windows Phone. When I run the following code, I get an ArgumentException on the Subscribe call:
Parameter name: {0}
Parameter name: type
public IObservable<DownloadStringCompletedEventArgs> StartRequest(string uri)
{
    var _browser = new WebClient();
    var obs = Observable.FromEvent<DownloadStringCompletedEventHandler,
        DownloadStringCompletedEventArgs>(
            h => _browser.DownloadStringCompleted += h,
            h => _browser.DownloadStringCompleted -= h)
        .Where(e => !e.Cancelled)
        .Retry(3)
        .SelectMany(
            e => (e.Error == null && !string.IsNullOrEmpty(e.Result))
                ? Observable.Return(e)
                : Observable.Throw<DownloadStringCompletedEventArgs>(e.Error))
        .Take(1);
    _browser.DownloadStringAsync(new Uri(uri));
    return obs;
}

var obs = StartRequest("http://www.google.com");
obs.Subscribe(
    x => Console.WriteLine(x.Result)
);
Your issue is that you're using the FromEvent extension rather than the FromEventPattern extension.
The former is for non-standard events and the latter for those that follow the "sender/eventargs" pattern.
That said, you still have a few more issues with your observable.
The instance of WebClient is never disposed of. You should make sure that happens, but because you're using an observable you need to use the Using extension method.
Putting a Where before the Take(1) may mean that your observable will never end: if the only event raised is a cancellation, Where filters it out, so Take(1) never receives a value and never completes.
The Retry is also not going to do the right thing for you. If the "DownloadStringCompleted" observable has an error, then retrying it will re-attach to the event, but the event will never return a new value because you're not calling DownloadStringAsync again.
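To make the second point concrete, here is a minimal sketch of the Where/Take(1) problem. It assumes the System.Reactive namespaces (the Windows Phone build of Rx uses different ones), and the Subject<bool> is a hypothetical stand-in for the DownloadStringCompleted event, where each value represents the Cancelled flag.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

class WhereTakeDemo
{
    static void Main()
    {
        // Hypothetical stand-in for DownloadStringCompleted:
        // each value is the Cancelled flag of one completed event.
        var completedEvents = new Subject<bool>();

        completedEvents
            .Where(cancelled => !cancelled)
            .Take(1)
            .Subscribe(
                _ => Console.WriteLine("got a result"),
                () => Console.WriteLine("sequence completed"));

        // The only event reports a cancellation. Where drops it, so
        // Take(1) never sees a value and neither callback above runs.
        completedEvents.OnNext(true);

        Console.WriteLine("subscriber is still waiting");
    }
}
```

Since a WebClient raises the completed event only once per request, nothing else will ever arrive to end the sequence.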
What you want to do with this kind of observable is make it re-execute the web call every time a new observer subscribes, but, when a subscription is shared (if the observable is published, for example) then you don't want the web call to re-execute. You kind of want to have your cake and eat it too.
Here's how to do it:
public IObservable<string> CreateDownloadStringObservable(string uri)
{
    return Observable.Create<string>(o =>
    {
        var result = new ReplaySubject<string>();
        var inner = Observable.Using(() => new WebClient(), wc =>
        {
            var obs = Observable
                .FromEventPattern<
                    DownloadStringCompletedEventHandler,
                    DownloadStringCompletedEventArgs>(
                        h => wc.DownloadStringCompleted += h,
                        h => wc.DownloadStringCompleted -= h)
                .Take(1);
            wc.DownloadStringAsync(new Uri(uri));
            return obs;
        }).Subscribe(ep =>
        {
            if (ep.EventArgs.Cancelled)
            {
                result.OnCompleted();
            }
            else
            {
                if (ep.EventArgs.Error != null)
                {
                    result.OnError(ep.EventArgs.Error);
                }
                else
                {
                    result.OnNext(ep.EventArgs.Result);
                    result.OnCompleted();
                }
            }
        }, ex =>
        {
            result.OnError(ex);
        });
        return new CompositeDisposable(inner, result.Subscribe(o));
    });
}
Now you can call it like this:
IObservable<string> obs =
    CreateDownloadStringObservable("http://www.google.com")
        .Retry(3);
Notice that the Retry is now outside of the request method where it should belong (unless you add a retry argument to the method call).
You can't use Retry with FromEvent, because FromEvent is a hot observable - you'll just get the same result 3 times. The easiest way to fix your existing code is via Observable.Defer:
string uri = "http://www.whatever.com";
var webClient = Observable.Defer(() => {
    // A new WebClient is created for each subscription.
    var browser = new WebClient();
    // FromEvent is kept from the question's code; Rx versions that reserve it
    // for non-standard events need FromEventPattern here instead.
    var ret = Observable.FromEvent<DownloadStringCompletedEventHandler, DownloadStringCompletedEventArgs>(
        h => browser.DownloadStringCompleted += h,
        h => browser.DownloadStringCompleted -= h);
    browser.DownloadStringAsync(new Uri(uri));
    return ret;
});

webClient
    .Timeout(TimeSpan.FromSeconds(15))
    .Retry(3);
Defer means that the WebClient is not created immediately. Instead, a new one is created for each observer that calls Subscribe() on the observable, which is what Retry requires, because Retry works by resubscribing.
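The per-subscription behaviour can be seen without any network call. The following is only a sketch under stated assumptions: it uses the System.Reactive namespaces, and the attempts counter and the simulated first-attempt failure are hypothetical stand-ins for a web request that fails once.

```csharp
using System;
using System.Reactive.Linq;

class DeferRetryDemo
{
    static void Main()
    {
        int attempts = 0;

        // The factory passed to Defer runs once per subscription,
        // so every resubscription made by Retry starts from scratch.
        IObservable<string> request = Observable.Defer(() =>
        {
            attempts++;
            // Simulated request: the first attempt fails, later ones succeed.
            return attempts == 1
                ? Observable.Throw<string>(new InvalidOperationException("simulated failure"))
                : Observable.Return("succeeded on attempt " + attempts);
        });

        request
            .Retry(3)
            .Subscribe(
                s => Console.WriteLine(s),
                ex => Console.WriteLine("failed: " + ex.Message));
        // prints "succeeded on attempt 2"
    }
}
```

Without Defer, the failing sequence would be built once and Retry would resubscribe to that same failed sequence each time.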