Using ExecutorService with a tree of tasks to perform
We had a bit of a problem. :)
We want to ensure that only N threads are doing background tasks at any time. To do this, we used a fixed thread pool executor. It seemed to be working fine.
Then we found an issue. Suppose a class uses the executor to do some parallel work, and then, while running on an executor thread, it calls another class which also does some parallel work and waits for it to finish. Here's what happens:
- Main thread calls the first level method.
- This method thinks it can parallelise into 16 tasks and splits up its work.
- 16 tasks are submitted to the executor.
- Main thread starts waiting for its tasks to complete.
- Supposing there are four threads available, the first four tasks each get picked up and run. So there are 12 tasks left on the queue.
- Now, one of these tasks calls some other method.
- This new method thinks it can parallelise into 2 tasks. Let's say it's the first step in a parallel merge sort or something along those lines.
- 2 tasks are submitted to the executor.
- This thread now starts waiting for its tasks to complete.
Uh-oh. At this point all four threads are waiting for tasks to complete, but between them they are blocking the executor from ever running those tasks.
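Boiled down to a minimal sketch (a hypothetical reproduction with a four-thread pool, not our real code), the situation is:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DeadlockDemo {

    static final ExecutorService POOL = Executors.newFixedThreadPool(4);

    public static void main(final String[] args) throws Exception {
        // First-level method: splits into 16 tasks and waits.
        final List<Future<Void>> outer = new ArrayList<Future<Void>>();
        for (int i = 0; i < 16; i++) {
            outer.add(POOL.submit(new Callable<Void>() {
                public Void call() throws Exception {
                    // Second-level method: splits again and waits,
                    // but this time on a pool thread.
                    final Future<Void> child = POOL.submit(new Callable<Void>() {
                        public Void call() {
                            return null; // e.g. one half of a merge sort
                        }
                    });
                    return child.get(); // all four workers park here...
                }
            }));
        }
        for (final Future<Void> f : outer) {
            f.get(); // ...so the children never run, and neither does this line
        }
    }
}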
Solution 1 to this problem was as follows: when submitting a new task to the executor, if all of our threads are already busy and we are already running on one of the executor threads, run the task inline instead. This worked fine for 10 months, but now we have hit a problem with it. If the newly submitted tasks are still relatively large, running one of them inline stops the method from adding its remaining tasks to the queue, where the other worker threads could otherwise have picked them up. So you get periods of huge delays while a thread is processing the work inline.
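A sketch of that approach (the names and the thread-tagging trick here are just one illustrative way to detect "are we on a pool thread", not our exact code):

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class InlineWhenSaturatedExecutor {

    private final ThreadPoolExecutor pool;
    private final ThreadLocal<Boolean> isWorker = new ThreadLocal<Boolean>();

    public InlineWhenSaturatedExecutor(final int nThreads) {
        pool = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(),
                new ThreadFactory() {
                    public Thread newThread(final Runnable r) {
                        return new Thread(new Runnable() {
                            public void run() {
                                isWorker.set(Boolean.TRUE); // tag pool threads
                                r.run();
                            }
                        });
                    }
                });
    }

    public void execute(final Runnable task) {
        final boolean onWorker = Boolean.TRUE.equals(isWorker.get());
        final boolean saturated = pool.getActiveCount() >= pool.getMaximumPoolSize();
        if (onWorker && saturated) {
            task.run(); // the problematic inline path: blocks further submits
        } else {
            pool.execute(task);
        }
    }
}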
Is there a better solution to the core problem of executing a potentially unbounded tree of background tasks? I understand that .NET's equivalent of the executor service has a built-in ability to steal work from the queue, which prevents the original deadlock from occurring; as far as I can tell that is the ideal solution. But what about over in Java land?
Java 7 has the concept of a ForkJoinPool that allows a task to "fork" off another task by submitting it to the same executor, and then gives it the option of later "helping join" that task by running it itself if it has not already been run.
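For example, a sketch of the classic Fibonacci computation on a ForkJoinPool (the same problem as the Java 6 version below):

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class FibTask extends RecursiveTask<Integer> {

    private final int n;

    FibTask(final int n) {
        this.n = n;
    }

    @Override
    protected Integer compute() {
        if (n == 0 || n == 1) {
            return n;
        }
        // Fork one half of the problem off to the pool
        final FibTask n2 = new FibTask(n - 2);
        n2.fork();
        // Do the other half of the work ourselves
        final int partialResult = new FibTask(n - 1).compute();
        // join() will run n2 itself if no worker has picked it up yet,
        // so a parent waiting on a child never starves the pool
        return partialResult + n2.join();
    }

    public static void main(final String[] args) {
        System.out.println(new ForkJoinPool(4).invoke(new FibTask(20)));
    }
}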
I believe the same thing can be done in Java 6 by simply combining an Executor with a FutureTask, like so:
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;

public class Fib implements Callable<Integer> {

    final int n;
    final Executor exec;

    Fib(final int n, final Executor exec) {
        this.n = n;
        this.exec = exec;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Integer call() throws Exception {
        if (n == 0 || n == 1) {
            return n;
        }
        // Divide the problem
        final Fib n1 = new Fib(n - 1, exec);
        final Fib n2 = new Fib(n - 2, exec);
        // FutureTask only allows run() to complete once
        final FutureTask<Integer> n2Task = new FutureTask<Integer>(n2);
        // Ask the Executor for help
        exec.execute(n2Task);
        // Do half the work ourselves
        final int partialResult = n1.call();
        // Do the other half of the work if the Executor hasn't
        n2Task.run();
        // Return the combined result
        return partialResult + n2Task.get();
    }
}
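Driving it might look something like this (a hypothetical main method; any Executor works, including a fixed thread pool):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FibMain {
    public static void main(final String[] args) throws Exception {
        final ExecutorService exec = Executors.newFixedThreadPool(4);
        // The calling thread does its share of the work inside call(),
        // so it is never parked just waiting on the pool.
        System.out.println(new Fib(20, exec).call());
        exec.shutdown();
    }
}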
You could make use of callbacks instead of having your thread wait for the tasks to complete. Your tasks themselves will need to be callbacks since they submit more tasks.
E.g.:
// Completion callback interface assumed by the sketch below
interface Callback {
    void taskCompleted();
}

public class ParallelTask implements Runnable, Callback {

    private final Callback mCB;
    private final int mNumChildTasks;
    private int mTimesCalledBack = 0;
    private final Object mLock = new Object();
    private boolean mCompleted = false;

    public ParallelTask(final Callback cb, final int numChildTasks) {
        mCB = cb;
        // numChildTasks is the number of direct child tasks you know this
        // task will spawn (only going down 1 generation).
        // Of course you could figure this number out in the run method
        // instead (the field would need to be volatile if so), just as long
        // as it is set before submitting any child tasks for execution.
        mNumChildTasks = numChildTasks;
    }

    @Override
    public void run() {
        // do your stuff
        // and submit your child tasks, but don't wait on them to complete
        synchronized (mLock) {
            mCompleted = true;
            if (mNumChildTasks == mTimesCalledBack) {
                mCB.taskCompleted();
            }
        }
    }

    // taskCompleted is called from the threads this task's children run in
    @Override
    public void taskCompleted() {
        synchronized (mLock) {
            mTimesCalledBack++;
            // only call our parent back if our direct children have all
            // called us back and our own task is done
            if (mCompleted && mTimesCalledBack == mNumChildTasks) {
                mCB.taskCompleted();
            }
        }
    }
}
In your main thread you submit your root task and register some callback to be executed.
Since a child task doesn't report completion until all of its own children have reported completion, your root callback won't be called until everything is done.
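For instance, the root callback could simply release a latch that the main thread waits on (a hypothetical driver for the sketch above, assuming the root task really spawns the two children it declares):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CallbackMain {
    public static void main(final String[] args) throws Exception {
        final ExecutorService exec = Executors.newFixedThreadPool(4);
        final CountDownLatch done = new CountDownLatch(1);
        // Root callback: fires once the whole task tree has completed
        final Callback root = new Callback() {
            public void taskCompleted() {
                done.countDown();
            }
        };
        exec.execute(new ParallelTask(root, 2)); // root task with two children
        done.await(); // only the main thread blocks; no pool thread ever does
        exec.shutdown();
    }
}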
I wrote this on the fly and haven't tested or compiled it, so there may be some errors.
It seems like the issue is that the tasks also try to parallelize themselves, which makes it difficult to avoid resource contention. Why do you need to do this? Why not always run the subtasks inline?
If you're already fully utilizing the CPU through parallelization, you're not going to buy much in terms of overall work accomplished by dividing the work up again into smaller tasks.