开发者

Limit Threads count

I have a List with items that I want to download. I use a for Loop to iterate the list.

For each item in this List I start a new Thread that references the item. My Problem is that I want limit the maxDownload at the same time.

for (int i = downloadList.Cou开发者_如何学JAVAnt - 1; i >= 0; i--)
{
    downloadItem item = downloadList[i];
    if (item.Status != 1 && item.Status != 2)
    {
        ThreadStart starter = delegate { this.DownloadItem(ref item); };
        Thread t = new Thread(starter);
        t.IsBackground = true;
        t.Name = item.Name;
        t.Priority = ThreadPriority.Normal;
        t.Start();
    }
}

I read something about the ThreadPool, but then I can't reference my item. Can someone Help me? Thanks! :)

Edit:

I tested this:

ThreadPool.SetMaxThreads(maxDownloads, maxDownloads);
ThreadPool.SetMinThreads(maxDownloads, maxDownloads);
ThreadPool.QueueUserWorkItem(DownloadItem, ref item);

I don't know how I can reference my downloadItem with this thread.....


if you're using .NET 4, I'd strongly suggest using Parallel.ForEach (potentially on downloadList.Reverse())

so, something like:

Parallel.ForEach(downloadList.Reverse(), 
                 new ParallelOptions { MaxDegreeOfParallelism = 8 },
                 item => this.DownloadItem(item));

If you don't want the calling thread to block, you could QueueUserWorkItem this call, of course.


I solved this very problem in .Net 3.5 by creating threads and loading them into a queue. Then I read a thread from the queue, start it, and increment the running thread count. I keep doing this until I hit the upper limit.

As each thread finishes it invokes a callback method that decrements the running count and signals the queue reader to start more threads. For additional control you can use a dictionary to keep track of running threads, keyed by ManagedThreadId, so you can signal threads to stop early or report progress.

Sample console app:

using System;
using System.Collections.Generic;
using System.Threading;

namespace ThreadTest
{
    class Program
    {
        static void Main(string[] args)
        {
            Supervisor supervisor = new Supervisor();
            supervisor.LaunchThreads();
            Console.ReadLine();
            supervisor.KillActiveThreads();
            Console.ReadLine();
        }

        public delegate void WorkerCallbackDelegate(int threadIdArg);
        public static object locker = new object();

        class Supervisor
        {
            Queue<Thread> pendingThreads = new Queue<Thread>();
            Dictionary<int, Worker> activeWorkers = new Dictionary<int, Worker>();

            public void LaunchThreads()
            {
                for (int i = 0; i < 20; i++)
                {
                    Worker worker = new Worker();
                    worker.DoneCallBack = new WorkerCallbackDelegate(WorkerCallback);
                    Thread thread = new Thread(worker.DoWork);
                    thread.IsBackground = true;
                    thread.Start();
                    lock (locker)
                    {
                        activeWorkers.Add(thread.ManagedThreadId, worker);
                    }
                }
            }

            public void KillActiveThreads()
            {
                lock (locker)
                {
                    foreach (Worker worker in activeWorkers.Values)
                    {
                        worker.StopWork();
                    }
                }
            }

            public void WorkerCallback(int threadIdArg)
            {
                lock (locker)
                {
                    activeWorkers.Remove(threadIdArg);
                    if (activeWorkers.Count == 0)
                    {
                        Console.WriteLine("no more active threads");
                    }
                }
            }
        }

        class Worker
        {
            public WorkerCallbackDelegate DoneCallBack { get; set; }
            volatile bool quitEarly;

            public void DoWork()
            {
                quitEarly = false;
                Console.WriteLine(Thread.CurrentThread.ManagedThreadId.ToString() + " started");
                DateTime startTime = DateTime.Now;
                while (!quitEarly && ((DateTime.Now - startTime).TotalSeconds < new Random().Next(1, 10)))
                {
                    Thread.Sleep(1000);
                }
                Console.WriteLine(Thread.CurrentThread.ManagedThreadId.ToString() + " stopped");
                DoneCallBack(Thread.CurrentThread.ManagedThreadId);
            }

            public void StopWork()
            {
                quitEarly = true;
            }
        }
    }
}


The best way to handle this is to only create maxDownloads number of threads. Put all of your work items into a queue and let the threads compete with each other to figure out which one processes each work item.

var queue = new ConcurrentQueue<downloadItem>(downloadList);
for (int i = 0; i < Math.Min(maxDownloads, queue.Count))
{
  var thread = new Thread(
    () =>
    {
      while (true)
      {
        downloadItem item = null;
        if (queue.TryDequeue(out item))
        {
          // Process the next work item.
          DownloadItem(item);
        }
        else
        {
          // No more work items are left.
          break;
        }
      }
    });
    thread.IsBackground = true;
    thread.Start();
}

You could also use a semaphore to throttle the number of threads processing work items. This is especially useful when the actual number of threads is unknown as would be the case if you were using the ThreadPool.

var semaphore = new Semaphore(maxDownloads, maxDownloads);
for (int i = 0; i < downloadList.Count; i++)
{
  downloadItem item = downloadList[i];
  ThreadPool.QueueUserWorkItem(
    (state) =>
    {
      semaphore.WaitOne();
      try
      {
        DownloadItem(item);
      }
      finally
      {
        semaphore.Release();
      }
    });
}

I am not particularly fond of either approach. The problem with the first is that a nonfixed amount of threads are created. It is generally advised to avoid creating threads in a for loop as that tends to not scale well. The problem with the second is that the semaphore will block some of the ThreadPool threads. That is not advised either because you are effecitvely claiming one of threads and then doing nothing on it. That might effect the performance of other unrelated tasks that happen to be sharing the ThreadPool. I think in this case either of the two options will be fine since crafting a more scalable pattern is more work than it is worth.


I can't see why you are trying to use the ref keyword anyway. Objects are passed by reference in C# by default, and in your original code you are not using item after it is passed to DownloadItem. Therefore I would suggest using the ThreadPool methods you tried, but not using a ref parameter.

Hope that helps.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜