Ensuring task execution order in ThreadPool
I have been reading about the thread-pool pattern and I can't seem to find the usual solution for the following problem.
I sometimes want tasks to be executed serially. For example, I read chunks of text from a file and for some reason I need the chunks to be processed in that order. So basically I want to eliminate concurrency for some of the tasks.
Consider this scenario where the tasks marked with * need to be processed in the order they were pushed in. The other tasks can be processed in any order.
push task1
push task2
push task3 *
push task4 *
push task5
push task6 *
....
and so on
In the context of a thread-pool, without this constraint, a single queue of pending tasks works fine but clearly here it doesn't.
I thought about having some of the threads operate on a thread-specific queue and the others on the "global" queue. Then, in order to execute some of the tasks serially, I simply have to push them onto a queue that only a single thread reads from. It does sound a bit clumsy.
So, the real question in this long story: how would you solve this? How would you ensure those tasks are ordered?
EDIT
As a more general problem, suppose the scenario above becomes
push task1
push task2 **
push task3 *
push task4 *
push task5
push task6 *
push task7 **
push task8 *
push task9
....
and so on
What I mean is that the tasks within a group should be executed sequentially, but the groups themselves can mix. So you can have 3-2-5-4-7, for example.
One other thing to note is that I don't have access to all the tasks in a group upfront (and I can't wait for all of them to arrive before starting the group).
Something like the following will allow serial and parallel tasks to be queued: serial tasks are executed one after the other, while parallel tasks are executed in any order and concurrently with each other. This lets you serialize tasks where necessary and still run parallel tasks, and it works as tasks are received: you do not need to know the entire sequence up front, because execution order is maintained dynamically.
using System;
using System.Collections.Generic;
using System.Threading;

internal class TaskQueue
{
    private readonly object _syncObj = new object();
    private readonly Queue<QTask> _tasks = new Queue<QTask>();
    private int _runningTaskCount;

    public void Queue(bool isParallel, Action task)
    {
        lock (_syncObj)
        {
            _tasks.Enqueue(new QTask { IsParallel = isParallel, Task = task });
        }
        ProcessTaskQueue();
    }

    public int Count
    {
        get { lock (_syncObj) { return _tasks.Count; } }
    }

    private void ProcessTaskQueue()
    {
        lock (_syncObj)
        {
            // Nothing new is started until every running task has completed.
            if (_runningTaskCount != 0) return;

            // Start the whole run of parallel tasks at the head of the queue.
            while (_tasks.Count > 0 && _tasks.Peek().IsParallel)
            {
                QTask parallelTask = _tasks.Dequeue();
                QueueUserWorkItem(parallelTask);
            }

            // If no parallel task was started, run the next serial task on its own.
            if (_tasks.Count > 0 && _runningTaskCount == 0)
            {
                QTask serialTask = _tasks.Dequeue();
                QueueUserWorkItem(serialTask);
            }
        }
    }

    // Always called with _syncObj held.
    private void QueueUserWorkItem(QTask qTask)
    {
        Action completionTask = () =>
        {
            qTask.Task();
            OnTaskCompleted();
        };
        _runningTaskCount++;
        ThreadPool.QueueUserWorkItem(_ => completionTask());
    }

    private void OnTaskCompleted()
    {
        lock (_syncObj)
        {
            if (--_runningTaskCount == 0)
            {
                ProcessTaskQueue();
            }
        }
    }

    private class QTask
    {
        public Action Task { get; set; }
        public bool IsParallel { get; set; }
    }
}
Update
To handle task groups with serial and parallel task mixes, a GroupedTaskQueue can manage a TaskQueue for each group. Again, you do not need to know about groups up front; it is all dynamically managed as tasks are received.
internal class GroupedTaskQueue
{
    private readonly object _syncObj = new object();
    private readonly Dictionary<string, TaskQueue> _queues = new Dictionary<string, TaskQueue>();
    private readonly string _defaultGroup = Guid.NewGuid().ToString();

    public void Queue(bool isParallel, Action task)
    {
        Queue(_defaultGroup, isParallel, task);
    }

    public void Queue(string group, bool isParallel, Action task)
    {
        TaskQueue queue;
        lock (_syncObj)
        {
            // Create the group's queue on first use.
            if (!_queues.TryGetValue(group, out queue))
            {
                queue = new TaskQueue();
                _queues.Add(group, queue);
            }
        }

        Action completionTask = () =>
        {
            task();
            OnTaskCompleted(group, queue);
        };
        queue.Queue(isParallel, completionTask);
    }

    private void OnTaskCompleted(string group, TaskQueue queue)
    {
        lock (_syncObj)
        {
            // Count covers queued tasks only, not tasks that are still running.
            if (queue.Count == 0)
            {
                _queues.Remove(group);
            }
        }
    }
}
Thread pools are good for cases where the relative order of the tasks doesn't matter, provided they all get done. In particular, it must be OK for them all to be done in parallel.
If your tasks must be done in a specific order, then they are not suitable for parallelism, so a thread pool is not appropriate.
If you want to move these serial tasks off the main thread, then a single background thread with a task queue would be appropriate for those tasks. You can continue to use a thread pool for the remaining tasks which are suitable for parallelism.
Yes, it means you have to decide where to submit the task depending on whether it is an in-order task or a "may be parallelized" task, but this is not a big deal.
If you have groups that must be serialized, but which can run in parallel with other tasks then you have multiple choices:
- Create a single task for each group, which does the relevant group tasks in order, and post this task to the thread pool.
- Have each task in a group explicitly wait for the previous task in the group, and post them to the thread pool. This requires that your thread pool can handle the case where a thread is waiting for a not-yet-scheduled task without deadlocking.
- Have a dedicated thread for each group, and post group tasks on the appropriate message queue.
Basically, there are a number of pending tasks. Some of the tasks can only be performed when one or more other pending tasks have finished executing.
The pending tasks can be modeled in a dependency graph:
- "task1 -> task2" means "task2 can be executed only after task1 is finished." The arrows point in the direction of execution order.
- the indegree of a task (the number of tasks pointing to it) determines whether the task is ready for execution. If the indegree is 0, it can be executed.
- sometimes a task must wait for multiple tasks to finish, the indegree is then >1.
- if a task doesn't have to wait for other tasks to finish anymore (its indegree is zero), it can be submitted to the thread pool with worker threads, or the queue with tasks waiting to be picked up by a worker thread. You know the submitted task will not cause deadlock, because the task isn't waiting for anything. As an optimization, you can use a priority queue, e.g. in which tasks that more tasks in the dependency graph depend on will be executed first. This also can't provoke deadlock, because all tasks in the thread pool can be executed. It can provoke starvation, however.
- If a task finishes execution, it can be removed from the dependency graph, possibly reducing the indegree of other tasks, which can in turn be submitted to the pool of working threads.
So there is (at least) one thread used to add/remove pending tasks, and there is a thread pool of working threads.
When a task is added to the dependency graph, you must check:
- how the task is connected in the dependency graph: what tasks must it wait for to finish and what tasks must wait for it to finish? Draw connections from and to the new task accordingly.
- once the connections are drawn: did the new connections cause any cycles in the dependency graph? If so, there is a deadlock situation.
Performance:
- this pattern is slower than sequential execution if parallel execution is in fact rarely possible, because you need extra administration to do everything almost sequentially anyway.
- this pattern is fast if many tasks can be performed simultaneously in practice.
Assumptions:
As you may have read between the lines, you must design the tasks so that they don't interfere with other tasks. Also, there must be a way to determine the priority of the tasks. The task priority should include the data handled by each task. Two tasks may not alter the same object simultaneously; one of the tasks should get priority over the other one instead, or the performed operations on the object must be thread-safe.
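The indegree bookkeeping described in this answer can be sketched in Java roughly as follows. This is only an illustration under stated assumptions: the class name DependencyGraph, its add method and the use of unique string names for tasks are hypothetical, and priorities are left out. Because a new task can only name tasks that were added before it, this particular sketch cannot create a cycle.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch: a task is handed to the pool only once its indegree is zero. */
class DependencyGraph {
    private static final class Node {
        final String name;
        final Runnable work;
        final List<Node> dependents = new ArrayList<>();
        int indegree; // number of unfinished prerequisites

        Node(String name, Runnable work) {
            this.name = name;
            this.work = work;
        }
    }

    private final Map<String, Node> unfinished = new HashMap<>(); // guarded by "this"
    private final ExecutorService pool;

    DependencyGraph(ExecutorService pool) {
        this.pool = pool;
    }

    /** Adds a task that waits for every named prerequisite that has not finished yet. */
    synchronized void add(String name, Runnable work, String... prerequisites) {
        Node node = new Node(name, work);
        for (String p : prerequisites) {
            Node pre = unfinished.get(p);
            if (pre != null) { // prerequisites that already finished impose no wait
                pre.dependents.add(node);
                node.indegree++;
            }
        }
        unfinished.put(name, node);
        if (node.indegree == 0) {
            submit(node);
        }
    }

    private void submit(Node node) {
        pool.execute(() -> {
            try {
                node.work.run();
            } finally {
                finished(node);
            }
        });
    }

    /** Removes a finished task and releases dependents whose indegree drops to zero. */
    private synchronized void finished(Node node) {
        unfinished.remove(node.name, node);
        for (Node dependent : node.dependents) {
            if (--dependent.indegree == 0) {
                submit(dependent);
            }
        }
    }

    /** Demo: 3 -> 4 -> 6 must run in order; 1 and 5 have no constraints. */
    static String demo() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        DependencyGraph graph = new DependencyGraph(pool);
        List<String> order = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(5);
        graph.add("1", done::countDown);
        graph.add("3", () -> { order.add("3"); done.countDown(); });
        graph.add("4", () -> { order.add("4"); done.countDown(); }, "3");
        graph.add("5", done::countDown);
        graph.add("6", () -> { order.add("6"); done.countDown(); }, "4");
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return String.join(",", order);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the graph is protected by a single lock instead of a dedicated bookkeeping thread; that is a simplification of the answer's design, chosen to keep the example short.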
To do what you want to do with a threadpool, you might have to create some kind of scheduler.
Something like this:
TaskQueue -> Scheduler -> Queue -> ThreadPool
The Scheduler runs in its own thread, keeping track of dependencies between jobs. When a job is ready to be done, the scheduler just pushes it onto the queue for the threadpool.
The ThreadPool might have to send signals to the Scheduler to indicate when a job is done so the scheduler can put jobs depending on that job into the Queue.
In your case, the dependencies could probably be stored in a linked list.
Let's say you have the following dependencies: 3 -> 4 -> 6 -> 8
Job 3 is running on the threadpool; you do not yet know that job 8 exists.
Job 3 ends. You remove the 3 from the linked list, you put job 4 on the queue to the threadpool.
Job 8 arrives. You put it at the end of the linked list.
The only constructs that have to be fully synchronized are the Queues before and after the scheduler.
If I understand the problem correctly, the JDK executors don't have this capability, but it's easy to roll your own. You basically need
- a pool of worker threads, each of which has a dedicated queue
- some abstraction over those queues to which you offer work (cf. the ExecutorService)
- some algorithm that deterministically selects a specific queue for each piece of work
- each piece of work then gets offered to the right queue and hence gets processed in the right order
The difference from the JDK executors is that they have 1 queue with n threads, but you want n queues and m threads (where n may or may not equal m).
* edit after reading that each task has a key *
In a bit more detail
- write some code that transforms a key into an index (an int) in a given range (0 to n-1, where n is the number of threads you want). This could be as simple as Math.floorMod(key.hashCode(), n) (a plain key.hashCode() % n can be negative), or it could be some static mapping of known key values to threads, or whatever you want
- at startup
  - create n queues, put them in an indexed structure (array, list, whatever)
  - start n threads, each thread just does a blocking take from the queue
  - when it receives some work, it knows how to execute work specific to that task/event (you can obviously have some mapping of tasks to actions if you have heterogeneous events)
- store this behind some facade that accepts the work items
- when a task arrives, hand it to the facade
- the facade finds the right queue for the task based on the key, offers it to that queue
It's easy enough to add auto-restarting worker threads to this scheme: the worker thread registers with some manager to state "I own this queue", and you add some housekeeping around that plus detection of errors in the thread (on an error the thread unregisters its ownership, returning the queue to a free pool of queues, which is the trigger to start a new thread).
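The facade described above might look like the following minimal sketch. It assumes that each "queue plus dedicated thread" pair can be represented by a single-thread ExecutorService; the names KeyedExecutor, submit and shutdownAndWait are made up for illustration, and the auto-restart housekeeping is not shown.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical facade over n lanes; each lane is one queue drained by one thread. */
class KeyedExecutor {
    private final ExecutorService[] lanes;

    KeyedExecutor(int n) {
        lanes = new ExecutorService[n];
        for (int i = 0; i < n; i++) {
            lanes[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** Equal keys always map to the same lane, so their tasks run in submission order. */
    void submit(Object key, Runnable task) {
        int index = Math.floorMod(key.hashCode(), lanes.length); // never negative
        lanes[index].execute(task);
    }

    /** Stops accepting work and waits for the queued tasks to finish. */
    void shutdownAndWait() {
        try {
            for (ExecutorService lane : lanes) {
                lane.shutdown();
            }
            for (ExecutorService lane : lanes) {
                lane.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Demo: chunks of two files are interleaved, yet each file's chunks stay in order. */
    static String demo() {
        KeyedExecutor executor = new KeyedExecutor(3);
        List<Integer> fileA = Collections.synchronizedList(new ArrayList<>());
        List<Integer> fileB = Collections.synchronizedList(new ArrayList<>());
        for (int i = 1; i <= 3; i++) {
            final int chunk = i;
            executor.submit("fileA", () -> fileA.add(chunk));
            executor.submit("fileB", () -> fileB.add(chunk));
        }
        executor.shutdownAndWait();
        return fileA + "|" + fileB;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Two different keys may hash to the same lane; that costs some parallelism but never breaks the per-key ordering.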
I think a thread pool can be used effectively in this situation. The idea is to use a separate strand object for each group of dependent tasks. You add tasks to your queue with or without a strand object, and you use the same strand object for dependent tasks. Your scheduler checks whether the next task has a strand and whether this strand is locked. If it is not locked, lock the strand and run the task. If the strand is already locked, keep the task in the queue until the next scheduling event. When the task is done, unlock its strand.
As a result you need a single queue; you don't need any additional threads, complicated groups, etc. The strand object can be very simple, with two methods: lock and unlock.
I often meet the same design problem, e.g. for an asynchronous network server that handles multiple simultaneous sessions. Sessions are independent of each other (this maps them to your independent tasks and groups of dependent tasks), while tasks inside a session are dependent (this maps session-internal tasks to your dependent tasks inside a group). Using the described approach I avoid explicit synchronization inside a session completely. Every session has its own strand object.
What is more, I use an existing (great) implementation of this idea: the Boost Asio library (C++). I just borrowed their term strand. The implementation is elegant: I wrap my async tasks into the corresponding strand object before scheduling them.
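For readers who do not use Boost Asio, a strand can be approximated in Java along the following lines. This is a sketch, not Asio's implementation: the class name Strand is hypothetical, and instead of re-scanning one shared queue it keeps a small waiting list inside each strand, which I assume is an acceptable variation of the lock/unlock idea.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical strand: runs its own tasks one at a time, in order, on a shared pool. */
class Strand implements Executor {
    private final Queue<Runnable> waiting = new ArrayDeque<>();
    private final Executor pool;
    private boolean locked; // true while one of this strand's tasks is scheduled or running

    Strand(Executor pool) {
        this.pool = pool;
    }

    @Override
    public synchronized void execute(Runnable task) {
        waiting.add(task);
        if (!locked) {
            locked = true;
            scheduleNext();
        }
    }

    /** Hands the next waiting task to the pool, or unlocks the strand if none is left. */
    private synchronized void scheduleNext() {
        Runnable next = waiting.poll();
        if (next == null) {
            locked = false;
            return;
        }
        pool.execute(() -> {
            try {
                next.run();
            } finally {
                scheduleNext();
            }
        });
    }

    /** Demo: starred tasks share one strand, the others go straight to the pool. */
    static String demo() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Strand strand = new Strand(pool);
        List<String> order = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(6);
        for (String name : new String[] {"1", "2", "3*", "4*", "5", "6*"}) {
            if (name.endsWith("*")) {
                strand.execute(() -> { order.add(name); done.countDown(); });
            } else {
                pool.execute(done::countDown);
            }
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return String.join(",", order);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A strand never blocks a pool thread while it waits, so one pool can serve many strands; for the grouped scenario in the question you would create one Strand per group.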
Option 1 - The complex one
Since you have sequential jobs, you can gather up those jobs in a chain and let the jobs themselves resubmit to the thread pool once they are done. Suppose we have a list of jobs, [Task1, ..., Task6], like in your example. We have a sequential dependency, such that [Task3, Task4, Task6] is a dependency chain. We now make a job (Erlang pseudo-code):
Task6Job = fun() ->
               Task6()  % Last job in the chain, nothing more to push
           end.
Task4Job = fun() ->
               Task4(), % Exec the Task4 job
               push_job(Task6Job)
           end.
Task3Job = fun() ->
               Task3(), % Execute the Task3 Job
               push_job(Task4Job)
           end.
push_job(Task3Job).
That is, we alter the Task3 job by wrapping it into a job which, as a continuation, pushes the next job in the queue to the thread pool. There are strong similarities here to general continuation-passing style, as also seen in systems like Node.js or Python's Twisted framework.
Generalizing, you make a system where you can define job chains which can defer further work and resubmit the further work.
Option 2 - The simple one
Why do we even bother splitting up the jobs? I mean, since they are sequentially dependent, executing all of them on the same thread won't be faster or slower than taking that chain and spreading it out over multiple threads. Assuming "enough" work load, any thread will always have work to do anyway, so just bundling the jobs together is probably easiest:
Task = fun() ->
           Task3(),
           Task4(),
           Task6()  % Just build a new job, executing them in the order desired
       end,
push_job(Task).
It is rather easy to do stuff like this if you have functions as first-class citizens, so you can build them in your language at whim, like you can in, say, any functional programming language, Python, Ruby (with blocks), and so on.
I don't particularly like the idea of building a queue, or a continuation stack, as in "Option 1", though, and I would definitely go with the second option. In Erlang, we even have a program called jobs, written by Erlang Solutions and released as open source. jobs is built to execute and load-regulate job executions like these. I'd probably combine option 2 with jobs if I were to solve this problem.
The answers suggesting not to use a thread pool amount to hard-coding knowledge of the task dependencies/execution order. Instead, I would create a CompositeTask that manages the start/end dependency between two tasks. By encapsulating the dependency behind the task interface, all tasks can be treated uniformly and added to the pool. This hides the execution details and allows the task dependencies to change without affecting whether or not you use a thread pool.
The question doesn't specify a language - I'll use Java, which I hope is readable for most.
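// Task is assumed here to be a plain marker interface on top of Runnable.
interface Task extends Runnable {}

class CompositeTask implements Task
{
    Task firstTask;
    Task secondTask;

    public void run() {
        firstTask.run();
        secondTask.run();
    }
}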
This executes tasks sequentially and on the same thread. You can chain many CompositeTasks together to create a sequence of as many sequential tasks as needed.
The downside here is that this ties up the thread for the duration of all tasks executing sequentially. You may have other tasks that you would prefer to execute in between the first and second tasks. So, rather than execute the second task directly, have the composite task schedule execution of the second task:
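import java.util.concurrent.ExecutorService;

class CompositeTask implements Runnable
{
    Task firstTask;
    Task secondTask;
    ExecutorService executor;

    public void run() {
        firstTask.run();
        // Only schedule the second task once the first has finished.
        executor.submit(secondTask);
    }
}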
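This ensures that the second task doesn't run until after the first task is complete and also allows the pool to execute other (possibly more urgent) tasks. Note that the first and second tasks may execute on separate threads, so although they do not execute concurrently, any shared data used by the tasks must be visible to other threads. Submitting to an ExecutorService already guarantees that everything done in the submitting thread before the submit call is visible to the submitted task; data that is also shared with other threads still needs its own synchronization (e.g. by making the variables volatile).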
This is a simple, yet powerful and flexible approach, and allows the tasks themselves to define execution constraints, rather than doing it by using different thread pools.
Use two active objects. In short, the active object pattern consists of a priority queue and one or more worker threads that take tasks from the queue and process them.
So use one active object with a single worker thread: all tasks placed on its queue will be processed sequentially. Use a second active object with more than one worker thread. In this case the worker threads will take and process tasks from the queue in any order.
Good luck.
I think you're mixing concepts. Threadpool is ok when you want to distribute some work among threads but if you start mixing dependencies between threads then it isn't such a good idea.
My advice, simply don't use the threadpool for those tasks. Just create a dedicated thread and keep a simple queue of sequential items that must be processed by that thread alone. Then you can keep pushing tasks to the thread pool when you don't have a sequential requirement and use the dedicated thread when you have.
A clarification: Using common sense, a queue of serial tasks shall be executed by a single thread processing each task one after another :)
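A dedicated thread with a simple queue, as recommended here and in the active object answer above, can be sketched in Java as follows. The class name ActiveObject and the methods post and close are hypothetical, and the sketch uses a FIFO LinkedBlockingQueue rather than a priority queue, because first-in-first-out order is what the serial tasks need.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical active object: a FIFO queue drained by a fixed number of worker threads. */
class ActiveObject {
    private static final Runnable STOP = () -> {}; // marker that tells a worker to exit
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Thread[] workers;

    ActiveObject(int workerCount) {
        workers = new Thread[workerCount];
        for (int i = 0; i < workerCount; i++) {
            workers[i] = new Thread(this::drain);
            workers[i].start();
        }
    }

    void post(Runnable task) {
        queue.add(task);
    }

    /** Worker loop: block on the queue, run each task, exit on the STOP marker. */
    private void drain() {
        try {
            for (Runnable task = queue.take(); task != STOP; task = queue.take()) {
                task.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Lets already queued tasks finish, then stops and joins every worker. */
    void close() {
        for (int i = 0; i < workers.length; i++) {
            queue.add(STOP);
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Demo: one worker for the ordered tasks, three workers for everything else. */
    static String demo() {
        ActiveObject serial = new ActiveObject(1);
        ActiveObject parallel = new ActiveObject(3);
        List<String> order = Collections.synchronizedList(new ArrayList<>());
        parallel.post(() -> {});             // task1
        serial.post(() -> order.add("3"));   // task3 *
        serial.post(() -> order.add("4"));   // task4 *
        parallel.post(() -> {});             // task5
        serial.post(() -> order.add("6"));   // task6 *
        serial.close();
        parallel.close();
        return String.join(",", order);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With a worker count of 1 the object behaves as the dedicated serial thread; with a larger count it behaves like a small thread pool, so the same class covers both halves of the advice.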
This is achievable, as far as I understand your scenario. Basically, you need to do something smart to coordinate your tasks in the main thread. The Java APIs you need are ExecutorCompletionService and Callable.
First, implement your callable task:
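import java.util.concurrent.Callable;

public class MyAsyncTask implements Callable<MyAsyncTask> {
    // tells if I am a normal or dependent task
    private final boolean isDependent;

    public MyAsyncTask(boolean isDependent) { this.isDependent = isDependent; }

    public boolean isNormal() { return !isDependent; }

    @Override
    public MyAsyncTask call() {
        // do your job here.
        return this;
    }
}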
Then, in your main thread, use the CompletionService to coordinate the dependent task execution (i.e. as a wait mechanism):
// Fragment: assumes the java.util.concurrent imports and a collection "tasks" of MyAsyncTask.
ExecutorCompletionService<MyAsyncTask> completionExecutor =
    new ExecutorCompletionService<MyAsyncTask>(Executors.newFixedThreadPool(5));
Future<MyAsyncTask> dependentFutureTask = null;
for (MyAsyncTask task : tasks) {
    if (task.isNormal()) {
        // if it is a normal task, submit it immediately.
        completionExecutor.submit(task);
    } else {
        if (dependentFutureTask == null) {
            // submit the first dependent task, get a reference
            // of this dependent task for later use.
            dependentFutureTask = completionExecutor.submit(task);
        } else {
            // wait for the last one to complete before submitting a new one
            // (get() throws InterruptedException and ExecutionException).
            dependentFutureTask.get();
            dependentFutureTask = completionExecutor.submit(task);
        }
    }
}
By doing this, you use a single executor (thread pool size 5) to execute both normal and dependent tasks. The normal tasks are executed as soon as they are submitted, while the dependent tasks are executed one by one (the wait is performed in the main thread by calling get() on the Future before submitting a new dependent task). So at any point in time you have a number of normal tasks and at most a single dependent task running in a single thread pool.
This is just a head start; by using ExecutorCompletionService, FutureTask and Semaphore, you can implement more complex thread coordination scenarios.
How would you ensure those tasks are ordered?
push task1
push task2
push task346
push task5
In response to the edit:
push task1
push task27 **
push task3468 *
push task5
push task9
You have two different kinds of tasks. Mixing them up in a single queue feels rather odd. Instead of having one queue, have two. For the sake of simplicity you could even use a ThreadPoolExecutor for both: for the serial tasks just give it a fixed size of 1, and for the tasks that can be executed concurrently give it more. I don't see why that would be clumsy at all. Keep it simple and stupid. You have two different kinds of tasks, so treat them accordingly.
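As a rough illustration of the two-executor setup, the sketch below builds both pools from the standard ThreadPoolExecutor constructor. The class name TwoPools, the helper fixedPool and the pool size of 4 are arbitrary choices made for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: one executor of size 1 for ordered tasks, a larger one for the rest. */
class TwoPools {
    /** Builds a fixed-size ThreadPoolExecutor with an unbounded FIFO work queue. */
    static ThreadPoolExecutor fixedPool(int threads) {
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
    }

    static String demo() {
        ThreadPoolExecutor serial = fixedPool(1);     // starred tasks, strictly in order
        ThreadPoolExecutor concurrent = fixedPool(4); // everything else, any order
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        for (int i = 1; i <= 6; i++) {
            final int id = i;
            boolean starred = (id == 3 || id == 4 || id == 6);
            if (starred) {
                serial.execute(() -> order.add(id));
            } else {
                concurrent.execute(() -> { /* unordered work would go here */ });
            }
        }
        serial.shutdown();
        concurrent.shutdown();
        try {
            serial.awaitTermination(10, TimeUnit.SECONDS);
            concurrent.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

For the grouped variant of the question you would need one size-1 executor per group, which is where this simple setup starts to cost a thread per group.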
Since you only need to wait for a single task to complete before starting the dependent task, it can be easily done if you can schedule the dependent task in the first task. So in your second example: at the end of task 2, schedule task 7 and at the end of task 3, schedule task 4 and so on for 4->6 and 6->8.
In the beginning, just schedule tasks 1,2,5,9... and the rest should follow.
An even more general problem is when you have to wait for multiple tasks before a dependent task can start. Handling that efficiently is a non-trivial exercise.
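In Java, one way to express "schedule the dependent task at the end of the previous one" is a chain of CompletableFuture stages. The answer does not name any API, so CompletableFuture is my substitution, and the class name Chains is hypothetical. The last stage also shows the multi-prerequisite case with CompletableFuture.allOf, without any claim about how efficient that is for large graphs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical demo of chained stages for the second scenario in the question. */
class Chains {
    static List<String> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        try {
            // Group **: 2 -> 7. Each stage is scheduled only when the previous one ends.
            CompletableFuture<Void> twoStars = CompletableFuture
                    .runAsync(() -> log.add("2"), pool)
                    .thenRunAsync(() -> log.add("7"), pool);
            // Group *: 3 -> 4 -> 6 -> 8.
            CompletableFuture<Void> oneStar = CompletableFuture
                    .runAsync(() -> log.add("3"), pool)
                    .thenRunAsync(() -> log.add("4"), pool)
                    .thenRunAsync(() -> log.add("6"), pool)
                    .thenRunAsync(() -> log.add("8"), pool);
            // A task with several prerequisites waits for all of them to finish.
            CompletableFuture.allOf(twoStars, oneStar)
                    .thenRunAsync(() -> log.add("after both groups"), pool)
                    .join();
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The two groups may interleave freely in the log, but 2 always precedes 7, the order 3, 4, 6, 8 is preserved, and the final entry is always last.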
Executing tasks serially on the ThreadPool is quite easy, by using the ExclusiveScheduler property of a ConcurrentExclusiveSchedulerPair instance, and using it as a TaskScheduler every time we start a task:
var taskFactory = new TaskFactory(
    new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler);
Task task1 = taskFactory.StartNew(() => DoSomething());
Task task2 = taskFactory.StartNew(() => DoSomethingElse());
The DoSomething() and DoSomethingElse() will both run on the ThreadPool, one after the other. It is guaranteed that the two invocations will not overlap, and also that they will be invoked in the same order that they were scheduled initially.
But what will happen if any of these invocations fails? Here is the problem: any exception thrown by DoSomething() or DoSomethingElse() will be trapped inside the respective Task (task1 or task2). This means that we can't just start the tasks and forget about them. We have the responsibility to store the tasks somewhere, and eventually await them and handle their exceptions. Which may be exactly what we want.
But what if we just want to schedule the tasks and "forget" about them, and in the unlikely scenario that any of them fails, have the exception propagate as an unhandled exception and terminate the process? This is not as crazy as it sounds. Some tasks may be so critical for the life of the application, so unlikely to ever fail, and so hard to devise an exception-observing strategy for, that having their exception escalate to an instant application termination (after raising the AppDomain.UnhandledException event) may be the lesser evil of the available options. So is it possible to do this? Yes, but it is surprisingly difficult and tricky:
using System.Runtime.ExceptionServices;

var taskFactory = new TaskFactory(
    new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler);

void RunOnThreadPoolExclusive(Action action)
{
    _ = taskFactory.StartNew(() =>
    {
        try
        {
            action();
        }
        catch (Exception ex)
        {
            var edi = ExceptionDispatchInfo.Capture(ex);
            ThreadPool.QueueUserWorkItem(_ => edi.Throw());
        }
    });
}

RunOnThreadPoolExclusive(() => DoSomething());
RunOnThreadPoolExclusive(() => DoSomethingElse());
The action is invoked in a try/catch block. In case of failure, the exception is captured in an ExceptionDispatchInfo instance, to preserve its stack trace, and then it's rethrown on the ThreadPool. Notice that taskFactory.StartNew still returns a Task, which is discarded by using a discard (_), because now it's highly unlikely that the task can fail. But did we really make any progress? We started with the premise that DoSomething is very unlikely to fail, and we ended up discarding a Task that we assess as highly unlikely to fail. Not very satisfying indeed! Can we do better? Yes! Enter the infamous world of async void:
var taskFactory = new TaskFactory(
    new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler);

async void RunOnThreadPoolExclusive(Action action)
{
    await taskFactory.StartNew(action);
}

RunOnThreadPoolExclusive(() => DoSomething());
RunOnThreadPoolExclusive(() => DoSomethingElse());
The async void methods have the interesting characteristic that any exception thrown inside them is raised on the SynchronizationContext that was captured when the async void method started, or (as a fallback) on the ThreadPool. So if, for example, RunOnThreadPoolExclusive is invoked on the UI thread of a WinForms application, and the action fails, a message box will pop up, asking the user if they want to continue or quit the application. So the error is not necessarily fatal, since the user may opt to ignore the error and continue. Which might be exactly what we want. Or might not.
To clarify, the error will be thrown on the UI thread, but DoSomething()/DoSomethingElse() will still be invoked on the ThreadPool. This has not changed.
So how exactly can we ensure that the error will be thrown on the ThreadPool, and nowhere else, no matter what, irrespective of the current context, and without allowing any task to become fire-and-forget? Here is how:
var taskFactory = new TaskFactory(
    new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler);

void RunOnThreadPoolExclusive(Action action)
{
    Task task = taskFactory.StartNew(action);
    ThreadPool.QueueUserWorkItem(async state => await (Task)state, task);
}

RunOnThreadPoolExclusive(() => DoSomething());
RunOnThreadPoolExclusive(() => DoSomethingElse());
Serialized execution on the ThreadPool, in the correct order, with the errors thrown on the ThreadPool, and with no leaked fire-and-forget tasks. Perfect!
There is a Java framework specifically for this purpose called dexecutor (disclaimer: I am the owner):
DefaultDependentTasksExecutor<String, String> executor = newTaskExecutor();
executor.addDependency("task1", "task2");
executor.addDependency("task4", "task6");
executor.addDependency("task6", "task8");
executor.addIndependent("task3");
executor.addIndependent("task5");
executor.addIndependent("task7");
executor.execute(ExecutionBehavior.RETRY_ONCE_TERMINATING);
task1, task3, task4, task5 and task7 run in parallel (depending on the thread pool size); once task1 finishes, task2 runs; once task4 finishes, task6 runs; and finally, once task6 finishes, task8 runs.
There have been a lot of answers, and obviously one has been accepted. But why not use continuations?
If you have a known "serial" condition, then when you enqueue the first task with this condition, hold the Task; and for further tasks invoke Task.ContinueWith().
using System;
using System.Threading.Tasks;

public class PoolsTasks
{
    private readonly object syncLock = new object();
    private Task serialTask = Task.CompletedTask;

    private bool isSerialTask(Action task) {
        // However you determine what is serial ...
        return true;
    }

    public void RunMyTask(Action myTask) {
        if (isSerialTask(myTask)) {
            // Chain onto the previous serial task so they run one after another.
            lock (syncLock)
                serialTask = serialTask.ContinueWith(_ => myTask());
        } else
            Task.Run(myTask);
    }
}
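The same continuation idea can be extended to the grouped scenario from the question's edit by keeping one "tail" per group. The sketch below is a Java analogue, not a translation of the C# class: GroupChains and its run method are hypothetical names, and the group key is an assumption that goes beyond the single serial chain shown above. It uses handleAsync so that, like ContinueWith, the next task runs even if the previous one failed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch: one continuation chain per group key. */
class GroupChains {
    private final Map<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();
    private final ExecutorService pool;

    GroupChains(ExecutorService pool) {
        this.pool = pool;
    }

    /** A null group means the task has no ordering constraint. */
    CompletableFuture<Void> run(String group, Runnable task) {
        if (group == null) {
            return CompletableFuture.runAsync(task, pool);
        }
        // compute() is atomic per key, so concurrent callers extend a chain one at a time.
        return tails.compute(group, (key, tail) -> {
            CompletableFuture<Void> previous =
                    (tail != null) ? tail : CompletableFuture.<Void>completedFuture(null);
            // The task is scheduled on the pool once the previous task of the group ends.
            return previous.<Void>handleAsync((result, error) -> {
                task.run();
                return null;
            }, pool);
        });
    }

    /** Demo: the nine pushes from the question's second scenario. */
    static List<String> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        GroupChains chains = new GroupChains(pool);
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        List<CompletableFuture<Void>> all = new ArrayList<>();
        String[][] pushes = {{"1", null}, {"2", "**"}, {"3", "*"}, {"4", "*"}, {"5", null},
                {"6", "*"}, {"7", "**"}, {"8", "*"}, {"9", null}};
        for (String[] push : pushes) {
            all.add(chains.run(push[1], () -> log.add(push[0])));
        }
        CompletableFuture.allOf(all.toArray(new CompletableFuture[0])).join();
        pool.shutdown();
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that the map keeps the last future of every group forever; a long-running service would need some way to drop finished groups, which this sketch leaves out.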
Thread Pool with ordered and unordered execute methods:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OrderedExecutor {
    private ExecutorService multiThreadExecutor;
    // holds one single-thread executor per submitting thread
    private ThreadLocal<ExecutorService> threadLocal = new ThreadLocal<>();

    public OrderedExecutor(int nThreads) {
        this.multiThreadExecutor = Executors.newFixedThreadPool(nThreads);
    }

    public void executeUnordered(Runnable task) {
        multiThreadExecutor.submit(task);
    }

    public void executeOrdered(Runnable task) {
        // Look up the executor on the calling thread, not inside a pool worker:
        // routing through the multi-thread pool first could reorder the tasks.
        ExecutorService singleThreadExecutor = threadLocal.get();
        if (singleThreadExecutor == null) {
            singleThreadExecutor = Executors.newSingleThreadExecutor();
            threadLocal.set(singleThreadExecutor);
        }
        singleThreadExecutor.submit(task);
    }

    public void clearThreadLocal() {
        threadLocal.remove();
    }
}
After all tasks have been submitted, the threadLocal should be cleared. The only drawback is that a separate singleThreadExecutor is created for each thread that invokes executeOrdered(Runnable task).