C# Async WebRequests: Perform Action When All Requests Are Completed
I have this basic scraping console application in C# that Asynchronously uses WebRequest to get html from a list of sites. It works fine, but how do I set up a trigger that goes off when every site in the list has been processed?
I've spent a couple hours researching various solutions online, including the MS docs, but none of them provide a straight forward answer via code. I've read about the IAsyncResult.AsyncWaitHandle but I have no clue how to integrate it into my code. I'd just like to call a custom function when all threads complete processing or timeout.
One trick is that I never know ahead of time how many sites are in my list (it's user defined), so I need a solution that's robust enough to wait for 5 events for 100,000 events to complete.
Thanks. Working code below:
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;
开发者_运维百科using System.Net;
using System.Threading;
namespace AsyncApp_01
{
class Program
{
static void Main(string[] args)
{
ArrayList alSites = new ArrayList();
alSites.Add("http://www.google.com");
alSites.Add("http://www.lostspires.com");
ScanSites(alSites);
Console.Read();
}
private static void ScanSites(ArrayList sites)
{
foreach (string uriString in sites)
{
WebRequest request = HttpWebRequest.Create(uriString);
request.Method = "GET";
object data = new object(); //container for our "Stuff"
// RequestState is a custom class to pass info to the callback
RequestState state = new RequestState(request, data, uriString);
IAsyncResult result = request.BeginGetResponse(new AsyncCallback(UpdateItem), state);
//Register the timeout callback
ThreadPool.RegisterWaitForSingleObject(result.AsyncWaitHandle, new WaitOrTimerCallback(ScanTimeoutCallback), state, (30 * 1000), true);
}
}
private static void UpdateItem(IAsyncResult result)
{
// grab the custom state object
RequestState state = (RequestState)result.AsyncState;
WebRequest request = (WebRequest)state.Request;
// get the Response
HttpWebResponse response = (HttpWebResponse)request.EndGetResponse(result);
Stream s = (Stream)response.GetResponseStream();
StreamReader readStream = new StreamReader(s);
// dataString will hold the entire contents of the requested page if we need it.
string dataString = readStream.ReadToEnd();
response.Close();
s.Close();
readStream.Close();
Console.WriteLine(dataString);
}
private static void ScanTimeoutCallback(object state, bool timedOut)
{
if (timedOut)
{
RequestState reqState = (RequestState)state;
if (reqState != null)
{
reqState.Request.Abort();
}
Console.WriteLine("aborted- timeout");
}
}
class RequestState
{
public WebRequest Request; // holds the request
public object Data; // store any data in this
public string SiteUrl; // holds the UrlString to match up results (Database lookup, etc).
public RequestState(WebRequest request, object data, string siteUrl)
{
this.Request = request;
this.Data = data;
this.SiteUrl = siteUrl;
}
}
}
}
Bonus points for anyone who can also tell me how to limit the number of concurrent threads. For example, if I have 100 sites to process, how do I set it up so that 10 sites get processed at a time, but not more. I don't want to open 100 threads.
Here's a quick sample I threw together. I removed the WebClient implementation, as it seems like you're using the WebRequest one. I'm also making use of .Net 4's ConcurrentBag:
public class Scraper
{
private readonly IEnumerable<string> _sites;
private readonly ConcurrentBag<string> _data;
private volatile int _count;
private readonly int _total;
public Scraper(IEnumerable<string> sites)
{
_sites = sites;
_data = new ConcurrentBag<string>();
_total = sites.Count();
}
public void Start()
{
foreach (var site in _sites)
{
ScrapeSite(site);
}
}
private void ScrapeSite(string site)
{
var req = WebRequest.Create(site);
req.BeginGetResponse(AsyncCallback, req);
}
private void AsyncCallback(IAsyncResult ar)
{
Interlocked.Increment(ref _count);
var req = ar.AsyncState as WebRequest;
var result = req.EndGetResponse(ar);
var reader = new StreamReader(result.GetResponseStream());
var data = reader.ReadToEnd();
this.OnSiteScraped(req.RequestUri.AbsoluteUri, data);
_data.Add(data);
if (_count == _total)
{
OnScrapingComplete();
}
}
private void OnSiteScraped(string site, string data)
{
var handler = this.SiteScraped;
if (handler != null)
{
handler(this, new SiteScrapedEventArgs(site, data));
}
}
private void OnScrapingComplete()
{
var handler = this.ScrapingComplete;
if (handler != null)
{
handler(this, new ScrapingCompletedEventArgs(_data));
}
}
public event EventHandler<SiteScrapedEventArgs> SiteScraped;
public event EventHandler<ScrapingCompletedEventArgs> ScrapingComplete;
}
public class SiteScrapedEventArgs : EventArgs
{
public string Site { get; private set; }
public string Data { get; private set; }
public SiteScrapedEventArgs(string site, string data)
{
this.Site = site;
this.Data = data;
}
}
OK, I created some basic classes, and this should do the trick. If this isn't enough, I'm sorry, I simply can't help you:
public class RankedPage
{
public int Rank { get; set; }
public string Site { get; set; }
}
public class WebRequestData
{
public WebRequest WebRequest { get; set; }
public RankedPage Page { get; set; }
}
public class Scraper
{
private readonly IEnumerable<RankedPage> _sites;
private readonly ConcurrentBag<KeyValuePair<RankedPage,string>> _data;
private volatile int _count;
private readonly int _total;
public Scraper(IEnumerable<RankedPage> sites)
{
_sites = sites;
_data = new ConcurrentBag<KeyValuePair<RankedPage, string>>();
_total = sites.Count();
}
public void Start()
{
foreach (var site in _sites)
{
ScrapeSite(site);
}
}
private void ScrapeSite(RankedPage site)
{
var req = WebRequest.Create(site.Site);
req.BeginGetResponse(AsyncCallback, new WebRequestData{ Page = site, WebRequest = req});
}
private void AsyncCallback(IAsyncResult ar)
{
Interlocked.Increment(ref _count);
var webRequestData = ar.AsyncState as WebRequestData;
var req = webRequestData.WebRequest;
var result = req.EndGetResponse(ar);
var reader = new StreamReader(result.GetResponseStream());
var data = reader.ReadToEnd();
this.OnSiteScraped(webRequestData.Page, data);
_data.Add(new KeyValuePair<RankedPage, string>(webRequestData.Page,data));
if (_count == _total)
{
OnScrapingComplete();
}
}
private void OnSiteScraped(RankedPage page, string data)
{
var handler = this.SiteScraped;
if (handler != null)
{
handler(this, new SiteScrapedEventArgs(page, data));
}
}
private void OnScrapingComplete()
{
var handler = this.ScrapingComplete;
if (handler != null)
{
handler(this, new ScrapingCompletedEventArgs(_data));
}
}
public event EventHandler<SiteScrapedEventArgs> SiteScraped;
public event EventHandler<ScrapingCompletedEventArgs> ScrapingComplete;
}
public class SiteScrapedEventArgs : EventArgs
{
public RankedPage Site { get; private set; }
public string Data { get; private set; }
public SiteScrapedEventArgs(RankedPage site, string data)
{
this.Site = site;
this.Data = data;
}
}
public class ScrapingCompletedEventArgs : EventArgs
{
public IEnumerable<KeyValuePair<RankedPage,string >> SiteData { get; private set; }
public ScrapingCompletedEventArgs(IEnumerable<KeyValuePair<RankedPage, string>> siteData)
{
this.SiteData = siteData;
}
}
精彩评论