Stopping a threadpool on satisfied condition

I'm using an ExecutorService to process thousands of small independent tasks. Each task, on completion, stores its result (which is either true or false).

So, instead of processing all of the tasks, I want to shut down the thread pool prematurely if a task has found the answer! It feels like I'm missing something very obvious here...


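Consider using the invokeAny method. It returns as soon as one task completes successfully (that is, without throwing an exception) and cancels the tasks that have not finished. Note that it treats any normal return as success, so a task whose result is false would have to throw instead of returning.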

http://download.oracle.com/javase/6/docs/api/java/util/concurrent/ExecutorService.html#invokeAny(java.util.Collection)
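
A minimal sketch of that approach, assuming each task can signal "not the answer" by throwing. The class name InvokeAnyExample, the check method, and the value 742 are made up for illustration and are not from the question:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class InvokeAnyExample {

    // Hypothetical check: here, only candidate 742 counts as "the answer".
    public static boolean check(int candidate) {
        return candidate == 742;
    }

    // Returns a candidate whose check succeeded, or -1 if none did.
    public static int findAnswer(int candidateCount) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 0; i < candidateCount; i++) {
                final int candidate = i;
                tasks.add(() -> {
                    if (!check(candidate)) {
                        // Throwing marks this task as failed, so invokeAny
                        // keeps waiting for another task to succeed.
                        throw new IllegalStateException("no match: " + candidate);
                    }
                    return candidate;
                });
            }
            // Blocks until one task returns normally; unfinished tasks are cancelled.
            return pool.invokeAny(tasks);
        } catch (ExecutionException e) {
            // Every task threw, so no task found the answer.
            return -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(findAnswer(1000));
    }
}
```

The trade-off is that every false result costs an exception, which may matter with thousands of tasks.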


The desire you express reminds me of a Klein bottle, where there's little distinction maintained between what's "inside" and what's "outside." Here, the tasks submitted to the ExecutorService need to know that they must notify a latching gate outside the thread pool and shut it down when first transitioning from having seen no true task outcomes to having seen at least one.

I won't write the code for you, but I'll sketch the solution. It may help to define an interface on which the tasks must call when they complete:

interface TaskObserver
{
  void completed(boolean result);
}

Each task instance can be constructed with a reference to such a TaskObserver, on which the task body will call just before it completes and yields control back to the invoking ExecutorService. You could even write a base class to assist in participating in this protocol:

import java.util.concurrent.Callable;

public abstract class ObservableTask implements Callable<Boolean>
{
  protected ObservableTask(TaskObserver observer)
  {
    if (null == observer)
      throw new NullPointerException();
    observer_ = observer;
  }


  // Final so that subclasses cannot skip the notification step.
  public final Boolean call()
  {
    final boolean result = evaluate();
    observer_.completed(result);
    return result;
  }


  // Subclasses put the actual task logic here.
  protected abstract boolean evaluate();


  private final TaskObserver observer_;
}

Alternately, instead of using extension to define tasks, you could write a concrete class like this that accepts a reference to a Callable<Boolean> in its constructor in addition to the TaskObserver reference, and works through delegation instead.
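
A rough sketch of that delegating variant. The names DelegationExample and ObservedTask are hypothetical, and TaskObserver is repeated inside the class only so the sketch compiles on its own:

```java
import java.util.concurrent.Callable;

public class DelegationExample {

    // Same shape as the TaskObserver interface described in this answer.
    public interface TaskObserver {
        void completed(boolean result);
    }

    // Wraps any Callable<Boolean>, so tasks do not need to extend a base class.
    public static final class ObservedTask implements Callable<Boolean> {
        private final Callable<Boolean> delegate;
        private final TaskObserver observer;

        public ObservedTask(Callable<Boolean> delegate, TaskObserver observer) {
            if (delegate == null || observer == null) {
                throw new NullPointerException();
            }
            this.delegate = delegate;
            this.observer = observer;
        }

        @Override
        public Boolean call() throws Exception {
            // Run the wrapped task, then report its outcome before returning.
            final boolean result = delegate.call();
            observer.completed(result);
            return result;
        }
    }

    public static void main(String[] args) throws Exception {
        ObservedTask task = new ObservedTask(
            () -> 2 + 2 == 4,
            result -> System.out.println("observed " + result));
        System.out.println("returned " + task.call());
    }
}
```

If the wrapped task throws, the observer is never called in this sketch; whether a failure should count as a false result is a design decision left open here.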

Moving on, the implementation of TaskObserver will store an AtomicBoolean, which must be set to false initially. The body of the completed(boolean) method must attempt to set the AtomicBoolean from false to true if the result passed to completed(boolean) is true. If the transition from false to true is successful, shut down the ExecutorService and stop submitting any more tasks; any subsequent calls to the TaskObserver will come from tasks that had already been submitted and were too far along to comply with a cancellation request.

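public void completed(boolean result)
{
  if (result &&
      latch_.compareAndSet(false, true))
  {
    // Set a flag to cease submitting new tasks.
    service_.shutdownNow();
    // Do not call awaitTermination() here. This method runs on a pool
    // thread: with a standard ThreadPoolExecutor, shutdownNow() has just
    // interrupted it, and the pool cannot terminate until this task returns.
    // Wait from the thread that submits the tasks instead:
    //
    //   if (!service_.awaitTermination(timeoutMagnitude, timeoutUnit))
    //   {
    //     // Report a problem in shutting down the pool in a timely manner.
    //   }
  }
}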
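
For reference, the pieces above might fit together roughly as follows. This is only a sketch under assumptions: StopOnFirstTrue, ShutdownObserver, found(), the evaluate stand-in, the pool size, and the 10-second timeout are all made up for illustration, and the tasks are submitted as plain Runnables to keep it short:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class StopOnFirstTrue {

    public interface TaskObserver {
        void completed(boolean result);
    }

    // Observer that shuts the pool down on the first true result.
    static final class ShutdownObserver implements TaskObserver {
        private final AtomicBoolean latch = new AtomicBoolean(false);
        private final ExecutorService service;

        ShutdownObserver(ExecutorService service) {
            this.service = service;
        }

        @Override
        public void completed(boolean result) {
            // Only the first true result wins the false -> true transition.
            if (result && latch.compareAndSet(false, true)) {
                service.shutdownNow();
            }
        }

        boolean found() {
            return latch.get();
        }
    }

    // Hypothetical stand-in for the real per-task work.
    static boolean evaluate(int input, int target) {
        return input == target;
    }

    // Submits up to taskCount tasks; returns whether any task reported true.
    public static boolean search(int taskCount, int target) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        ShutdownObserver observer = new ShutdownObserver(pool);
        try {
            for (int i = 0; i < taskCount && !observer.found(); i++) {
                final int input = i;
                pool.execute(() -> observer.completed(evaluate(input, target)));
            }
        } catch (RejectedExecutionException e) {
            // The pool was shut down between the found() check and execute().
        }
        pool.shutdown(); // has no further effect if the observer already shut it down
        try {
            // Wait on the submitting thread, not inside a pool thread.
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                System.err.println("Pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observer.found();
    }

    public static void main(String[] args) {
        System.out.println(search(10_000, 1234));
        System.out.println(search(10_000, -1));
    }
}
```

The submitting loop checks the flag before each submission and also tolerates RejectedExecutionException, because the pool can be shut down between the check and the call to execute.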

If that's not enough of a push to get you started, please follow up with additional questions.
