
System.Threading.Tasks - Limit the number of concurrent Tasks

I have just started to look at the new "System.Threading.Tasks" goodness in .NET 4.0, and would like to know if there is any built-in support for limiting the number of concurrent tasks that run at once, or if this should be handled manually.

E.g.: if I need to call a calculation method 100 times, is there a way to set up 100 Tasks but have only 5 execute simultaneously? The answer may just be to create 5 tasks, call Task.WaitAny, and create a new Task as each previous one finishes (a rough sketch of that approach follows the code below). I just want to make sure I am not missing a trick if there is a better way to do this.

Basically, is there a built-in way to do this:

Dim taskArray() = {New Task(Function() DoComputation1()),
                   New Task(Function() DoComputation2()),
                   ...
                   New Task(Function() DoComputation100())}

Dim maxConcurrentThreads As Integer = 5
RunAllTasks(taskArray, maxConcurrentThreads)
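
For reference, here is a rough C# sketch of the WaitAny approach I described above (DoComputation(int) is just a placeholder for the real calculation):

// Keep at most 5 tasks running; start a new one each time any running task finishes.
var running = new List<Task>();
for (int i = 1; i <= 100; i++)
{
    if (running.Count == 5)
    {
        // Wait for any one of the running tasks to finish before starting another.
        int finished = Task.WaitAny(running.ToArray());
        running.RemoveAt(finished);
    }

    int n = i; // capture the loop variable for the closure
    running.Add(Task.Factory.StartNew(() => DoComputation(n)));
}

// Wait for the last few tasks to complete.
Task.WaitAll(running.ToArray());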

Thanks for any help.


I know this is almost a year old, but I have found a much easier way to achieve this, so I thought I would share:

Dim actionsArray() As Action =
    New Action() {
        New Action(Sub() DoComputation1()),
        New Action(Sub() DoComputation2()),
        ...
        New Action(Sub() DoComputation100())
    }

System.Threading.Tasks.Parallel.Invoke(New Tasks.ParallelOptions() With {.MaxDegreeOfParallelism = 5}, actionsArray)

Voila!


I know this is an old thread, but I just wanted to share my solution to this problem: use semaphores.

(This is in C#)

private void RunAllActions(IEnumerable<Action> actions, int maxConcurrency)
{
    using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
    {
        var tasks = new List<Task>();

        foreach (Action action in actions)
        {
            tasks.Add(Task.Factory.StartNew(() =>
            {
                // Block until one of the maxConcurrency slots is free.
                concurrencySemaphore.Wait();
                try
                {
                    action();
                }
                finally
                {
                    concurrencySemaphore.Release();
                }
            }));
        }

        // Wait for everything to finish before the semaphore is disposed,
        // otherwise running tasks could hit an ObjectDisposedException.
        Task.WaitAll(tasks.ToArray());
    }
}
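
Used against the question's scenario, a call might look like this (DoComputation is a placeholder):

// Build the 100 work items as Actions, then run at most 5 of them at a time.
var actions = Enumerable.Range(1, 100)
                        .Select(i => (Action)(() => DoComputation(i)))
                        .ToList();

RunAllActions(actions, 5);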


A solution might be to take a look at the pre-made code from Microsoft here.

The description goes like this: "Provides a task scheduler that ensures a maximum concurrency level while running on top of the ThreadPool.", and as far as I've been able to test it seems to do the trick, in the same manner as the MaxDegreeOfParallelism property in ParallelOptions.
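
Presumably this refers to the LimitedConcurrencyLevelTaskScheduler class from Microsoft's Parallel Extensions Extras samples. Assuming that class has been copied into your project, using it would look roughly like this (DoComputation is a placeholder):

// Tasks started through this factory are queued to the concurrency-limited scheduler.
var scheduler = new LimitedConcurrencyLevelTaskScheduler(5);
var factory = new TaskFactory(scheduler);

var tasks = new List<Task>();
for (int i = 0; i < 100; i++)
{
    tasks.Add(factory.StartNew(() => DoComputation()));
}

Task.WaitAll(tasks.ToArray());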


C# equivalent of the sample provided by James:

Action[] actionsArray = new Action[] {
    new Action(() => DoComputation1()),
    new Action(() => DoComputation2()),
    //...
    new Action(() => DoComputation100())
};

System.Threading.Tasks.Parallel.Invoke(new System.Threading.Tasks.ParallelOptions { MaxDegreeOfParallelism = 5 }, actionsArray);


My blog post shows how to do this both with Tasks and with Actions, and provides a sample project you can download and run to see both in action.

With Actions

If you are using Actions, you can use the built-in .NET Parallel.Invoke method. Here we limit it to running at most 5 of them in parallel.

var listOfActions = new List<Action>();
for (int i = 0; i < 100; i++)
{
    // Note that we create the Action here, but do not start it.
    listOfActions.Add(() => DoSomething());
}

var options = new ParallelOptions {MaxDegreeOfParallelism = 5};
Parallel.Invoke(options, listOfActions.ToArray());

With Tasks

Since you are using Tasks here though, there is no built-in function. However, you can use the one that I provide on my blog.

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static void StartAndWaitAllThrottled(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
    {
        StartAndWaitAllThrottled(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
    }

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static void StartAndWaitAllThrottled(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
    {
        // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly.
        var tasks = tasksToRun.ToList();

        using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
        {
            var postTaskTasks = new List<Task>();

            // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
            tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));

            // Start running each task.
            foreach (var task in tasks)
            {
                // Increment the number of tasks currently running and wait if too many are running.
                throttler.Wait(timeoutInMilliseconds, cancellationToken);

                cancellationToken.ThrowIfCancellationRequested();
                task.Start();
            }

            // Wait for all of the provided tasks to complete.
            // We wait on the list of "post" tasks instead of the original tasks; otherwise there is a
            // potential race condition where the throttler's using block is exited before some Tasks have
            // had their "post" action (which references the throttler) completed, resulting in an
            // exception due to accessing a disposed object.
            Task.WaitAll(postTaskTasks.ToArray(), cancellationToken);
        }
    }

Then, to create your list of Tasks and call the function to run them with, say, a maximum of 5 running simultaneously, you could do this:

var listOfTasks = new List<Task>();
for (int i = 0; i < 100; i++)
{
    var count = i;   // capture the loop variable so each Task gets its own value
    // Note that we create the Task here, but do not start it.
    listOfTasks.Add(new Task(() => Something(count)));
}
Tasks.StartAndWaitAllThrottled(listOfTasks, 5);


Short answer: If what you want is to limit the number of worker tasks so that they don't saturate your web service, then I think your approach is fine.

Long answer: The new System.Threading.Tasks engine in .NET 4.0 runs on top of the .NET ThreadPool. There is only ever one ThreadPool per process, and it defaults to a maximum of 250 worker threads. Therefore, if you were to set the ThreadPool's maximum number of threads to a more modest number via the ThreadPool.SetMaxThreads(...) API, you may be able to reduce the number of concurrently executing threads, and thus tasks.
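
For reference, that call looks roughly like this (but note the caveats below before using it):

// Caps the worker-thread count for the ENTIRE process - see the warnings below.
int workerThreads, completionPortThreads;
ThreadPool.GetMaxThreads(out workerThreads, out completionPortThreads);
ThreadPool.SetMaxThreads(5, completionPortThreads);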

HOWEVER, note that you may well not be alone in using the ThreadPool: many other classes you use may also queue items to it, so there is a good chance you could end up crippling the rest of your app by doing this. Also note that because the ThreadPool employs an algorithm to optimize its use of a given machine's underlying cores, limiting the number of threads it can use to an arbitrarily low number can result in some pretty catastrophic performance issues.

Again, if you only want a small number of worker tasks/threads executing a piece of work at a time, then creating only that small number of tasks (as opposed to hundreds) is the best approach.


It does not look like it, although you could create a subclass of TaskScheduler that implements such behavior.


If your program uses web services, the number of simultaneous connections will be limited by the ServicePointManager.DefaultConnectionLimit property. If you want 5 simultaneous connections, it is not enough to use Arrow_Raider's solution; you also need to increase ServicePointManager.DefaultConnectionLimit, because it is only 2 by default.
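
For example, before starting the tasks:

// The default is only 2 concurrent HTTP connections per host for client applications.
System.Net.ServicePointManager.DefaultConnectionLimit = 5;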
