MultiThreading: limit the concurrent threads
I need to develop an app that uses multithreading.
Basically, I have a DataTable that contains around 200k rows. From each row, I need to take a field, compare it to a webpage, and then remove it from the DataTable.
The thing is, the server serving those pages has a limit on concurrent requests, so at most I can request 3 pages at the same time.
I want to do this using the ThreadPool. I even managed to build a simple app that does that (it locks the DataTable), but I couldn't limit the concurrent threads (even with SetMaxThreads); it seems like the limit was just ignored.
Does anyone have something ready-made that does something similar? I would love to see it.
I have tried using semaphores, but ran into problems:
static SemaphoreSlim _sem = new SemaphoreSlim(3); // Capacity of 3
static List<string> records = new List<string>();
static void Main()
{
records.Add("aaa");
records.Add("bbb");
records.Add("ccc");
records.Add("ddd");
records.Add("eee");
records.Add("fff");
records.Add("ggg");
records.Add("iii");
records.Add("jjj");
for (int i = 0; i < records.Count; i++ )
{
new Thread(ThreadJob).Start(records[i]);
}
Console.WriteLine(records.Count);
Console.ReadLine();
}
static void ThreadJob(object id)
{
Console.WriteLine(id + " wants to enter");
_sem.Wait();
Console.WriteLine(id + " is in!"); // Only three threads
//Thread.Sleep(1000 * (int)id); // can be here at
Console.WriteLine(id + " is leaving"); // a time.
lock (records)
{
records.Remove((string)id);
}
_sem.Release();
}
This runs quite nicely. The only problem is that
Console.WriteLine(records.Count);
returns different results. Even though I understand that this happens because not all the threads have finished (and I am calling records.Count before all records have been removed), I couldn't find how to wait for all of them to finish.
To wait for multiple threads to finish, you can use multiple EventWaitHandles and then call WaitHandle.WaitAll to block the main thread until all events are signalled:
// we need to keep a list of synchronization events
var finishEvents = new List<EventWaitHandle>();
for (int i = 0; i < records.Count; i++ )
{
// for each job, create an event and add it to the list
var signal = new EventWaitHandle(false, EventResetMode.ManualReset);
finishEvents.Add(signal);
// we need to catch the id in a separate variable
// for the closure to work as expected
var id = records[i];
var thread = new Thread(() =>
{
// do the job
ThreadJob(id);
// signal the main thread
signal.Set();
});
// start the thread, otherwise the event is never signalled
thread.Start();
}
WaitHandle.WaitAll(finishEvents.ToArray());
Since most of these threads would end up suspended most of the time, it would be better to use ThreadPool in this case, so you can replace new Thread with:
ThreadPool.QueueUserWorkItem(s =>
{
ThreadJob(id);
signal.Set();
});
When you are done with the events, don't forget to Dispose them:
foreach (var evt in finishEvents)
{
evt.Dispose();
}
[Edit]
To put it all in one place, here is what your example code should look like:
static Semaphore _sem = new Semaphore(3, 3); // Capacity of 3
static List<string> _records = new List<string>(new string[] { "aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg", "hhh" });
static void Main()
{
var finishEvents = new List<EventWaitHandle>();
for (int i = 0; i < _records.Count; i++)
{
var signal = new EventWaitHandle(false, EventResetMode.ManualReset);
finishEvents.Add(signal);
var id = _records[i];
var t = new Thread(() =>
{
ThreadJob(id);
signal.Set();
});
t.Start();
}
WaitHandle.WaitAll(finishEvents.ToArray());
Console.WriteLine(_records.Count);
Console.ReadLine();
}
static void ThreadJob(object id)
{
Console.WriteLine(id + " wants to enter");
_sem.WaitOne();
Console.WriteLine(id + " is in!");
Thread.Sleep(1000);
Console.WriteLine(id + " is leaving");
lock (_records)
{
_records.Remove((string)id);
}
_sem.Release();
}
(Note that I've used Semaphore instead of SemaphoreSlim because I don't have .NET 4 on this machine and I wanted to test the code before updating the answer.)
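One caveat with the approach above: WaitHandle.WaitAll accepts at most 64 handles, so one event per job would not scale to the 200k rows mentioned in the question. The following is a minimal sketch of an alternative, assuming .NET 4 or later is available (which was not the case on the machine used above). It uses a single CountdownEvent to count finished jobs, and the main thread takes the semaphore before queueing each work item so that no more than three are ever in flight. The names ThrottledJobs and RunAll, and the Thread.Sleep stand-in for the web request, are illustrative and not part of the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

static class ThrottledJobs
{
    // Hypothetical helper: runs job(record) for every record on the thread
    // pool, with at most maxConcurrent jobs in flight, and returns only
    // after all of them have finished.
    public static void RunAll(IList<string> records, int maxConcurrent, Action<string> job)
    {
        using (var sem = new SemaphoreSlim(maxConcurrent))
        using (var done = new CountdownEvent(records.Count))
        {
            foreach (var record in records)
            {
                var id = record; // copy for the closure
                sem.Wait();      // main thread blocks until a slot is free
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    try
                    {
                        job(id);
                    }
                    finally
                    {
                        sem.Release();  // free the slot for the next record
                        done.Signal();  // one more job has finished
                    }
                });
            }
            done.Wait(); // returns once every job has signalled
        }
    }

    static void Main()
    {
        var records = new List<string> { "aaa", "bbb", "ccc", "ddd", "eee", "fff" };
        RunAll(records, 3, id =>
        {
            Console.WriteLine(id + " is in!");
            Thread.Sleep(200); // stand-in for the web request
            Console.WriteLine(id + " is leaving");
        });
        Console.WriteLine("all jobs finished");
    }
}
```

The sketch deliberately does not remove items from the list while the loop is still iterating over it. An exception thrown by job on a pool thread is not handled here and would need its own try/catch in real code.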
Why not use the Parallel Extensions? That would make things a lot easier.
Anyway, what you probably want to look at is something like Semaphores. I wrote a blog post on this subject a month or two back that you might find useful: https://colinmackay.scot/2011/03/30/using-semaphores-to-restrict-access-to-resources/
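To illustrate the Parallel Extensions suggestion, here is a minimal sketch, assuming .NET 4 or later. Parallel.ForEach with ParallelOptions.MaxDegreeOfParallelism caps how many iterations run at once, and the call returns only after every iteration has completed. The names ParallelSketch and ProcessAll, and the Thread.Sleep stand-in for fetching the page, are made up for this example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

static class ParallelSketch
{
    // Hypothetical helper: processes every record with at most
    // maxConcurrent iterations running at once and returns the
    // records that were handled.
    public static List<string> ProcessAll(IEnumerable<string> records, int maxConcurrent)
    {
        var handled = new ConcurrentBag<string>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxConcurrent };

        // Parallel.ForEach blocks until all iterations are done,
        // so no extra wait handles are needed.
        Parallel.ForEach(records, options, record =>
        {
            Thread.Sleep(50);    // stand-in for fetching and comparing the page
            handled.Add(record); // thread-safe collection, no explicit lock
        });

        return new List<string>(handled);
    }

    static void Main()
    {
        var records = new[] { "aaa", "bbb", "ccc", "ddd", "eee", "fff" };
        Console.WriteLine(ProcessAll(records, 3).Count); // prints 6
    }
}
```

Collecting results into a separate thread-safe collection, rather than removing from the source list while it is being enumerated, keeps the loop free of explicit locks.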
You can use Semaphore if you are on .NET 3.5, or SemaphoreSlim on .NET 4.0.
First, should Console.WriteLine(id + " is leaving"); not be a bit later, after the lock and just before it releases the semaphore?
As to the actual waiting for all of the threads to finish, Groo's answer looks better and more robust in the long term, but as a quicker/simpler solution to this specific piece of code, I think you could also get away with just calling .Join() on all of the threads you want to wait for, in sequence.
static List<Thread> ThreadList = new List<Thread>(); // To keep track of them
then when starting the threads, replace the current new Thread line with:
var t = new Thread(ThreadJob);
ThreadList.Add(t); // Start() returns void, so add the thread before starting it
t.Start(records[i]);
and then just before the Console.WriteLine:
foreach( Thread t in ThreadList )
{
t.Join();
}
This will lock up if any of the threads don't terminate though, and if you ever want to know -which- threads haven't finished, this method won't work.
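One way to soften both drawbacks is the Join overload that takes a timeout: it returns false if the thread is still running when the timeout expires. The following is a rough sketch only; the names JoinSketch and JoinAll, the thread names, and the timeout values are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

static class JoinSketch
{
    // Hypothetical helper: waits up to timeoutMs for each thread in turn
    // and returns the names of those that were still running.
    public static List<string> JoinAll(IEnumerable<Thread> threads, int timeoutMs)
    {
        var unfinished = new List<string>();
        foreach (var t in threads)
        {
            // Join(int) returns false if the thread did not end in time.
            if (!t.Join(timeoutMs))
                unfinished.Add(t.Name);
        }
        return unfinished;
    }

    static void Main()
    {
        var threads = new List<Thread>();
        foreach (var id in new[] { "aaa", "bbb", "ccc" })
        {
            // Sleep is a stand-in for the real job
            var t = new Thread(() => Thread.Sleep(100)) { Name = id, IsBackground = true };
            threads.Add(t);
            t.Start();
        }
        var stuck = JoinAll(threads, 5000);
        Console.WriteLine("unfinished: " + stuck.Count);
    }
}
```

The timeout applies to each thread separately, so the total wait can be as long as the timeout multiplied by the number of threads.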