开发者

Two-way communication with a Java thread

In my application I'm performing somewhat heavy lookup operations. These operations must be done within a single thread (persistence framework limitation).

I want to cache the results. Thus, I have a class UMRCache, with an inner class Worker:

public class UMRCache {
  private Worker worker;
  private List<String> requests = Collections.synchronizedList<new ArrayList<String>>());
  private Map<String, Object> cache = Collections.synchronizedMap(new HashMap<String, Object>());
  public UMRCache(Repository repository) {
    this.worker = new Worker(repository);
    this.worker.start();
  }

 public Object get(String key) {
   if (this.cache.containsKey(key)) {
    // If the element is already cached, get value from cache
     return this.cache.get(key);
   }
   synchronized (this.requests) {
     // Add request to queue
     this.requests.add(key);
     // Notify the Worker thread that there's work to do
     this.requests.notifyAll();
   }
   synchronized (this.cache) {
     // Wait until Worker has updated the cache
     this.cache.wait();
    // Now, cache should contain a value for key
     return this.cache.get(key);
   }
 }

 private class Worker extends Thread {
   public void run() {
      boolean doRun = true;
      while (doRun) {
         synchronized (requests) {
            while (requests.isEmpty() && doRun) {
               requests.wait(); // Wait until there's work to do
            }
            synchronized (cache) {
               Set<String> processed = new HashSet<String>();
               for (String key : requests) {
                 // Do the lookup
                 Object result = respository.lookup(key);
                 // Save to cache
                 cache.put(key, result);
                 processed.add(key); 
               }
               // Remove processed requests from queue
               requests.removeAll(processed);
               // Notify all threads waiting for their requests to be served
               cache.notifyAll();
            } 
         }
      }
   }
}

}

I have a testcase for this: public class UMRCacheTest extends TestCase { private UMRCache umrCache;

public void setUp() throws Exception {
    super.setUp();
    umrCache = new UMRCache(repository);
}

public void testGet() throws Exception {
    for (int i = 0; i < 10000; i++) {
        final List fetched = Collections.synchronizedList(new ArrayList());
        final String[] keys = new String[]{"key1", "key2"};
        final String[] expected = new String[]{"result1", "result2"}
        final Random random = new Random();

        Runnable run1 = new Runnable() {
            public void run() {
                for (int i = 0; i < keys.length; i++) {
                    final String key = keys[i];
                    final Object result = umrCache.get(key);
                    assertEquals(key, results[i]);
                    fetched.add(um);
                    try {
                        Thread.sleep(random.nextInt(3));
                    } catch (InterruptedException ignore) {
                    }
                }
            }
        };
        Runnable run2 = new Runnable() {
            public void run() {
                for (int i = keys.length - 1; i >= 0; i--) {
                    final String key = keys[i];
                    final String result = umrCache.get(key);
                    assertEquals(key, results[i]);
                    fetched.add(um);
                    try {
                        Thread.sleep(r开发者_JS百科andom.nextInt(3));
                    } catch (InterruptedException ignore) {
                    }
                }
            }
        };

        final Thread thread1 = new Thread(run1);
        thread1.start();
        final Thread thread2 = new Thread(run2);
        thread2.start();
        final Thread thread3 = new Thread(run1);
        thread3.start();
        thread1.join();
        thread2.join();
        thread3.join();
        umrCache.dispose();
        assertEquals(6, fetched.size());
    }
}

}

The test fails randomly, at about 1 out of 10 runs. It will fail at the last assertion: assertEquals(6, fetched.size()), at assertEquals(key, results[i]), or sometimes the test runner will never finish.

So there's something buggy about my thread logic. Any tips?

EDIT:

I might have cracked it now, thanks to all who have helped. The solution seems to be:

 public Object get(String key) {
   if (this.cache.containsKey(key)) {
    // If the element is already cached, get value from cache
     return this.cache.get(key);
   }
   synchronized (this.requests) {
     // Add request to queue
     this.requests.add(key);
     // Notify the Worker thread that there's work to do
     this.requests.notifyAll();
   }
   synchronized (this.cache) {
     // Wait until Worker has updated the cache
     while (!this.cache.containsKey(key)) {
       this.cache.wait();
     }
    // Now, cache should contain a value for key
     return this.cache.get(key);
   }
 }


get() method logic can miss result and get stuck



   synchronized (this.requests) {
     // Add request to queue
     this.requests.add(key);
     // Notify the Worker thread that there's work to do
     this.requests.notifyAll();
   }

   // ----- MOMENT1.  If at this moment Worker puts result into cache it
   // will be missed since notification will be lost

   synchronized (this.cache) {
     // Wait until Worker has updated the cache
     this.cache.wait();

    // ----- MOMENT2.  May be too late, since cache notifiation happened before at MOMENT1

    // Now, cache should contain a value for key
     return this.cache.get(key);
   }


The variable fetched in your test is an ArrayList and is accessed and updated from your two anonymous Runnable instances.

ArrayList is not thread safe, from the documentation:

Note that this implementation is not synchronized. If multiple threads access an ArrayList instance concurrently, and at least one of the threads modifies the list structurally, it must be synchronized externally. (A structural modification is any operation that adds or deletes one or more elements, or explicitly resizes the backing array; merely setting the value of an element is not a structural modification.) This is typically accomplished by synchronizing on some object that naturally encapsulates the list. If no such object exists, the list should be "wrapped" using the Collections.synchronizedList method. This is best done at creation time, to prevent accidental unsynchronized access to the list:

Hence I think your test needs a little adjusting.


I noticed your lookup in cache isn't atomic operation:

if (this.cache.containsKey(key)) {
    // If the element is already cached, get value from cache
    return this.cache.get(key);
}

Since you never delete from cache in your code, you always will get some value by this code. But if, in future, you plan to clean cache, lack of atomicity here will become a problem.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜