Java ThreadPoolExecutor getting stuck while using ArrayBlockingQueue
I'm working on some application and using ThreadPoolExecutor for handling various tasks. ThreadPoolExecutor is getting stuck after som开发者_StackOverflowe duration. To simulate this in a simpler environment, I've written a simple code where I'm able to simulate the issue.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MyThreadPoolExecutor {
private int poolSize = 10;
private int maxPoolSize = 50;
private long keepAliveTime = 10;
private ThreadPoolExecutor threadPool = null;
private final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(
100000);
public MyThreadPoolExecutor() {
threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize,
keepAliveTime, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable runnable,
ThreadPoolExecutor threadPoolExecutor) {
System.out
.println("Execution rejected. Please try restarting the application.");
}
});
}
public void runTask(Runnable task) {
threadPool.execute(task);
}
public void shutDown() {
threadPool.shutdownNow();
}
public ThreadPoolExecutor getThreadPool() {
return threadPool;
}
public void setThreadPool(ThreadPoolExecutor threadPool) {
this.threadPool = threadPool;
}
public static void main(String[] args) {
MyThreadPoolExecutor mtpe = new MyThreadPoolExecutor();
for (int i = 0; i < 1000; i++) {
final int j = i;
mtpe.runTask(new Runnable() {
@Override
public void run() {
System.out.println(j);
}
});
}
}
}
Try executing this code a few times. It normally print outs the number on console and when all threads end, it exists. But at times, it finished all task and then is not getting terminated. The thread dump is as follows:
MyThreadPoolExecutor [Java Application]
MyThreadPoolExecutor at localhost:2619 (Suspended)
Daemon System Thread [Attach Listener] (Suspended)
Daemon System Thread [Signal Dispatcher] (Suspended)
Daemon System Thread [Finalizer] (Suspended)
Object.wait(long) line: not available [native method]
ReferenceQueue<T>.remove(long) line: not available
ReferenceQueue<T>.remove() line: not available
Finalizer$FinalizerThread.run() line: not available
Daemon System Thread [Reference Handler] (Suspended)
Object.wait(long) line: not available [native method]
Reference$Lock(Object).wait() line: 485
Reference$ReferenceHandler.run() line: not available
Thread [pool-1-thread-1] (Suspended)
Unsafe.park(boolean, long) line: not available [native method]
LockSupport.park(Object) line: not available
AbstractQueuedSynchronizer$ConditionObject.await() line: not available
ArrayBlockingQueue<E>.take() line: not available
ThreadPoolExecutor.getTask() line: not available
ThreadPoolExecutor$Worker.run() line: not available
Thread.run() line: not available
Thread [pool-1-thread-2] (Suspended)
Unsafe.park(boolean, long) line: not available [native method]
LockSupport.park(Object) line: not available
AbstractQueuedSynchronizer$ConditionObject.await() line: not available
ArrayBlockingQueue<E>.take() line: not available
ThreadPoolExecutor.getTask() line: not available
ThreadPoolExecutor$Worker.run() line: not available
Thread.run() line: not available
Thread [pool-1-thread-3] (Suspended)
Unsafe.park(boolean, long) line: not available [native method]
LockSupport.park(Object) line: not available
AbstractQueuedSynchronizer$ConditionObject.await() line: not available
ArrayBlockingQueue<E>.take() line: not available
ThreadPoolExecutor.getTask() line: not available
ThreadPoolExecutor$Worker.run() line: not available
Thread.run() line: not available
Thread [pool-1-thread-4] (Suspended)
Unsafe.park(boolean, long) line: not available [native method]
LockSupport.park(Object) line: not available
AbstractQueuedSynchronizer$ConditionObject.await() line: not available
ArrayBlockingQueue<E>.take() line: not available
ThreadPoolExecutor.getTask() line: not available
ThreadPoolExecutor$Worker.run() line: not available
Thread.run() line: not available
Thread [pool-1-thread-6] (Suspended)
Unsafe.park(boolean, long) line: not available [native method]
LockSupport.park(Object) line: not available
AbstractQueuedSynchronizer$ConditionObject.await() line: not available
ArrayBlockingQueue<E>.take() line: not available
ThreadPoolExecutor.getTask() line: not available
ThreadPoolExecutor$Worker.run() line: not available
Thread.run() line: not available
Thread [pool-1-thread-8] (Suspended)
Unsafe.park(boolean, long) line: not available [native method]
LockSupport.park(Object) line: not available
AbstractQueuedSynchronizer$ConditionObject.await() line: not available
ArrayBlockingQueue<E>.take() line: not available
ThreadPoolExecutor.getTask() line: not available
ThreadPoolExecutor$Worker.run() line: not available
Thread.run() line: not available
Thread [pool-1-thread-5] (Suspended)
Unsafe.park(boolean, long) line: not available [native method]
LockSupport.park(Object) line: not available
AbstractQueuedSynchronizer$ConditionObject.await() line: not available
ArrayBlockingQueue<E>.take() line: not available
ThreadPoolExecutor.getTask() line: not available
ThreadPoolExecutor$Worker.run() line: not available
Thread.run() line: not available
Thread [pool-1-thread-10] (Suspended)
Unsafe.park(boolean, long) line: not available [native method]
LockSupport.park(Object) line: not available
AbstractQueuedSynchronizer$ConditionObject.await() line: not available
ArrayBlockingQueue<E>.take() line: not available
ThreadPoolExecutor.getTask() line: not available
ThreadPoolExecutor$Worker.run() line: not available
Thread.run() line: not available
Thread [pool-1-thread-9] (Suspended)
Unsafe.park(boolean, long) line: not available [native method]
LockSupport.park(Object) line: not available
AbstractQueuedSynchronizer$ConditionObject.await() line: not available
ArrayBlockingQueue<E>.take() line: not available
ThreadPoolExecutor.getTask() line: not available
ThreadPoolExecutor$Worker.run() line: not available
Thread.run() line: not available
Thread [pool-1-thread-7] (Suspended)
Unsafe.park(boolean, long) line: not available [native method]
LockSupport.park(Object) line: not available
AbstractQueuedSynchronizer$ConditionObject.await() line: not available
ArrayBlockingQueue<E>.take() line: not available
ThreadPoolExecutor.getTask() line: not available
ThreadPoolExecutor$Worker.run() line: not available
Thread.run() line: not available
Thread [DestroyJavaVM] (Suspended)
C:\Program Files\Java\jre1.6.0_07\bin\javaw.exe (Jun 17, 2010 10:42:33 AM)
In my actual application,ThreadPoolExecutor threads go in this state and then it stops responding.
Regards, Ravi Rao
In your main
method, you never call mtpe.shutdown()
. A ThreadPoolExecutor will attempt to keep its corePoolSize
alive indefinitely. Sometimes, you get lucky and you have more than corePoolSize
threads alive, so every worker thread will go into a conditional logic branch that allows it to terminate after your specified timeout period of 10 seconds. However, as you have noticed, sometimes this is not the case so every thread in the executor will block on ArrayBlockingQueue.take() and wait for a new task.
Also, please note, there is a significant difference between ExecutorService.shutdown() and ExecutorService.shutdownNow(). If you call ExecutorService.shutdownNow() as your wrapper implementation indicates, you will on occasion drop some tasks which have not been assigned for execution.
Update: Since my original answer, the ThreadPoolExecutor
implementation has changed such that the program in the original post should never exit.
精彩评论