Java Thread Pool with a Bounded Queue

I'm using java.util.concurrent's Executors class to create a fixed thread pool for running request handlers for a web server:

static ExecutorService  newFixedThreadPool(int nThreads) 

and the description is:

Creates a thread pool that reuses a fixed set of threads operating off a shared unbounded queue.

However, I am looking for a thread pool implementation that does exactly the same thing, except with a bounded queue. Is there such an implementation? Or do I need to implement my own wrapper for the fixed thread pool?


What you want to do is create your own ExecutorService, most likely with ThreadPoolExecutor. ThreadPoolExecutor has constructors that take a BlockingQueue; to get a bounded queue, pass one constructed with a fixed capacity, for example an ArrayBlockingQueue. You can also supply a RejectedExecutionHandler to decide what happens when the queue is full, or hang on to a reference to the blocking queue and use its offer methods.

Here's a mini example:

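// Bounded queue: holds at most 100 waiting tasks.
BlockingQueue<Runnable> queue = new LinkedBlockingDeque<Runnable>(100);
// 1 core thread, at most 10 threads, idle extra threads time out after 30 seconds.
// Extra threads are only started once the queue is full; when the queue and the
// pool are both full, CallerRunsPolicy runs the task on the submitting thread.
ExecutorService executorService = new ThreadPoolExecutor(1, 10, 30,
    TimeUnit.SECONDS, queue,
    new ThreadPoolExecutor.CallerRunsPolicy());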


Create a ThreadPoolExecutor and pass a suitable BlockingQueue implementation to its constructor. For example, you can pass an ArrayBlockingQueue to the ThreadPoolExecutor constructor to get the desired effect.
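
As a rough sketch of that idea, the following builds a pool that behaves like newFixedThreadPool but with a bounded queue. The class name BoundedFixedPool, the helper newBoundedFixedThreadPool, the sizes, and the choice of CallerRunsPolicy are my own assumptions for illustration, not anything the JDK provides under those names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedFixedPool {

    // Hypothetical helper (not part of the JDK): fixed thread count, bounded queue.
    static ThreadPoolExecutor newBoundedFixedThreadPool(int nThreads, int queueCapacity) {
        return new ThreadPoolExecutor(
                nThreads, nThreads,          // core == max: at most nThreads threads, none timed out
                0L, TimeUnit.MILLISECONDS,   // keep-alive has no effect when core == max
                new ArrayBlockingQueue<Runnable>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy()); // full queue: the submitter runs the task
    }

    // Submits taskCount trivial tasks and returns how many actually ran.
    static int runTasks(int nThreads, int queueCapacity, int taskCount)
            throws InterruptedException {
        ThreadPoolExecutor pool = newBoundedFixedThreadPool(nThreads, queueCapacity);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(completed::incrementAndGet);
        }
        pool.shutdown(); // previously submitted tasks still run after shutdown()
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("tasks run: " + runTasks(4, 100, 1000));
    }
}
```

With CallerRunsPolicy no task is dropped: when the queue is full the submitting thread does the work itself, which also slows down submission. If you would rather fail fast, leave the handler out and catch RejectedExecutionException instead.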


I've solved this with a Semaphore which I use to throttle tasks being submitted to the ExecutorService.

For example:

int threadCount = 10;
ExecutorService consumerPool = Executors.newFixedThreadPool(threadCount);

// set the permit count greater than the thread count so that we
// build up a limited buffer of waiting consumers
Semaphore semaphore = new Semaphore(threadCount * 100);

for (int i = 0; i < 1000000; ++i) {
    final int taskId = i; // a lambda can only capture effectively final variables
    semaphore.acquire(); // blocks until a permit is free; throws InterruptedException
    Runnable consumer = () -> {
       try {
          doSomeWork(taskId);
       } finally {
          semaphore.release(); // release the permit when the task finishes
       }
    };
    consumerPool.submit(consumer);
}


When you create a ThreadPoolExecutor you can give it a bounded BlockingQueue and a RejectedExecutionHandler so you can control what happens when the limit is reached. The default behaviour is to throw a RejectedExecutionException.
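
To illustrate the default behaviour, here is a minimal sketch, assuming a pool with one thread and a queue of capacity one. The class RejectionDemo and the method countRejections are made-up names for this example. The single worker is held on a latch so that the queue fills up predictably.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionDemo {

    // Fills a 1-thread, 1-slot pool, then counts how many extra submissions are rejected.
    static int countRejections(int extraTasks) throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        // No handler passed, so the default (AbortPolicy) applies.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1));
        Runnable blocked = () -> {
            try {
                release.await(); // hold the worker until the demo is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        try {
            pool.execute(blocked); // handed straight to the single worker thread
            pool.execute(blocked); // waits in the queue, which is now full
            for (int i = 0; i < extraTasks; i++) {
                try {
                    pool.execute(blocked);
                } catch (RejectedExecutionException e) {
                    rejected++; // no free thread and no free queue slot
                }
            }
        } finally {
            release.countDown(); // let the held tasks finish
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
        return rejected;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("rejected: " + countRejections(3));
    }
}
```

Passing a different handler as the last constructor argument, such as ThreadPoolExecutor.CallerRunsPolicy or ThreadPoolExecutor.DiscardPolicy, changes what happens at that point instead of throwing.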

You can also define your own thread factory to control the thread names and make them daemon threads.
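
A thread factory along those lines might look like the sketch below. NamedDaemonThreadFactory, the name prefix, and the pool sizes are hypothetical choices for illustration; only ThreadFactory and the ThreadPoolExecutor constructor come from the JDK.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory: names threads "<prefix>-<n>" and marks them as daemon threads.
class NamedDaemonThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedDaemonThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        t.setDaemon(true); // daemon threads do not keep the JVM alive on exit
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(10),
                new NamedDaemonThreadFactory("request-handler"));
        pool.execute(() -> System.out.println(Thread.currentThread().getName()));
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

Keep in mind that with daemon threads the JVM can exit while tasks are still queued or running, so shut the pool down explicitly if that work has to complete.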
