Proper implementation of producer-consumer scenario and "graceful" termination of thread pool

I am working on my first multi-threaded project, so there are a couple of things I am unsure of. Details of my setup were in a previous question; in short: I have a thread pool created with Executors.newFixedThreadPool(N). One thread is given an action that runs a series of queries against local and remote resources and iteratively populates an ArrayBlockingQueue, while the rest of the threads invoke the take() method on the queue and process the objects in it.

Even though small, supervised tests seem to run OK, I am unsure how to handle special scenarios such as the beginning (the queue has no items yet), the end (the queue has been emptied), and any eventual InterruptedExceptions. I have done some reading here on SO, which led me to two really nice articles by Goetz and Kabutz. The consensus seems to be that one should not ignore these exceptions. However, I am unsure how the examples supplied relate to my situation: I have not invoked thread.interrupt() anywhere in my code... Speaking of which, I am starting to wonder whether I should have...

To sum it up, given the code below, how do I best handle the special cases, such as the termination criteria and the InterruptedExceptions? I hope the questions make sense; otherwise I'll do my best to describe the situation further.

Thanks in advance,


edit: I have been working on the implementation for a while now and have come across a new hiccup, so I figured I'd update the situation. I had the misfortune of running into a ConcurrentModificationException, which was most likely due to incomplete shutdown/termination of the thread pool. As soon as I figured out I could use isTerminated() I tried that, and then I got an IllegalMonitorStateException due to an unsynchronized wait(). The current state of the code is below:

I have followed some of the advice from @Jonathan's answer, but I don't think his proposal works quite the way I need/want. The background story is the same as mentioned above, and the relevant bits of code are as follows:

Class holding/managing the pool, and submission of runnables:

public void serve() {
    try {
        this.started = true;
        pool.execute(new QueryingAction(pcqs));
        for (;;) {
            PathwayImpl p = bq.take();

            if (p.getId().equals("0")) {
                System.out.println("--DEBUG: Termination criteria found, shutdown initiated..");
                pool.shutdown();
                // give 3 minutes per item in queue to finish up
                pool.awaitTermination(3 * bq.size(), TimeUnit.MINUTES);
                break;
            }
            int sortMethod = AnalysisParameters.getInstance().getSort_method();
            pool.submit(new AnalysisAction(p));
        }
    } catch (Exception ex) {
        ex.printStackTrace();
        System.err.println("Unexpected error in core analysis, terminating execution!");
        System.exit(0);
    } finally {
        pool.shutdown();
    }
}

public boolean isDone() {
    if (this.started)
        return pool.isTerminated();
    else
        return false;
}

Elements are added to the queue by the following code, located in a separate class:

this.queue.offer(path, offer_wait, TimeUnit.MINUTES);

... the motivation for using the timed offer() instead of a blocking call is as Jonathan mentioned. Unforeseen blocks are annoying and hard to figure out, as my analysis takes a long time as it is. So I need to know relatively quickly whether it fails due to a bad block or is just crunching numbers...


And finally, here's the code in my test class where I check the interaction between the "concurrency service" (named cs here) and the rest of the objects to be analyzed:

cs.serve();
synchronized (this) {
    while (!cs.isDone())
        this.wait(5000);
}
ReportGenerator rg = new ReportGenerator();
rg.doReports();

I realize that this has been a VERY long question but I tried to be detailed and specific. Hopefully it won't be too much of a drag, and I apologize in case it is...


Instead of using take, which blocks, use something more like this:

PathwayImpl p = null;
synchronized (bq) {
    try {
        while (bq.isEmpty() && !stopSignal) {
            bq.wait(3000); // Wait up to 3 seconds and check again
        }

        if (!stopSignal) {
            p = bq.poll();
        }
    }
    catch (InterruptedException ie) {
        // Broke us out of waiting, loop around to test the stopSignal again
    }
}

This assumes that the block is enclosed in some sort of while (!stopSignal) {...}.

Then, in the code that adds to the queue, do this:

synchronized (bq) {
    bq.add(item);
    bq.notify();
}
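Putting the two snippets together, here is a minimal self-contained sketch of how they might fit into one class. The class name StoppableConsumerSketch, the requestStop() method, and the String items are made up for illustration and are not from the question's code. Unlike the snippet above, this version drains whatever is still queued before it stops, and it treats an interrupt as a request to exit rather than looping around again; both are assumptions you may want to change.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class; names are illustrative only.
final class StoppableConsumerSketch {
    private final BlockingQueue<String> bq = new ArrayBlockingQueue<>(16);
    private final List<String> processed = new ArrayList<>(); // touched only by the consumer thread
    private boolean stopSignal = false;                       // guarded by synchronized (bq)

    // Consumer loop: waits on bq's monitor until an item arrives or a stop is requested.
    void consume() {
        while (true) {
            String p;
            synchronized (bq) {
                try {
                    while (bq.isEmpty() && !stopSignal) {
                        bq.wait(3000); // wake up at least every 3 seconds and re-check
                    }
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                    return;
                }
                if (bq.isEmpty()) {
                    return; // the wait loop ended with nothing queued, so stop was requested
                }
                p = bq.poll();
            }
            processed.add(p); // stand-in for the real processing, done outside the lock
        }
    }

    // Producer side: add an item and wake a waiting consumer.
    void produce(String item) {
        synchronized (bq) {
            bq.add(item);
            bq.notify();
        }
    }

    // Normal shutdown: set the flag and wake every waiting consumer.
    void requestStop() {
        synchronized (bq) {
            stopSignal = true;
            bq.notifyAll();
        }
    }

    static List<String> runDemo() {
        StoppableConsumerSketch s = new StoppableConsumerSketch();
        Thread consumer = new Thread(s::consume);
        consumer.start();
        s.produce("a");
        s.produce("b");
        s.produce("c");
        s.requestStop();
        try {
            consumer.join(); // join() makes the consumer's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return s.processed;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Because the flag is only read and written while holding bq's monitor, it does not need to be volatile in this sketch.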

As for InterruptedExceptions, they are good for signaling the thread to test the stop signal immediately, instead of waiting until the next timeout-and-test. I suggest just testing your stop signal again, and possibly logging the exception.
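If you would rather stay with the blocking take() from the question, one common convention (and, as far as I can tell, the one the articles mentioned in the question argue for) is to restore the thread's interrupt flag in the catch block and leave the loop. A rough sketch, where InterruptAwareWorkerSketch and its sawInterrupt field are hypothetical names used only for the demo:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical worker; names are illustrative only.
final class InterruptAwareWorkerSketch implements Runnable {
    private final BlockingQueue<String> queue;
    final AtomicBoolean sawInterrupt = new AtomicBoolean(false);

    InterruptAwareWorkerSketch(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                String item = queue.take(); // blocks; throws InterruptedException when interrupted
                // the real processing of item would go here
            }
        } catch (InterruptedException ie) {
            // take() cleared the flag when it threw, so set it again for whoever checks next
            Thread.currentThread().interrupt();
        }
        // record what the thread sees on its way out (for the demo only)
        sawInterrupt.set(Thread.currentThread().isInterrupted());
    }

    static boolean runDemo() {
        InterruptAwareWorkerSketch worker =
                new InterruptAwareWorkerSketch(new LinkedBlockingQueue<>());
        Thread t = new Thread(worker);
        t.start();
        t.interrupt(); // ask the worker to stop
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return worker.sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println("worker saw interrupt: " + runDemo());
    }
}
```

Even if you never call thread.interrupt() yourself, pool.shutdownNow() interrupts the pool's worker threads, so a worker blocked in take() can still end up in that catch block.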

I use them when signaling a panic, versus a normal shutdown, but it is rare that such a situation is necessary.
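In terms of the ExecutorService from the question, that distinction maps roughly onto shutdown() for the normal case and shutdownNow() for the panic case, since shutdownNow() is the call that interrupts running tasks. A sketch of one way to combine them, assuming a helper I have called shutdownGracefully (not part of any library) and a timeout you pick yourself:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class; names are illustrative only.
final class PoolShutdownSketch {

    // Normal shutdown first; escalate to shutdownNow() only if the timeout expires.
    static boolean shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // no new tasks accepted; already submitted tasks still run
        try {
            if (!pool.awaitTermination(timeout, unit)) {
                pool.shutdownNow(); // "panic": interrupts running tasks, drops queued ones
                return pool.awaitTermination(timeout, unit);
            }
            return true;
        } catch (InterruptedException ie) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // keep the caller's interrupt status
            return false;
        }
    }

    static boolean runDemo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < 4; i++) {
            pool.execute(done::incrementAndGet); // trivial stand-in for an analysis task
        }
        boolean terminated = shutdownGracefully(pool, 5, TimeUnit.SECONDS);
        return terminated && pool.isTerminated() && done.get() == 4;
    }

    public static void main(String[] args) {
        System.out.println("clean shutdown: " + runDemo());
    }
}
```

Since awaitTermination() already blocks until the pool is done or the timeout expires, a caller that uses it should not need a separate wait()/isTerminated() polling loop.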
