Producer-Consumer with ExecutorService.newFixedThreadPool - How many threads are created?
public class MainClass {
private static final int producerPoolSize = 10;
private static final int consumerPoolSize = 20;
private ExecutorService prodExec = Executors.newFixedThreadPool(producerPoolSize);
private ExecutorService consExe开发者_如何学Goc = Executors.newFixedThreadPool(consumerPoolSize);
//main method here, which calls start() below
private void start(String[] args) {
// Get list of ids, split them in to n(producerPoolSize) chunks
for (int index = 0; index < producerPoolSize; index++) {
Runnable producer = new Producer(consExec, chunkOfIdsForThisProducer);
prodExec.execute(producer);
}
}
public class Producer implements Runnable {
private ExecutorService consExec;
private List<Long> list;
public Producer(ExecutorService exec, List<Long> list) {
this.consExec = exec;
this.list = list;
}
public void run() {
for (Long id: list) {
data = get data from db for the id
consExec.execute(new Consumer(data));
}
}
}
public class Consumer implements Runnable {
public void run() {
// call web service
}
}
In the above code, I have two thread pools - one each for Producers and Consumers. I get a number of IDs from the database,split them in to equal chunks so that they are handed out to Producer threads to process. A producer thread receives a list of IDs and processes each sequentially, retrieving data for each of of the IDs and submitting that data to a Consumer thread to process. Now my question is this:
I create 10 producer threads above. And I want the size of the Consumer thread pool to be 20. But, while processing each ID, the Producer creates a new Runnable (Consumer) and submits (execute) it to the Consumer executor service. My understanding of the ExecutorService is that the Runnable that you submit to it,gets wrapped in a Worker thread and then executed. So, in the above code, if the number of IDs each producer gets is 50, am I actually creating 50*10=500 Consumer threads? Is it too many?
Or does the pool size actually means the number of worker threads? So in the above code I am creating 500 tasks on the Consumer executor which would actually be queued and executed by 20 worker threads? I may not be explaining this correctly, but slightly confused here around the internal implementation of the executor and worried if I am creating too many Consumer threads.
If this isn't the way to implement this, can someone suggest a better approach? Thanks.
Does the pool size actually means the number of worker threads? Yes.
If the consumer Runnable process takes a long time only 20 will run concurrently. The rest will wait in a collection until a thread is available to run it.
As for if there is a better way to do this. Is there a reason you need to use threads? Unless you have 20 available processors running this in parallel may not increase your processing time because all of the threads will be spending time in context switches etc. that are not useful for processing the data.
Also, the producers are getting all of the data and storing it in the Consumers. If the consumers cannot run because you have 500 of them and only 20 can run at once then you are storing (500 minus 20) * the data you can process. You could have the consumers fetching their own data.
In response to comment:
instead of
for (int index = 0; index < producerPoolSize; index++) {
Runnable producer = new Producer(consExec, chunkOfIdsForThisProducer);
prodExec.execute(producer);
}
and Processor
for (Long id: list) {
data = get data from db for the id
consExec.execute(new Consumer(data));
}
Consumer looks like:
public class Consumer implements Runnable {
long myId;
Consumer(long id){
myId = id;
}
public void run() {
data = get data from db for the id
// do whatever a consumer does with data
}
}
and
private void start(String[] args) {
// Get list of ids create a new consumer for each id
for (int index = 0; index < everyID.length; index++) {
consExec.execute(new Consumer(everyID[i]));
}
}
Then you loose a whole class and the 20 pool makes more sense because Consumers that are blocked on IO fetching data will get waited and ones that are ready can continue processing.
The pool size is what determines the number of worker threads. If you try to submit an item while all the worker threads are busy, it will be queued by the ExecutorService and run once a worker becomes free.
The javadocs say this:
Creates a thread pool that reuses a fixed set of threads operating off a shared unbounded queue. If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks.
Note the hilighted parts. The number of threads is fixed, and the queue is unbounded, meaning items submitted when the threads are busy will always be queued, rather than rejected.
精彩评论