Dynamic resizing of java.util.concurrent.ThreadPoolExecutor while it has waiting tasks

I'm working with a java.util.concurrent.ThreadPoolExecutor to process a number of items in parallel. Although the threading itself works fine, at times we've run into other resource constraints due to actions happening in the threads, which made us want to dial down the number of Threads in the pool.

I'd like to know if there's a way to dial down the number of threads while the threads are actually working. I know that you can call setMaximumPoolSize() and/or setCorePoolSize(), but these only resize the pool once threads become idle, and they don't become idle until there are no tasks waiting in the queue.


You absolutely can. Calling setCorePoolSize(int) will change the core size of the pool. Calls to this method are thread-safe and override the settings provided to the constructor of ThreadPoolExecutor. If you are trimming the pool size, the remaining threads will shut down once their current job queue is completed (if they are idle, they will shut down immediately). If you are increasing the pool size, new threads will be allocated as soon as possible. The timeframe for the allocation of new threads is undocumented, but in the implementation, allocation of new threads is performed upon each call to the execute method.
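As a rough illustration of resizing a live pool, here is a minimal sketch. The class and method names are my own, and the pool parameters (4 threads, 100 ms keep-alive) are arbitrary assumptions. In the JDK versions I have looked at, excess idle workers leave only after the keep-alive time has elapsed, so the sketch polls rather than assuming the shrink is instantaneous.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ResizeIdlePool {

    /** Shrinks an idle pool from 4 core threads to 2 and returns the resulting pool size. */
    static int shrinkIdlePool() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        try {
            pool.prestartAllCoreThreads();   // start 4 workers; all of them are idle
            pool.setCorePoolSize(2);         // safe to call on a running pool

            // Wait (up to 5 seconds) for the excess idle workers to exit.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 2 && System.nanoTime() < deadline) {
                Thread.sleep(20);
            }
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return pool.getPoolSize();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("pool size after shrinking: " + shrinkIdlePool());
    }
}
```

Note that this only covers idle workers; busy workers are not touched until they finish what they are doing.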

To pair this with a runtime-tunable job-farm, you can expose this property (either by wrapper or using a dynamic MBean exporter) as a read-write JMX attribute to create a rather nice, on-the-fly tunable batch processor.
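A minimal sketch of the wrapper variant, assuming a standard MBean registered with the platform MBeanServer. The names PoolTuner, PoolTunerMBean and the ObjectName "example.pool:type=PoolTuner" are hypothetical and chosen only for illustration; the write through MBeanServer.setAttribute stands in for what a JMX console would do.

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.management.Attribute;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class PoolJmxDemo {

    /** Management interface; the "MBean" suffix follows the standard MBean naming rule. */
    public interface PoolTunerMBean {
        int getCorePoolSize();
        void setCorePoolSize(int size);
    }

    /** Thin wrapper that forwards the read-write attribute to the executor. */
    public static class PoolTuner implements PoolTunerMBean {
        private final ThreadPoolExecutor pool;

        public PoolTuner(ThreadPoolExecutor pool) {
            this.pool = pool;
        }

        @Override public int getCorePoolSize() { return pool.getCorePoolSize(); }
        @Override public void setCorePoolSize(int size) { pool.setCorePoolSize(size); }
    }

    /** Registers the wrapper, writes the attribute through JMX, returns the pool's core size. */
    static int resizeThroughJmx(int newSize) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        try {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            ObjectName name = new ObjectName("example.pool:type=PoolTuner"); // hypothetical name
            server.registerMBean(new PoolTuner(pool), name);
            try {
                // The attribute name "CorePoolSize" is derived from the getter/setter pair.
                server.setAttribute(name, new Attribute("CorePoolSize", newSize));
                return pool.getCorePoolSize();
            } finally {
                server.unregisterMBean(name);
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("core pool size set via JMX: " + resizeThroughJmx(2));
    }
}
```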

To reduce the pool size forcibly at runtime (which is your request), you must subclass ThreadPoolExecutor and add a disruption to the beforeExecute(Thread,Runnable) method. Interrupting the thread is not a sufficient disruption, since that only interacts with wait states, and during processing the ThreadPoolExecutor task threads do not go into an interruptible state.

I recently had the same problem trying to get a thread pool to forcibly terminate before all submitted tasks are executed. To make this happen, I interrupted the thread by throwing a runtime exception only after replacing the UncaughtExceptionHandler of the thread with one that expects my specific exception and discards it.

/**
 * A runtime exception used to prematurely terminate threads in this pool.
 */
static class ShutdownException
extends RuntimeException {
    ShutdownException (String message) {
        super(message);
    }
}

/**
 * This uncaught exception handler is used only as threads are entered into
 * their shutdown state.
 */
static class ShutdownHandler 
implements UncaughtExceptionHandler {
    private UncaughtExceptionHandler handler;

    /**
     * Create a new shutdown handler.
     *
     * @param handler The original handler to delegate non-shutdown
     * exceptions to.
     */
    ShutdownHandler (UncaughtExceptionHandler handler) {
        this.handler = handler;
    }
    /**
     * Quietly ignore {@link ShutdownException}.
     * <p>
     * Do nothing if this is a ShutdownException, this is just to prevent
     * logging an uncaught exception which is expected.  Otherwise forward
     * it to the thread group handler (which may hand it off to the default
     * uncaught exception handler).
     * </p>
     */
    public void uncaughtException (Thread thread, Throwable throwable) {
        if (!(throwable instanceof ShutdownException)) {
            /* Delegate everything else to the original exception handler,
             * if one is available.
             */
            if (handler != null) {
                handler.uncaughtException(thread, throwable);
            }
        }
    }
}
/**
 * Configure the given job as a spring bean.
 *
 * <p>Given a runnable task, configure it as a prototype spring bean,
 * injecting any necessary dependencies.</p>
 *
 * @param thread The thread the task will be executed in.
 * @param job The job to configure.
 *
 * @throws IllegalStateException if any error occurs.
 */
protected void beforeExecute (final Thread thread, final Runnable job) {
    /* If we're in shutdown, it's because spring is in singleton shutdown
     * mode.  This means we must not attempt to configure the bean, but
     * rather we must exit immediately (prematurely, even).
     */
    if (!this.isShutdown()) {
        if (factory == null) {
            throw new IllegalStateException(
                "This class must be instantiated by spring"
                );
        }

        factory.configureBean(job, job.getClass().getName());
    }
    else {
        /* If we are in shutdown mode, replace the job on the queue so the
         * next process will see it and it won't get dropped.  Further,
         * interrupt this thread so it will no longer process jobs.  This
         * deviates from the existing behavior of shutdown().
         */
        getQueue().add(job);

        thread.setUncaughtExceptionHandler(
            new ShutdownHandler(thread.getUncaughtExceptionHandler())
            );

        /* Throwing a runtime exception is the only way to prematurely
         * cause a worker thread from the ThreadPoolExecutor to exit.
         */
        throw new ShutdownException("Terminating thread");
    }
}

In your case, you may want to create a semaphore (used purely as a thread-safe counter) that starts with no permits. When shrinking the pool, release to it a number of permits equal to the difference between the previous core pool size and the new pool size (this requires overriding the setCorePoolSize(int) method). This will allow you to terminate your threads after their current task completes.

private Semaphore terminations = new Semaphore(0);

protected void beforeExecute (final Thread thread, final Runnable job) {
    if (terminations.tryAcquire()) {
        /* Replace this item in the queue so it may be executed by another
         * thread
         */
        getQueue().add(job);

        thread.setUncaughtExceptionHandler(
            new ShutdownHandler(thread.getUncaughtExceptionHandler())
            );

        /* Throwing a runtime exception is the only way to prematurely
         * cause a worker thread from the ThreadPoolExecutor to exit.
         */
        throw new ShutdownException("Terminating thread");
    }
}

public void setCorePoolSize (final int size) {
    int delta = getActiveCount() - size;

    super.setCorePoolSize(size);

    if (delta > 0) {
        terminations.release(delta);
    }
}

This should interrupt n threads for f(n) = active - requested. If there is any problem, the ThreadPoolExecutor's allocation strategy is fairly durable. It does its bookkeeping on premature termination in a finally block, which is guaranteed to execute. For this reason, even if you terminate too many threads, they will repopulate.


As far as I can tell, this is not possible in a nice clean way.

You can implement the beforeExecute method to check some boolean value and force threads to halt temporarily. Keep in mind, they will contain a task which will not be executed until they are re-enabled.

Alternatively, you can implement afterExecute to throw a RuntimeException when you are saturated. This will effectively cause the Thread to die and since the Executor will be above the max, no new one would be created.

I don't recommend you do either. Instead, try to find some other way of controlling concurrent execution of the tasks which are causing you a problem, possibly by executing them in a separate thread pool with a more limited number of workers.
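A minimal sketch of the separate-pool idea, under the assumption that the resource-hungry work can be isolated into its own Runnable. The names SeparateLimitedPool, peakConcurrency and constrainedTask are made up for the example, and the sleep merely stands in for the real work; the counters are only there to show that no more than the configured number of tasks ever run at once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class SeparateLimitedPool {

    /** Runs the tasks on a small dedicated pool and returns the peak concurrency observed. */
    static int peakConcurrency(int workers, int tasks) {
        ExecutorService limited = Executors.newFixedThreadPool(workers);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        // Placeholder for the work that hits the constrained resource.
        Runnable constrainedTask = () -> {
            int now = running.incrementAndGet();
            peak.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running.decrementAndGet();
            }
        };

        try {
            List<Future<?>> pending = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                pending.add(limited.submit(constrainedTask));
            }
            for (Future<?> f : pending) {
                f.get(); // wait for every task to finish
            }
            return peak.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            limited.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + peakConcurrency(2, 8));
    }
}
```

The main pool can keep its size; only the problematic tasks are funnelled through the smaller one.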


The solution is to drain the ThreadPoolExecutor queue, set the ThreadPoolExecutor size as needed, and then add the drained tasks back, one by one, as soon as the others end. The method that drains the queue in the ThreadPoolExecutor class is private, so you have to write it yourself. Here is the code:

/**
 * Drains the task queue into a new list. Used by shutdownNow.
 * Call only while holding main lock.
 */
public static List<Runnable> drainQueue() {
    List<Runnable> taskList = new ArrayList<Runnable>();
    BlockingQueue<Runnable> workQueue = executor.getQueue();
    workQueue.drainTo(taskList);
    /*
     * If the queue is a DelayQueue or any other kind of queue
     * for which poll or drainTo may fail to remove some elements,
     * we need to manually traverse and remove remaining tasks.
     * To guarantee atomicity wrt other threads using this queue,
     * we need to create a new iterator for each element removed.
     */
    while (!workQueue.isEmpty()) {
        Iterator<Runnable> it = workQueue.iterator();
        try {
            if (it.hasNext()) {
                Runnable r = it.next();
                if (workQueue.remove(r))
                    taskList.add(r);
            }
        } catch (ConcurrentModificationException ignore) {
        }
    }
    return taskList;
}

Before calling this method you need to acquire the main lock, and release it afterwards. To do this you need to use Java reflection, because the field "mainLock" is private. Again, here is the code:

private Field getMainLock() throws NoSuchFieldException {
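    // Look the field up on ThreadPoolExecutor itself; executor.getClass()
    // would not find it if the executor is an instance of a subclass.
    Field mainLock = ThreadPoolExecutor.class.getDeclaredField("mainLock");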
    mainLock.setAccessible(true);
    return mainLock;
}

Where "executor" is your ThreadPoolExecutor.

Now you need lock/unlock methods:

public void lock() {
    try {
        Field mainLock = getMainLock();
        Method lock = mainLock.getType().getDeclaredMethod("lock", (Class[])null);
        lock.invoke(mainLock.get(executor), (Object[])null);
    } catch (ReflectiveOperationException e) {
        ...
    } 
}

public void unlock() {
    try {
        Field mainLock = getMainLock();
        mainLock.setAccessible(true);
        Method lock = mainLock.getType().getDeclaredMethod("unlock", (Class[])null);
        lock.invoke(mainLock.get(executor), (Object[])null);
    } catch (ReflectiveOperationException e) {
        ...
    }  
}

Finally you can write your "setThreadsNumber" method, and it will work both increasing and decreasing the ThreadPoolExecutor size:

public void setThreadsNumber(int intValue) {
    boolean increasing = intValue > executor.getPoolSize();
    executor.setCorePoolSize(intValue);
    executor.setMaximumPoolSize(intValue);
    if(increasing){
        if(drainedQueue != null && (drainedQueue.size() > 0)){
            executor.submit(drainedQueue.remove(0));
        }
    } else {
        if(drainedQueue == null){
            lock();
            drainedQueue = drainQueue();
            unlock();
        }
    }
}

Note: obviously, if you are running N parallel threads and then change this number to N-1, all N threads will continue to run. When the first thread finishes, no new thread will be started. From then on, the number of parallel threads will be the one you have chosen.
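One detail worth sketching is the order of the two setter calls. setMaximumPoolSize rejects a value below the current core size, and on newer JDKs (9 and later, as far as I know) setCorePoolSize also rejects a value above the current maximum, in both cases with an IllegalArgumentException. The helper below is my own illustration, not part of the code above: it raises the maximum first when growing and lowers the core first when shrinking.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolResizer {

    /** Sets core and maximum size to the same value, ordering the calls so neither is rejected. */
    static void resize(ThreadPoolExecutor executor, int size) {
        if (size > executor.getMaximumPoolSize()) {
            executor.setMaximumPoolSize(size); // growing: raise the ceiling first
            executor.setCorePoolSize(size);
        } else {
            executor.setCorePoolSize(size);    // shrinking: lower the core first
            executor.setMaximumPoolSize(size);
        }
    }

    /** Grows a 4-thread pool to 8, then shrinks it to 2; returns {core, max} after each step. */
    static int[] demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        try {
            resize(pool, 8);
            int coreAfterGrow = pool.getCorePoolSize();
            int maxAfterGrow = pool.getMaximumPoolSize();
            resize(pool, 2);
            return new int[] {
                coreAfterGrow, maxAfterGrow, pool.getCorePoolSize(), pool.getMaximumPoolSize()
            };
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo()));
    }
}
```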


I needed the same solution too, and it seems that in JDK 8 setCorePoolSize() and setMaximumPoolSize() do indeed produce the desired result. I made a test case where I submit 4 tasks to the pool and they execute concurrently; I shrink the pool size while they are running and submit yet another runnable that I want to run alone. Then I restore the pool to its original size. Here is the test source: https://gist.github.com/southerton81/96e141b8feede3fe0b8f88f679bef381

It produces the following output (thread "50" is the one that should be executed in isolation)

run:
test thread 2 enter
test thread 1 enter
test thread 3 enter
test thread 4 enter
test thread 1 exit
test thread 2 exit
test thread 3 exit
test thread 4 exit
test thread 50 enter
test thread 50 exit
test thread 1 enter
test thread 2 enter
test thread 3 enter
test thread 4 enter
test thread 1 exit
test thread 2 exit
test thread 3 exit
test thread 4 exit
0
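The gist is not reproduced here. A minimal sketch of a similar experiment, with class, method and variable names of my own and latches instead of timing, could look like the following. It keeps four tasks busy, shrinks the pool to one thread, queues a late task, and reports how many tasks were running when the late task started (itself included).

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ShrinkWhileBusy {

    /** Returns the number of running tasks seen by the late task when it starts. */
    static int concurrencySeenByLateTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        AtomicInteger running = new AtomicInteger();
        CountDownLatch allStarted = new CountDownLatch(4);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 4; i++) {
                pool.execute(() -> {
                    running.incrementAndGet();
                    allStarted.countDown();
                    try {
                        release.await();          // stay busy until told to finish
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                    }
                });
            }
            allStarted.await();                   // all four workers are busy now

            pool.setCorePoolSize(1);              // shrink while the tasks are running
            pool.setMaximumPoolSize(1);

            Callable<Integer> lateTask = running::incrementAndGet;
            Future<Integer> late = pool.submit(lateTask); // queued behind the busy workers
            release.countDown();                  // let the four tasks finish
            return late.get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("tasks running when the late task started: "
                + concurrencySeenByLateTask());
    }
}
```

A worker only picks up the queued task once the worker count has dropped to the new maximum, which is why the late task ends up running alone in this setup.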
