What is the best way of achieving this threading/event behaviour in Java?
I have a thread (Runnable) that starts a number of other threads (Runnables). When each child thread finishes it needs to raise an event (or something similar) and return a notification to the parent thread. I can't see any events in Java (ala C#) - I had hoped I could just subscribe in the parent to the child object's 'I'm finished event' but it doesn't appear I can do that. How do you suggest I accomplish this?
Thanks
Java has a CountDownLatch in its concurrency library (java.util.concurrent). Create a CountDownLatch and initialize it with the number of threads you're going to run. When you create your threads, give each one the latch; each thread calls countDown() on it when it has finished. Your main thread calls await() and blocks until all of the worker threads have finished.
With CountDownLatch you coordinate the threads without writing any explicit locking code yourself.
Directly from Java's documentation:
class Driver { // ...
    void main() throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(N);
        for (int i = 0; i < N; ++i) // create and start threads
            new Thread(new Worker(startSignal, doneSignal)).start();
        doSomethingElse();            // don't let run yet
        startSignal.countDown();      // let all threads proceed
        doSomethingElse();
        doneSignal.await();           // wait for all to finish
    }
}
class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;
    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
    }
    public void run() {
        try {
            startSignal.await();
            doWork();
            doneSignal.countDown();
        } catch (InterruptedException ex) {} // return;
    }
    void doWork() { ... }
}
http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/CountDownLatch.html
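The documentation excerpt above leaves N and doWork() undefined, so here is a minimal runnable sketch of the same idea. The class name LatchDemo, the method runWorkers, and the counter standing in for real work are all made up for illustration. It also calls countDown() in a finally block, which is an assumption about what you would want if the work can throw; the Javadoc example does not do that.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names here are illustrative, not from the Javadoc.
class LatchDemo {

    // Starts `workers` threads and blocks until each one has counted down.
    // Returns how many workers completed their (simulated) work.
    static int runWorkers(int workers) throws InterruptedException {
        CountDownLatch doneSignal = new CountDownLatch(workers);
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                try {
                    completed.incrementAndGet(); // stand-in for real work
                } finally {
                    doneSignal.countDown();      // signal even if the work throws
                }
            }).start();
        }

        doneSignal.await(); // the parent blocks here until the count reaches zero
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Workers finished: " + runWorkers(4));
    }
}
```

Note that the latch only tells the parent that everyone is done. If the parent needs a notification per child, the latch has to be combined with one of the callback approaches described in the other answers.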
Create a listener interface, have the parent implement it, and pass the parent to each child:
public interface EventListener {
    void trigger(Object event);
}
public class Parent implements EventListener {
    // synchronized because children call this from their own threads
    public synchronized void trigger(Object event) {
        // process events.
    }
}
public class Child implements Runnable {
    private final EventListener listener;
    public Child(EventListener listen) {
        listener = listen;
    }
    public void run() {
        Object results = null; // do stuff and store the outcome here
        listener.trigger(results);
    }
}
You can use a variant on the Observer pattern. Implement a callback function (such as void finished(SomeArgs args)) in the parent, and construct each child with a reference to its parent. When the child is finished, have it call the parent's finished() method.
Make sure the callback is thread-safe!
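A rough sketch of that callback approach, under some assumptions: the names FinishListener, CallbackParent and CallbackChild are invented for this example, and the "work" is just squaring a number. The important detail is that finished() runs on the child's thread, not the parent's, which is why it is synchronized here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names throughout; none of these types come from the JDK.
interface FinishListener {
    void finished(String childName, int result);
}

class CallbackChild implements Runnable {
    private final String name;
    private final int input;
    private final FinishListener listener;

    CallbackChild(String name, int input, FinishListener listener) {
        this.name = name;
        this.input = input;
        this.listener = listener;
    }

    @Override
    public void run() {
        int result = input * input;      // stand-in for real work
        listener.finished(name, result); // notify the parent when done
    }
}

class CallbackParent implements FinishListener {
    private final List<String> reports = new ArrayList<>();

    // Called on the child's thread, so access to shared state is synchronized.
    @Override
    public synchronized void finished(String childName, int result) {
        reports.add(childName + "=" + result);
    }

    synchronized int reportCount() {
        return reports.size();
    }

    // Starts the children, waits for them, and returns how many reported back.
    int runChildren(int count) throws InterruptedException {
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            Thread t = new Thread(new CallbackChild("child-" + i, i, this));
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            t.join(); // only so the demo can report a final count
        }
        return reportCount();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Callbacks received: " + new CallbackParent().runChildren(3));
    }
}
```

The join() calls are there only to make the demo deterministic; in a real parent you would react inside finished() instead of waiting.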
This doesn't use events, but it is one of what I'm sure are many ways to accomplish this. Quick caveat: a Runnable can only be cast to a Thread if it actually is one, so for this to work you need to keep a reference to the Thread object that runs each Runnable, or add some kind of isStopped() method to your Runnable that reports whether it has finished.
You could have the parent thread keep track of all of its child threads in a List. When a child thread finishes, it stores the value it calculated in some field, say, result, and exposes it through a method called getResult().
Have the parent thread periodically iterate through the list and check whether each thread has stopped. Thread has a method called isAlive() that returns false once a started thread has terminated. When it does, call getResult() and do whatever.
In the parent thread, you could do this:
boolean running = true;
while (running) {
    //iterate through list
    //if stopped, get value and do whatever
    //if all the child threads are stopped, set running = false and do whatever
    Thread.sleep(1000); //makes this parent thread pause for 1 second before checking again
}
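To make the polling loop concrete, here is one way it might look. This is only a sketch: PollingDemo, SquareTask and pollUntilDone are names I made up, the task just squares its input, and the 50 ms pause is arbitrary. It keeps the Thread objects in one list and the Runnables in another, so no casting is needed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names: PollingDemo and SquareTask are illustrative only.
class PollingDemo {

    static class SquareTask implements Runnable {
        private final int input;
        private volatile int result; // volatile so the parent sees the written value

        SquareTask(int input) {
            this.input = input;
        }

        @Override
        public void run() {
            result = input * input; // stand-in for real work
        }

        int getResult() {
            return result;
        }
    }

    // Polls the children until all have stopped; returns the sum of their results.
    static int pollUntilDone(int count) throws InterruptedException {
        List<Thread> threads = new ArrayList<>();
        List<SquareTask> tasks = new ArrayList<>();
        for (int i = 1; i <= count; i++) {
            SquareTask task = new SquareTask(i);
            Thread t = new Thread(task);
            tasks.add(task);
            threads.add(t);
            t.start();
        }

        boolean[] collected = new boolean[count];
        int remaining = count;
        int sum = 0;
        while (remaining > 0) {
            for (int i = 0; i < count; i++) {
                // isAlive() is false once a started thread has terminated
                if (!collected[i] && !threads.get(i).isAlive()) {
                    sum += tasks.get(i).getResult();
                    collected[i] = true;
                    remaining--;
                }
            }
            if (remaining > 0) {
                Thread.sleep(50); // pause before checking again
            }
        }
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Sum of results: " + pollUntilDone(3)); // 1 + 4 + 9
    }
}
```

Polling is simple but wasteful: the parent wakes up on a timer even when nothing has changed, and it notices a finished child only at the next check.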
The class java.util.concurrent.ThreadPoolExecutor does what you need. It runs your tasks on a pool of threads and provides a hook that is called after each Runnable has finished. Basically, you can create an anonymous subclass and override afterExecute, like this:
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 20, 5,
        TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(50)) {
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        // do your callback stuff here
    }
};
Here's the complete example:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class Main {
    // afterExecute runs on the pool's worker threads, so the counter must be thread-safe
    private static final AtomicInteger ready = new AtomicInteger();
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 20, 5,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(50)) {
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                ready.incrementAndGet();
            }
        };
        for (int n = 0; n < 5; n++)
            executor.execute(createTask());
        executor.shutdown();
        while (ready.get() < 5) {
            System.out.println("Ready: " + ready.get());
            Thread.sleep(100);
        }
        System.out.println("Ready with all.");
    }
    private static Runnable createTask() {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep((long) (Math.random() * 1000));
                } catch (InterruptedException e) {
                    // restore the interrupt flag instead of swallowing the exception
                    Thread.currentThread().interrupt();
                }
            }
        };
    }
}
Sample output (the exact counts vary from run to run):
Ready: 0
Ready: 1
Ready: 1
Ready: 3
Ready: 3
Ready: 4
Ready: 4
Ready: 4
Ready: 4
Ready with all.