How do you deal with index overflow when writing a thread-safe blocking bounded buffer structure in Java?
UPDATED: I've updated the code based on the correct answer below. This works but creates a new problem (I will post a new question).
I'm creating a blocking bounded buffer class using semaphores, with multiple producers and consumers.
The goal is to use AtomicInteger for the pointers so I don't have to synchronize internally. Overflow handling has been corrected to use CAS now.
But this doesn't work unless I wrap the AtomicInteger pointer accesses in synchronized (see the comments in the code below). I'm not sure why. Scroll all the way down to see what I mean by "missing entries"...
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

public class BoundedBuffer<T> {
    private static final int BUFFER_SIZE = Short.MAX_VALUE + 1;
    private AtomicReferenceArray<T> m_buffer = null;
    private Semaphore m_full = new Semaphore(BUFFER_SIZE);  // free slots left for producers
    private Semaphore m_empty = new Semaphore(0);           // entries available to consumers
    private AtomicInteger m_writePointer = new AtomicInteger();
    private AtomicInteger m_readPointer = new AtomicInteger();

    public BoundedBuffer() {
        m_buffer = new AtomicReferenceArray<T>(BUFFER_SIZE);
    }

    // Returns the current value and advances it, wrapping from Short.MAX_VALUE back to 0.
    public static int safeGetAndIncrement(AtomicInteger i) {
        int oldValue = 0, newValue = 0;
        do {
            oldValue = i.get();
            newValue = (oldValue == Short.MAX_VALUE) ? 0 : (oldValue + 1);
        } while (!i.compareAndSet(oldValue, newValue));
        return oldValue;
    }

    public void add(T data) throws InterruptedException {
        m_full.acquire();
        synchronized (this) { // << Commenting this out doesn't work
            // CAS-based overflow handling
            m_buffer.set(safeGetAndIncrement(m_writePointer), data);
        }
        m_empty.release();
    }

    public T get() throws InterruptedException {
        T data = null;
        m_empty.acquire();
        synchronized (this) { // << Commenting this out doesn't work
            // CAS-based overflow handling
            data = m_buffer.get(safeGetAndIncrement(m_readPointer));
        }
        m_full.release();
        return data;
    }
}
Test program has...
8 producer threads, each putting about 10000 entries into the queue. Each entry is a string of the format "A:B", where A is the thread number (0..7) and B is a numerically increasing count from 0..9999.
4 consumer threads, consuming everything until a null is hit.
Once the producer threads have finished adding everything to the buffer, 4 nulls are added to the queue (to stop the consumers).
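For anyone who wants to reproduce the setup, here is a minimal sketch of a harness along the lines described above. The actual test program is not shown in the question, so everything here is an assumption: the names SketchBuffer and BufferHarness are hypothetical, the buffer is a simplified lock-based stand-in rather than the class above, and the capacity of 1024 is arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

// Hypothetical stand-in for the buffer under test: same two-semaphore scheme,
// but the slot indices are plain ints protected by a lock.
class SketchBuffer<T> {
    private final Object[] slots;
    private final Semaphore free;                       // slots available to producers
    private final Semaphore filled = new Semaphore(0);  // entries available to consumers
    private int writeIndex = 0, readIndex = 0;

    SketchBuffer(int capacity) {
        slots = new Object[capacity];
        free = new Semaphore(capacity);
    }

    void add(T data) throws InterruptedException {
        free.acquire();
        synchronized (this) {
            slots[writeIndex] = data;
            writeIndex = (writeIndex + 1) % slots.length;
        }
        filled.release();
    }

    @SuppressWarnings("unchecked")
    T get() throws InterruptedException {
        filled.acquire();
        T data;
        synchronized (this) {
            data = (T) slots[readIndex];
            readIndex = (readIndex + 1) % slots.length;
        }
        free.release();
        return data;
    }
}

class BufferHarness {
    // Runs the producers and consumers, then returns how many expected entries never arrived.
    static int run(int producers, int perProducer, int consumers) {
        SketchBuffer<String> buffer = new SketchBuffer<>(1024);
        Set<String> consumed = ConcurrentHashMap.newKeySet();
        List<Thread> producerThreads = new ArrayList<>();
        List<Thread> consumerThreads = new ArrayList<>();

        for (int p = 0; p < producers; p++) {
            final int id = p;
            producerThreads.add(new Thread(() -> {
                try {
                    for (int n = 0; n < perProducer; n++) buffer.add(id + ":" + n);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        for (int c = 0; c < consumers; c++) {
            consumerThreads.add(new Thread(() -> {
                try {
                    String s;
                    while ((s = buffer.get()) != null) consumed.add(s);  // null is the stop marker
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        for (Thread t : consumerThreads) t.start();
        for (Thread t : producerThreads) t.start();
        try {
            for (Thread t : producerThreads) t.join();
            for (int c = 0; c < consumers; c++) buffer.add(null);  // one stop marker per consumer
            for (Thread t : consumerThreads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("harness interrupted", e);
        }

        int missing = 0;
        for (int p = 0; p < producers; p++)
            for (int n = 0; n < perProducer; n++)
                if (!consumed.contains(p + ":" + n)) missing++;
        return missing;
    }

    public static void main(String[] args) {
        System.out.println("Missing entries: " + run(8, 10000, 4));
    }
}
```

Swapping SketchBuffer for the buffer under test (with and without its synchronized blocks) should make the difference in the missing-entry count visible.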
Output of the threads...
P:Data, 1:9654@1
P:Data, 5:1097@347
C:Data, 1:9654@1
P:Data, 4:5538@1
C:Data, 4:5538@1
C:Data, null@14466
Verification
When verifying that all entries that were produced were consumed, a few appear missing, just before the ArrayIndexOutOfBoundsException is hit (this may just be a coincidence).
Verifying...
Missing 4:5537
Missing 5:1096
Verified
You need to handle overflows when incrementing the counter. For example, you can use the following method instead of getAndIncrement():
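// MAX_VALUE is the largest value the counter may take before wrapping back to 0.
public static int safeGetAndIncrement(AtomicInteger i) {
    int oldValue;
    int newValue; // declared outside the loop body so the while condition can see it
    do {
        oldValue = i.get();
        newValue = (oldValue == MAX_VALUE) ? 0 : (oldValue + 1);
    } while (!i.compareAndSet(oldValue, newValue));
    return oldValue;
}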
It uses the typical compare-and-swap approach and shouldn't harm performance, since getAndIncrement() is internally implemented in the same way.
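As a side note, on Java 8 or later the same retry loop can be sketched with AtomicInteger.getAndUpdate, which applies a function and repeats the compare-and-swap internally until it succeeds. This is an alternative to the hand-written loop, not what the answer above uses; the class and method names are hypothetical, and maxValue is assumed to be the last valid index.

```java
import java.util.concurrent.atomic.AtomicInteger;

class WrappingCounter {
    // Hypothetical helper: returns the previous value and advances the counter,
    // wrapping from maxValue back to 0.
    static int getAndIncrementWrapping(AtomicInteger counter, int maxValue) {
        // The update function must be side-effect free because it may be re-applied on contention.
        return counter.getAndUpdate(v -> (v == maxValue) ? 0 : v + 1);
    }

    public static void main(String[] args) {
        AtomicInteger pointer = new AtomicInteger(2);
        System.out.println(getAndIncrementWrapping(pointer, 3)); // prints 2
        System.out.println(getAndIncrementWrapping(pointer, 3)); // prints 3
        System.out.println(getAndIncrementWrapping(pointer, 3)); // prints 0 (wrapped)
    }
}
```

Either form only makes the pointer update atomic; it says nothing about whether the slot at the returned index has been written or read yet.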
Also, if MAX_VALUE is BUFFER_SIZE - 1 (the last valid index), you don't need the % BUFFER_SIZE operation.
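To illustrate the difference, here is a small sketch comparing a free-running counter reduced with % against a counter that wraps by itself. The pre-update code from the question is not shown, so the % variant is only an assumption about what it looked like; the class name, method names, and the buffer size of 8 are made up for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;

class IndexOverflowDemo {
    static final int BUFFER_SIZE = 8; // illustrative size only

    // Free-running counter reduced with %: the result turns negative once the int overflows.
    static int moduloIndex(AtomicInteger counter) {
        return counter.getAndIncrement() % BUFFER_SIZE;
    }

    // Counter that wraps at BUFFER_SIZE - 1: the returned value is already a valid index.
    static int wrappingIndex(AtomicInteger counter) {
        int oldValue;
        int newValue;
        do {
            oldValue = counter.get();
            newValue = (oldValue == BUFFER_SIZE - 1) ? 0 : oldValue + 1;
        } while (!counter.compareAndSet(oldValue, newValue));
        return oldValue;
    }

    public static void main(String[] args) {
        AtomicInteger freeRunning = new AtomicInteger(Integer.MAX_VALUE);
        System.out.println(moduloIndex(freeRunning)); // prints 7
        System.out.println(moduloIndex(freeRunning)); // prints 0, counter was Integer.MIN_VALUE
        System.out.println(moduloIndex(freeRunning)); // prints -7, not a valid array index

        AtomicInteger wrapping = new AtomicInteger(BUFFER_SIZE - 2);
        System.out.println(wrappingIndex(wrapping)); // prints 6
        System.out.println(wrappingIndex(wrapping)); // prints 7
        System.out.println(wrappingIndex(wrapping)); // prints 0
    }
}
```

The negative result in the first half is the kind of value that would trigger an ArrayIndexOutOfBoundsException when used as an index.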