How can I make sure a threadpool is finished?
The setup:
I am in the process of changing the way a program works under the hood. In the current version works like this:
public void threadWork( List<MyCallable> workQueue )
{
ExecutorService pool = Executors.newFixedThreadPool(someConst);
List<Future<myOutput>> returnValues = new ArrayList<Future<myOutput>>();
List<myOutput> finishedStuff = new ArrayList<myOutput>();
for( int i = 0; i < workQueue.size(); i++ )
{
returnValues.add( pool.submit( workQueue.get(i) ) );
}
while( !returnValues.isEmpty() )
{
try
{
// Future.get() waits for a value from the calla开发者_运维百科ble
finishedStuff.add( returnValues.remove(0).get(0) );
}
catch(Throwable iknowthisisbaditisjustanexample){}
}
doLotsOfThings(finsihedStuff);
}
But the new system is going to use a private inner Runnable to call a synchronized method that writes the data into a global variable. My basic setup is:
public void threadReports( List<String> workQueue )
{
ExecutorService pool = Executors.newFixedThreadPool(someConst);
List<MyRunnable> runnables = new ArrayList<MyRunnable>()
for ( int i = 0; i < modules.size(); i++ )
{
runnables.add( new MyRunnable( workQueue.get(i) );
pool.submit(threads.get(i));
}
while( !runnables.isEmpty() )
{
try
{
runnables.remove(0).wait(); // I realized that this wouldn't work
}
catch(Throwable iknowthisisbaditisjustanexample){}
}
doLotsOfThings(finsihedStuff); // finishedStuff is the global the Runnables write to
}
If you read my comment in the try of the second piece of code you will notice that I don't know how to use wait(). I had thought it was basically like thread.join() but after reading the documentation I see it is not.
I'm okay with changing some structure as needed, but the basic system of taking work, using runnables, having the runnables write to a global variable, and using a threadpool are requirements.
The Question
How can I wait for the threadpool to be completely finished before I doLotsOfThings()?
You should call ExecutorService.shutdown() and then ExecutorService.awaitTermination.
...
pool.shutdown();
if (pool.awaitTermination(<long>,<TimeUnit>)) {
// finished before timeout
doLotsOfThings(finsihedStuff);
} else {
// Timeout occured.
}
Try this:
pool.shutdown();
pool.awaitTermination(WHATEVER_TIMEOUT, TimeUnit.SECONDS);
Have you considered using the Fork/Join framework that is now included in Java 7. If you do not want to use Java 7 yet you can get the jar for it here.
public void threadReports( List<String> workQueue )
{
ExecutorService pool = Executors.newFixedThreadPool(someConst);
Set<Future<?>> futures = new HashSet<Future<?>>();
for ( int i = 0; i < modules.size(); i++ )
{
futures.add(pool.submit(threads.get(i)));
}
while( !futures.isEmpty() )
{
Set<Future<?>> removed = new Set<Future<?>>();
for(Future<?> f : futures) {
f.get(100, TimeUnit.MILLISECONDS);
if(f.isDone()) removed.add(f);
}
for(Future<?> f : removed) futures.remove(f);
}
doLotsOfThings(finsihedStuff); // finishedStuff is the global the Runnables write to
}
shutdown
is a lifecycle method of the ExecutorService
and renders the executor unusable after the call. Creating and destroying ThreadPools in a method is as bad as creating/destroying threads: it pretty much defeats the purpose of using threadpool, which is to reduce the overhead of thread creation by enabling transparent reuse.
If possible, you should maintain your ExecutorService lifecycle in sync with your application. - create when first needed, shutdown when your app is closing down.
To achieve your goal of executing a bunch of tasks and waiting for them, the ExecutorService
provides the method invokeAll(Collection<? extends Callable<T>> tasks)
(and the version with timeout if you want to wait a specific period of time.)
Using this method and some of the points mentioned above, the code in question becomes:
public void threadReports( List<String> workQueue ) {
List<MyRunnable> runnables = new ArrayList<MyRunnable>(workQueue.size());
for (String work:workQueue) {
runnables.add(new MyRunnable(work));
}
// Executor is obtained from some applicationContext that takes care of lifecycle mgnt
// invokeAll(...) will block and return when all callables are executed
List<Future<MyRunnable>> results = applicationContext.getExecutor().invokeAll(runnables);
// I wouldn't use a global variable unless you have a VERY GOOD reason for that.
// b/c all the threads of the pool doing work will be contending for the lock on that variable.
// doLotsOfThings(finishedStuff);
// Note that the List of Futures holds the individual results of each execution.
// That said, the preferred way to harvest your results would be:
doLotsOfThings(results);
}
PS: Not sure why threadReports
is void
. It could/should return the calculation of doLotsOfThings
to achieve a more functional design.
精彩评论