Rx argumentException with webclient
I am working with the Reactive Extensions, to easy download webpages with the WebClient on Windows Phone. When I run the following code, I get an ArgumentExceptoin on the Subscribe call.
Parameter name: {0}
Parameter name: type
public IObservable<DownloadStringCompletedEventArgs> S开发者_运维百科tartRequest(string uri)
{
var _browser = new WebClient();
var obs = Observable.FromEvent<DownloadStringCompletedEventHandler,
DownloadStringCompletedEventArgs>(
h => _browser.DownloadStringCompleted += h,
h => _browser.DownloadStringCompleted -= h)
.Where(e => !e.Cancelled)
.Retry(3)
.SelectMany(
e => (e.Error == null && !string.IsNullOrEmpty(e.Result))
? Observable.Return(e)
: Observable.Throw<DownloadStringCompletedEventArgs>(e.Error))
.Take(1);
_browser.DownloadStringAsync(new Uri(uri));
return obs;
}
var obs = StartRequest("http://www.google.com");
obs.Subscribe(
x => Console.WriteLine(x.Result)
);
Your issue is that you're using the FromEvent
extension rather than the FromEventPattern
extension.
The former is for non-standard events and the latter for those that follow the "sender/eventargs" pattern.
That said, you still have a few more issues with your observable.
The instance of
WebClient
is never disposed of. You should make sure that happens, but because you're using an observable you need to use theUsing
extension method.Putting a
Where
before theTake(1)
may mean that your observable will never end.The
Retry
is also not going to do the right thing for you. If the "DownloadStringCompleted" observable has an error then retrying it will re-attach to the event, but the event will never return a new value because you're not callingDownloadStringAsync
again.
What you want to do with this kind of observable is make it re-execute the web call every time a new observer subscribes, but, when a subscription is shared (if the observable is published, for example) then you don't want the web call to re-execute. You kind of want to have your cake and eat it too.
Here's how to do it:
public IObservable<string> CreateDownloadStringObservable(string uri)
{
return Observable.Create<string>(o =>
{
var result = new ReplaySubject<string>();
var inner = Observable.Using(() => new WebClient(), wc =>
{
var obs = Observable
.FromEventPattern<
DownloadStringCompletedEventHandler,
DownloadStringCompletedEventArgs>(
h => wc.DownloadStringCompleted += h,
h => wc.DownloadStringCompleted -= h)
.Take(1);
wc.DownloadStringAsync(new Uri(uri));
return obs;
}).Subscribe(ep =>
{
if (ep.EventArgs.Cancelled)
{
result.OnCompleted();
}
else
{
if (ep.EventArgs.Error != null)
{
result.OnError(ep.EventArgs.Error);
}
else
{
result.OnNext(ep.EventArgs.Result);
result.OnCompleted();
}
}
}, ex =>
{
result.OnError(ex);
});
return new CompositeDisposable(inner, result.Subscribe(o));
});
}
Now you can call it like this:
IObservable<string> obs =
CreateDownloadStringObservable("http://www.google.com")
.Retry(3);
Notice that the Retry
is now outside of the request method where it should belong (unless you add a retry
argument to the method call).
You can't use Retry with FromEvent, because FromEvent is a hot observable - you'll just get the same result 3 times. The easiest way to fix your existing code is via Observable.Defer:
string uri = "http://www.whatever.com";
var webClient = Observable.Defer(() => {
var browser = new WebClient();
var ret = Observable.FromEvent<DownloadStringCompletedEventHandler, DownloadStringCompletedEventArgs>(
h => _browser.DownloadStringCompleted += h,
h => _browser.DownloadStringCompleted -= h);
browser.DownloadStringAsync(new Uri(uri));
return ret;
});
webClient
.Timeout(TimeSpan.FromSeconds(15))
.Retry(3);
Defer means that instead of WebClient being created immediately, it will be created for everyone who Subscribe()'s to the Observable, which is what is required for Retry.
精彩评论