Java thread synchronization problem: how to implement an observable thread
The problem is this: class C controls whether B is allowed to add an A back to queueOfA. How can I make sure that, when an A notifies C, C's update runs before B's queueOfA becomes empty? B runs so fast that it may remove all the A's, leaving queueOfA empty before C does the update, which can cause C's run method to reach its end.
Please help me out!
class A extends Observable implements Runnable {
    //...
    public void run() {
        //...
        if (goBackToTheQueueAgain) {
            setChanged();
            notifyObservers(); // notify C so that it lets B add this A back
        }
        //...
    }
}
class B implements Runnable {
    Queue<A> queueOfA; // initially, queueOfA contains several A's
    //...
    public void run() {
        //...
        A retrieved = queueOfA.remove();
        Thread aThread = new Thread(retrieved);
        aThread.start();
        //...
    }
}
class C implements Observer, Runnable {
    B b; // initially, class C contains an instance of B
    //...
    public void update(Observable o, Object arg) {
        if (o instanceof A) {
            A a = (A) o;
            b.queueOfA.add(a);
        }
    }
    //...
    public void run() {
        //...
        Thread bThread = new Thread(b);
        bThread.start();
        while (!b.queueOfA.isEmpty()) {
            // loop for as long as queueOfA is not empty
        }
        //...
    }
}
The problem is not that you have to deliver the update as soon as possible; you cannot guarantee that. For example, suppose the queue starts with just one element. That element gets removed, so the queue is now empty, yet the element is still being processed. The A thread can take as long as it likes, and C should not terminate during that time, even though the queue is empty.
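To make that point concrete, here is a minimal sketch (not the asker's classes) that forces the situation deterministically: a single task is removed from the queue and held in the middle of its run method, and at that moment the queue is empty although the task has not finished. The names EmptyQueueDemo and observe are hypothetical, and the two latches exist only to pin down the timing.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

class EmptyQueueDemo {
    /** Returns {queue was empty, task had finished}, sampled while the task is mid-run. */
    static boolean[] observe() {
        Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
        CountDownLatch started = new CountDownLatch(1);  // task signals it has begun
        CountDownLatch release = new CountDownLatch(1);  // main lets the task finish
        AtomicBoolean finished = new AtomicBoolean(false);

        queue.add(() -> {
            started.countDown();
            try {
                release.await(); // stands in for "A takes as long as it likes"
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            finished.set(true);
        });

        // What B does: remove the only element and run it on its own thread.
        Thread worker = new Thread(queue.remove());
        worker.start();
        try {
            started.await();
            boolean empty = queue.isEmpty(); // true: nothing left in the queue
            boolean done = finished.get();   // false: the task is still running
            release.countDown();
            worker.join();
            return new boolean[] { empty, done };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = observe();
        System.out.println("empty=" + r[0] + ", finished=" + r[1]);
    }
}
```

A loop that exits when isEmpty() returns true would stop at exactly this point, before the task has had a chance to ask for requeueing.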
The fix is that C should wait until all A's have been fully processed, that is, until every A has reached the end of its run method without asking to be put back on the queue. This can be done using a countdown latch (java.util.concurrent.CountDownLatch). Initially set the latch to the size of the queue (the number of A's) and decrement it each time an A is fully processed. C will exit only when the latch reaches zero.
C looks something like this:
CountDownLatch latch;
public void run() {
    //...
    this.latch = new CountDownLatch(b.queueOfA.size());
    Thread bThread = new Thread(b);
    bThread.start();
    latch.await(); // blocks until every A has called notifyDone
    //.. catch InterruptedException etc..
}
void notifyDone(A a) {
    this.latch.countDown();
}
And A's run method is:
public void run() {
    //...
    C c = ...; // passed in from somewhere
    if (goBackToTheQueueAgain) {
        setChanged();
        notifyObservers(); // notify C so that it lets B add this A back
    } else {
        c.notifyDone(this);
    }
}
A needs a reference to C, so that it can notify directly that it is done.
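Putting the pieces together, a self-contained sketch of the latch approach might look like the following. It is not the asker's code: LatchDemo, Task, runAll and the "passes" counter are hypothetical names for illustration, the task requeues itself directly instead of going through Observer, and a LinkedBlockingQueue is assumed in place of the plain Queue because several threads touch the queue at once.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class LatchDemo {
    /** Hypothetical stand-in for A: requeues itself until its passes run out. */
    static class Task implements Runnable {
        private final BlockingQueue<Task> queue;
        private final CountDownLatch latch;
        private final AtomicInteger runs;
        private int passesLeft; // only one thread runs a given task at a time

        Task(int passes, BlockingQueue<Task> queue, CountDownLatch latch, AtomicInteger runs) {
            this.passesLeft = passes;
            this.queue = queue;
            this.latch = latch;
            this.runs = runs;
        }

        @Override
        public void run() {
            runs.incrementAndGet();
            if (--passesLeft > 0) {
                queue.add(this);   // wants another pass: go back on the queue
            } else {
                latch.countDown(); // fully processed: the role of notifyDone
            }
        }
    }

    /** Plays the role of C.run(); returns the total number of task executions. */
    static int runAll(int taskCount, int passesPerTask) {
        BlockingQueue<Task> queue = new LinkedBlockingQueue<>();
        CountDownLatch latch = new CountDownLatch(taskCount);
        AtomicInteger runs = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            queue.add(new Task(passesPerTask, queue, latch, runs));
        }

        // Plays the role of B: take each task and start it on its own thread.
        Thread dispatcher = new Thread(() -> {
            try {
                while (true) {
                    new Thread(queue.take()).start();
                }
            } catch (InterruptedException e) {
                // Interrupted below once every task has completed.
            }
        });
        dispatcher.start();
        try {
            latch.await();          // wait for completion, not for an empty queue
            dispatcher.interrupt(); // nothing can be requeued any more
            dispatcher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(3, 2)); // 3 tasks x 2 passes each: prints 6
    }
}
```

The queue may be empty many times while this runs, but runAll only returns after the last task has counted down, which is the property the original isEmpty() loop could not provide.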
Personally, I would not use Observer/Observable here, since your notification has a specific meaning: that the item can be requeued. The observer pattern is best suited to notifying of changes of state, where that state is available from the instance being observed. That's not the case here, and there is no real gain from such abstract decoupling. If you want to keep it abstract and not couple A to C, you can introduce an additional interface, e.g.
interface QueueHandler<T>
{
    void requeue(T t);
    void completed(T t);
}
and implement this on C, and pass it to each A instead of the current Observer interface. But equally, the A and C classes are quite tightly coupled, so you could also just pass C into A and let it notify directly.
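As a rough illustration of the interface-based variant, the sketch below passes a QueueHandler to each item instead of registering an Observer. It is deliberately single-threaded so that only the wiring is on show; Item, RecordingHandler, simulate and the "passes" counter are hypothetical names, and in the real program the handler role would be played by C, with thread-safe bookkeeping.

```java
import java.util.ArrayDeque;
import java.util.Deque;

interface QueueHandler<T> {
    void requeue(T t);
    void completed(T t);
}

class QueueHandlerDemo {
    /** Hypothetical stand-in for A: knows only the QueueHandler interface, not C. */
    static class Item implements Runnable {
        private final QueueHandler<Item> handler;
        private int passesLeft;

        Item(QueueHandler<Item> handler, int passes) {
            this.handler = handler;
            this.passesLeft = passes;
        }

        @Override
        public void run() {
            if (--passesLeft > 0) {
                handler.requeue(this);   // replaces setChanged()/notifyObservers()
            } else {
                handler.completed(this); // replaces c.notifyDone(this)
            }
        }
    }

    /** Hypothetical stand-in for C: owns the queue and counts completions. */
    static class RecordingHandler implements QueueHandler<Item> {
        final Deque<Item> queue = new ArrayDeque<>();
        int completedCount;

        @Override
        public void requeue(Item item) {
            queue.add(item);
        }

        @Override
        public void completed(Item item) {
            completedCount++;
        }
    }

    /** Runs one item until it completes; returns {executions, completions}. */
    static int[] simulate(int passes) {
        RecordingHandler handler = new RecordingHandler();
        handler.queue.add(new Item(handler, passes));
        int executions = 0;
        while (!handler.queue.isEmpty()) {
            handler.queue.remove().run();
            executions++;
        }
        return new int[] { executions, handler.completedCount };
    }

    public static void main(String[] args) {
        int[] r = simulate(3);
        System.out.println(r[0] + " executions, " + r[1] + " completed");
    }
}
```

The two callbacks carry the two meanings explicitly, so the item no longer needs an instanceof check on the receiving side or a direct reference to the concrete C class.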
I answered the question below with a working code example, and it is very close to what you need. All you would need to do is add an Observer to the thread that runs main.
Image processing in a multhithreaded mode using Java