Java using ExecutorService,CompletionService,BlockingQueue,and Observer correctly?
So, I'm pretty new to multi-threading and have been using this idea in all my programs lately. Before I start using it more I really want to make sure it is a correct efficient way to implement multi-threading using the Executor,CompletionService and a BlockingQueue plus an Observer. I'll provide example code below but let me first quickly explain how I think it works and maybe that will help.
The first thing I have is a BlockingQueue all tasks are added to this queue via an add(Task task) method. Upon creation of the class the run method is called with a while(true) calling take on the queue blocking until something gets added to the task queue.
Once something gets added to the queue inside the run() queue.take() returns the item on queue. Then I take that item and pass it to WorkerThread class that does stuff on it. That workerThread is added to the CompletionService pool which handles the waiting for a thread to finish.
Ok now comes the part i'm not sure is correct. I also have an inner class that implements runnable and is started when the class is initialized. Its job is to loop forever calling pool.take(). So, this essentially waits for one of the WorkerThreads to complete. I let the completion service handle this. Once the take() gets a value the inner class passes it to a notify observer method.
Is this okay implementation.? It concerns me a bit that there is the main classes run with a while(true) looping on task queue and an inner class also looping waiting on pool to receive a result from WorkerThread?
Here is an example implementation. What you think?
public class HttpSchedulerThreaded extends Observable implements Runnable {
private ArrayList<Object> list;//holds [0]=VULNINFO, [1]=REQUESTBUILDER OBJECT
protected static Logger logger = Logger.getLogger(HttpScheduler.class.getName());
private CompletionService<VulnInfo> pool;
private ExecutorService executor ;
private Thread responseWorkerThread;
private HttpSchedulerWorker schedulerWorker;
private boolean shouldRun = true;
private CountDownLatch doneSignal;
private String[] vulnClassesIgnoreRedirect;
private boolean followRedirects;
private boolean runJavascriptInResponse;
private boolean isSSL;
private int numThreadsInPool;
private BlockingQueue<VulnInfo> queue;
private boolean isRunning ;
public HttpSchedulerThreaded(int numThreads)
{
numThreadsInPool = numThreads;
executor = Executors.newFixedThreadPool(numThreads);
doneSignal = new CountDownLatch(numThreads);
pool = new ExecutorCompletionService<VulnInfo>(executor);
schedulerWorker = new HttpSchedulerWorker();
responseWorkerThread = new Thread(schedulerWorker);
queue = new LinkedBlockingQueue<VulnInfo>();
}
public HttpSchedulerThreaded()
{
numThreadsInPool = 1;
executor = Executors.newFixedThreadPool(1);
doneSignal = new CountDownLatch(1);
pool = new ExecutorCompletionService<VulnInfo>(executor);
schedulerWorker = new HttpSchedulerWorker();
responseWorkerThread = new Thread(schedulerWorker);
queue = new LinkedBlockingQueue<VulnInfo>();
}
public void setThreadCount(int numThreads)
{
if(!isRunning){
executor = Executors.newFixedThreadPool(numThreads);
doneSignal = new CountDownLatch(numThreads);
pool = new ExecutorCompletionService<VulnInfo>(executor);
numThreadsInPool = numThreads;
}
}
public void start()
{
if(!isRunning){
responseWorkerThread.start();
new Thread(this).start();
isRunning = true;
}
}
public void add(VulnInfo info) {
queue.add(info);
}
@Override
public void run() {
// TODO Auto-generated method stub
while(shouldRun)
{
try {
VulnInfo info = queue.take();
Callable<VulnInfo> worker = new HttpSchedulerRequestSender(info,followRedirects,runJavascriptInResponse,vulnClassesIgnoreRedirect,doneSignal);
//System.out.println("submitting to pooler: " + info.getID());
pool.submit(worker);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
/**
* Inner class of proxy is a worker thread blocks until the pool has transactions complete as soon as they
* are complete it will send them to server for completion.
* @author Steve
*
*/
class HttpSchedulerWorker implements Runnable{
public void run() {
// TODO Auto-generated method stub
while(true)
{
VulnInfo vulnInfo = null;
开发者_StackOverflow中文版 try {
//System.out.println("taking finished request");
Future<VulnInfo> tmp = pool.take();
// Future<VulnInfo> tmp = pool.poll();
if(tmp != null)
vulnInfo = tmp.get();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if(vulnInfo != null)
{
//System.out.println("updating all observers: " + vulnInfo.getID());
updateObservers(vulnInfo);
}
}
}
}
From my experience, your solution seems to be okay. I have three comments/suggestions:
- Once you create a new thread of execution
responseWorkerThread = new Thread(schedulerWorker)
andresponseWorkerThread.start()
, you've essentially broken apart those two loops. This part looks okay. You do seem to be using theExecutor
s API correctly, but it does look like you may need some more code for stopping theHttpScheduledWorker
thread and for shutting down theExecutionCompletionService
as part of theHttpSchedulerThreaded
class. - I'm not sure that your use of
queue
is really necessary.ExecutionCompletionService
already uses aBlockingQueue
to manage the tasks which are submitted to it. - Your "question" may be a better fit over at the beta Code Review site.
精彩评论