
Java: notify main class when all threads in threadpool are finished / same instance of object in different threads

How do I notify my main class which instantiates a ThreadPoolExecutor when all threads within the ThreadPoolExecutor are completed?

ThreadPoolExecutor threadPool = null;
ThreadClass threadClass1;
ThreadClass threadClass2;
final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(maxPoolSize);

public MyClass(){
        threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, queue);

        threadClass1 = new ThreadClass();
        threadClass2 = new ThreadClass();

        threadPool.execute(threadClass1);
        threadPool.execute(threadClass2);

        //Now I would like to do something until the threadPool is done working
        //The threads fill a ConcurrentLinkedQueue and I would like to poll
        //the queue as it gets filled by the threads and output 
        //it to XML via JAX-RS

}

EDIT 1

While my threads fetch data from somewhere and fill it into a ConcurrentLinkedQueue, I would like to perform some action in MyClass to update the XML output with the results. When all threads have terminated, I would like to return true to the JAX-RS webservice which instantiated MyClass, so the webservice knows all data has been fetched and it can display the final XML file.

EDIT 2

I am passing a Queue to threads so they can add items to the queue. When one driver is done adding items to the articleQueue I want to perform an action within my main class, polling the entity from the Queue and handing it over to the response object to display it in some way.

When I pass the queue to the threads, are they working with the same object or with a "copy" of the object, so that changes within the thread do not affect the main object? That is not the behavior I want. When I check the size of the articleQueue within the Driver it is 18, but the size of the articleQueue in the DriverController is 0.

Is there a nicer way to react when a thread has added something to the queue than my while loop? How do I have to modify my code to access the same object from different classes?

DriverController

public class DriverController {

    Queue<Article> articleQueue;

    ThreadPoolExecutor threadPool = null;
    final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(
            maxPoolSize);

    public DriverController(Response response) {

        articleQueue = new ConcurrentLinkedQueue<Article>();
        threadPool = new ThreadPoolExecutor();
        Driver driver = new Driver(this.articleQueue);

        threadPool.execute(driver);
        // More drivers would be executed here which add to the queue

        while (threadPool.getActiveCount() > 0) {
            // this.articleQueue.size() gives back 0 here ... why?
            if(articleQueue.size()>0){
                response.addArticle(articleQueue.poll());
            }
        }

    }
}

Driver

public class Driver implements Runnable{

    private Queue<Article> articleQueue;

    public Driver(Queue<Article> articleQueue) {
        this.articleQueue = articleQueue;
    }

    public boolean getData() {
        // Here would be the code where the article is created ...

        this.articleQueue.offer(article);
        return true;
    }

    public void run() {
        this.getData();
        // this.articleQueue.size() gives back 18 here ...

    }
}


You could try the following snippet:

//Now I would like to wait until the threadPool is done working
threadPool.shutdown();
while (!threadPool.isTerminated()) {
    try {
        threadPool.awaitTermination(10, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}


An ExecutorCompletionService might be the right thing for you:

http://download.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ExecutorCompletionService.html

Example from the link above:

void solve(Executor e, Collection<Callable<Result>> solvers)
  throws InterruptedException, ExecutionException {
    CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
    for (Callable<Result> s : solvers)
        ecs.submit(s);
    int n = solvers.size();
    for (int i = 0; i < n; ++i) {
        Result r = ecs.take().get();
        if (r != null) 
            use(r);
    }
}
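
Adapted to the question's scenario, the same idea might look roughly like the sketch below. The names CompletionOrderDemo, driver and collectAsCompleted are hypothetical, and plain strings stand in for the asker's Article objects; only ExecutorCompletionService and its submit()/take() calls come from the JDK. It is meant as an illustration of picking up results as soon as each task finishes, not as a drop-in replacement for the asker's classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderDemo {

    // Hypothetical stand-in for a driver: sleeps, then returns an "article" name.
    static Callable<String> driver(final String name, final long delayMillis) {
        return new Callable<String>() {
            public String call() throws InterruptedException {
                Thread.sleep(delayMillis);
                return name;
            }
        };
    }

    // Submits every task and collects the results in the order the tasks finish.
    static List<String> collectAsCompleted(List<Callable<String>> tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        CompletionService<String> ecs = new ExecutorCompletionService<String>(pool);
        List<String> results = new ArrayList<String>();
        try {
            for (Callable<String> task : tasks) {
                ecs.submit(task);
            }
            for (int i = 0; i < tasks.size(); i++) {
                // take() blocks until the next finished task is available
                results.add(ecs.take().get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        List<Callable<String>> tasks = new ArrayList<Callable<String>>();
        tasks.add(driver("slow", 300));
        tasks.add(driver("fast", 50));
        // With these delays "fast" would normally be listed before "slow".
        System.out.println(collectAsCompleted(tasks));
    }
}
```

Once the second loop has run tasks.size() times, every task has finished, so the method returning doubles as the "all done" signal the question asks for.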


Instead of using execute you should use submit. This will return a Future instance on which you can wait for the task(s) to complete. That way you don't need polling or shutting down the pool.
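
A minimal sketch of that approach, assuming the tasks are plain Runnables (SubmitAndWaitDemo and runAndWait are hypothetical names, and the counter only exists to show that every task really ran before the method returns):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class SubmitAndWaitDemo {

    // Runs taskCount trivial tasks and returns how many had run
    // by the time every Future reported completion.
    static int runAndWait(int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        final AtomicInteger finished = new AtomicInteger();
        List<Future<?>> futures = new ArrayList<Future<?>>();
        try {
            for (int i = 0; i < taskCount; i++) {
                // submit() hands back a Future for each task
                futures.add(pool.submit(new Runnable() {
                    public void run() {
                        finished.incrementAndGet();
                    }
                }));
            }
            for (Future<?> future : futures) {
                future.get(); // blocks until this particular task is done
            }
            return finished.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a task failed", e.getCause());
        } finally {
            // Only so this demo lets the JVM exit; a long-lived pool could stay open.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndWait(5));
    }
}
```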


I don't think there's a way to be notified of this explicitly. You could poll getActiveCount() until it drops to zero, or poll getCompletedTaskCount() until it reaches the number of tasks you submitted (the completed count only grows, so it never returns to zero). Both values are documented as approximations.

Why not collect the Future objects returned upon submission and check that all of them have completed? Simply call get() on each one in turn. Since that call blocks, you'll wait for each in turn and gradually fall through the set until you've waited on each one.

Alternatively you could submit the tasks and call shutdown() on the executor. That way, the submitted tasks will be executed, and then the terminated() method is called. If you override this then you'll get a callback once all tasks are completed (you couldn't use that executor again, obviously).
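
A rough sketch of the terminated() override, assuming a small subclass that takes a callback (NotifyingPool, TerminatedHookDemo and runDemo are hypothetical names; terminated() itself is the protected hook on ThreadPoolExecutor). The callback runs on whichever thread completes the termination, which is usually a worker thread rather than the caller.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical subclass that runs a callback once the pool has terminated.
class NotifyingPool extends ThreadPoolExecutor {
    private final Runnable onAllDone;

    NotifyingPool(int threads, Runnable onAllDone) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<Runnable>());
        this.onAllDone = onAllDone;
    }

    @Override
    protected void terminated() {
        try {
            onAllDone.run();
        } finally {
            super.terminated(); // the Javadoc recommends chaining to super
        }
    }
}

class TerminatedHookDemo {

    // Returns true if the callback fired within five seconds.
    static boolean runDemo() {
        final CountDownLatch allDone = new CountDownLatch(1);
        NotifyingPool pool = new NotifyingPool(2, new Runnable() {
            public void run() {
                allDone.countDown();
            }
        });
        for (int i = 0; i < 4; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    // the real work would go here
                }
            });
        }
        pool.shutdown(); // no new tasks; terminated() fires after the queued ones finish
        try {
            return allDone.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("callback fired: " + runDemo());
    }
}
```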


Judging from the reference documentation you have a few options:

ThreadPoolExecutor threadPool = null;
ThreadClass threadClass1;
ThreadClass threadClass2;
final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(maxPoolSize);

public MyClass(){
    threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, queue);

    threadClass1 = new ThreadClass();
    threadClass2 = new ThreadClass();

    threadPool.execute(threadClass1);
    threadPool.execute(threadClass2);

    //Now I would like to wait until the threadPool is done working

    //Option 1:  shutdown() and awaitTermination()
    threadPool.shutdown();
    try {
        threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    }
    catch (InterruptedException e) {
        e.printStackTrace();
    }

    //Option 2:  getActiveCount()
    while (threadPool.getActiveCount() > 0) {
        try {
            Thread.sleep(1000);
        }
        catch (InterruptedException ignored) {}
    }

    //Option 3:  getCompletedTaskCount()
    while (threadPool.getCompletedTaskCount() < totalNumTasks) {
        try {
            Thread.sleep(1000);
        }
        catch (InterruptedException ignored) {}
    }

}

All things considered, I think shutdown() and awaitTermination() is the best option of the three.
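
For completeness, here is one way option 1 could be wrapped in a reusable helper. This is only a sketch: ShutdownAndWait and shutdownAndWait are hypothetical names, and the finite timeout and the shutdownNow() fallback are my own additions on top of the plain shutdown()/awaitTermination() calls shown above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownAndWait {

    // Returns true if every submitted task finished within the timeout.
    static boolean shutdownAndWait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // stop accepting new tasks, let queued ones run
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            pool.shutdownNow(); // timed out: try to cancel what is still running
            return false;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 3; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    // the real work would go here
                }
            });
        }
        System.out.println("all done: " + shutdownAndWait(pool, 10, TimeUnit.SECONDS));
    }
}
```

The boolean return value maps naturally onto the "return true to the webservice" requirement from the question.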


I think you're overengineering things a bit. You don't really care about the threads or the thread pool, and rightly so. Java provides nice abstractions so that you don't have to. You just need to know when your tasks are complete, and methods exist for that. Just submit your jobs, and wait for the futures to say they're done. If you really want to know as soon as a single task completes, you can watch all the futures and take action as soon as any one is finished. If you only care that everything is finished, you can remove some complexity from the code I'm about to post. Try this on for size (note that MultithreadedJaxrsResource has a main method, so it can be run directly):

import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import java.util.*;
import java.util.concurrent.*;

@Path("foo")
public class MultithreadedJaxrsResource {
    private ExecutorService executorService;

    public MultithreadedJaxrsResource(ExecutorService executorService) {
        this.executorService = executorService;
    }

    @GET
    @Produces(MediaType.APPLICATION_XML)
    public AllMyArticles getStuff() {
        List<Future<Article>> futures = new ArrayList<Future<Article>>();
        // Submit all the tasks to run
        for (int i = 0; i < 10; i++) {
            futures.add(executorService.submit(new Driver(i + 1)));
        }
        AllMyArticles articles = new AllMyArticles();
        // Wait for all tasks to finish
        // If you only care that everything is done and not about seeing
        // when each one finishes, this outer do/while can go away, and
        // you only need a single for loop to wait on each future.
        boolean allDone;
        do {
            allDone = true;
            Iterator<Future<Article>> futureIterator = futures.iterator();
            while (futureIterator.hasNext()) {
                Future<Article> future =  futureIterator.next();
                if (future.isDone()) {
                    try {
                        articles.articles.add(future.get());
                        futureIterator.remove();
                    } catch (InterruptedException e) {
                        // thread was interrupted. don't do that.
                        throw new IllegalStateException("broken", e);
                    } catch (ExecutionException e) {
                        // execution of the Callable failed with an
                        // exception. check it out.
                        throw new IllegalStateException("broken", e);
                    }
                } else {
                    allDone = false;
                }
            }
        } while (!allDone);
        return articles;
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        AllMyArticles stuff =
            new MultithreadedJaxrsResource(executorService).getStuff();
        System.out.println(stuff.articles);
        executorService.shutdown();
    }
}

class Driver implements Callable<Article> {
    private int i; // Just to differentiate the instances

    public Driver(int i) {
        this.i = i;
    }

    public Article call() {
        // Simulate taking some time for each call
        try {
            Thread.sleep(1000 / i);
        } catch (InterruptedException e) {
            System.err.println("oops");
        }
        return new Article(i);
    }
}

class AllMyArticles {
    public final List<Article> articles = new ArrayList<Article>();
}

class Article {
    public final int i;

    public Article(int i) {
        this.i = i;
    }

    @Override
    public String toString() {
        return "Article{" +
                       "i=" + i +
                       '}';
    }
}

Done that way, you can plainly see that the tasks are returned in the order they complete, as the last task finishes first thanks to sleeping the shortest time. If you don't care about completion order and just want to wait for all to finish, the loop becomes much simpler:

for (Future<Article> future : futures) {
    try {
        articles.articles.add(future.get());
    } catch (InterruptedException e) {
        // thread was interrupted. don't do that.
        throw new IllegalStateException("broken", e);
    } catch (ExecutionException e) {
        // execution of the Callable failed with an exception. check it out.
        throw new IllegalStateException("broken", e);
    }
}