Two-way communication with a Java thread
In my application I'm performing somewhat heavy lookup operations. These operations must be done within a single thread (persistence framework limitation).
I want to cache the results. Thus, I have a class UMRCache, with an inner class Worker:
public class UMRCache {
private Worker worker;
private List<String> requests = Collections.synchronizedList<new ArrayList<String>>());
private Map<String, Object> cache = Collections.synchronizedMap(new HashMap<String, Object>());
public UMRCache(Repository repository) {
this.worker = new Worker(repository);
this.worker.start();
}
public Object get(String key) {
if (this.cache.containsKey(key)) {
// If the element is already cached, get value from cache
return this.cache.get(key);
}
synchronized (this.requests) {
// Add request to queue
this.requests.add(key);
// Notify the Worker thread that there's work to do
this.requests.notifyAll();
}
synchronized (this.cache) {
// Wait until Worker has updated the cache
this.cache.wait();
// Now, cache should contain a value for key
return this.cache.get(key);
}
}
private class Worker extends Thread {
public void run() {
boolean doRun = true;
while (doRun) {
synchronized (requests) {
while (requests.isEmpty() && doRun) {
requests.wait(); // Wait until there's work to do
}
synchronized (cache) {
Set<String> processed = new HashSet<String>();
for (String key : requests) {
// Do the lookup
Object result = respository.lookup(key);
// Save to cache
cache.put(key, result);
processed.add(key);
}
// Remove processed requests from queue
requests.removeAll(processed);
// Notify all threads waiting for their requests to be served
cache.notifyAll();
}
}
}
}
}
}
I have a testcase for this: public class UMRCacheTest extends TestCase { private UMRCache umrCache;
public void setUp() throws Exception {
super.setUp();
umrCache = new UMRCache(repository);
}
public void testGet() throws Exception {
for (int i = 0; i < 10000; i++) {
final List fetched = Collections.synchronizedList(new ArrayList());
final String[] keys = new String[]{"key1", "key2"};
final String[] expected = new String[]{"result1", "result2"}
final Random random = new Random();
Runnable run1 = new Runnable() {
public void run() {
for (int i = 0; i < keys.length; i++) {
final String key = keys[i];
final Object result = umrCache.get(key);
assertEquals(key, results[i]);
fetched.add(um);
try {
Thread.sleep(random.nextInt(3));
} catch (InterruptedException ignore) {
}
}
}
};
Runnable run2 = new Runnable() {
public void run() {
for (int i = keys.length - 1; i >= 0; i--) {
final String key = keys[i];
final String result = umrCache.get(key);
assertEquals(key, results[i]);
fetched.add(um);
try {
Thread.sleep(r开发者_JS百科andom.nextInt(3));
} catch (InterruptedException ignore) {
}
}
}
};
final Thread thread1 = new Thread(run1);
thread1.start();
final Thread thread2 = new Thread(run2);
thread2.start();
final Thread thread3 = new Thread(run1);
thread3.start();
thread1.join();
thread2.join();
thread3.join();
umrCache.dispose();
assertEquals(6, fetched.size());
}
}
}
The test fails randomly, at about 1 out of 10 runs. It will fail at the last assertion: assertEquals(6, fetched.size()), at assertEquals(key, results[i]), or sometimes the test runner will never finish.
So there's something buggy about my thread logic. Any tips?
EDIT:
I might have cracked it now, thanks to all who have helped. The solution seems to be:
public Object get(String key) {
if (this.cache.containsKey(key)) {
// If the element is already cached, get value from cache
return this.cache.get(key);
}
synchronized (this.requests) {
// Add request to queue
this.requests.add(key);
// Notify the Worker thread that there's work to do
this.requests.notifyAll();
}
synchronized (this.cache) {
// Wait until Worker has updated the cache
while (!this.cache.containsKey(key)) {
this.cache.wait();
}
// Now, cache should contain a value for key
return this.cache.get(key);
}
}
get() method logic can miss result and get stuck
synchronized (this.requests) {
// Add request to queue
this.requests.add(key);
// Notify the Worker thread that there's work to do
this.requests.notifyAll();
}
// ----- MOMENT1. If at this moment Worker puts result into cache it
// will be missed since notification will be lost
synchronized (this.cache) {
// Wait until Worker has updated the cache
this.cache.wait();
// ----- MOMENT2. May be too late, since cache notifiation happened before at MOMENT1
// Now, cache should contain a value for key
return this.cache.get(key);
}
The variable fetched in your test is an ArrayList and is accessed and updated from your two anonymous Runnable instances.
ArrayList is not thread safe, from the documentation:
Note that this implementation is not synchronized. If multiple threads access an ArrayList instance concurrently, and at least one of the threads modifies the list structurally, it must be synchronized externally. (A structural modification is any operation that adds or deletes one or more elements, or explicitly resizes the backing array; merely setting the value of an element is not a structural modification.) This is typically accomplished by synchronizing on some object that naturally encapsulates the list. If no such object exists, the list should be "wrapped" using the Collections.synchronizedList method. This is best done at creation time, to prevent accidental unsynchronized access to the list:
Hence I think your test needs a little adjusting.
I noticed your lookup in cache isn't atomic operation:
if (this.cache.containsKey(key)) {
// If the element is already cached, get value from cache
return this.cache.get(key);
}
Since you never delete from cache in your code, you always will get some value by this code. But if, in future, you plan to clean cache, lack of atomicity here will become a problem.
精彩评论