
How do you deal with index overflow when writing a thread-safe blocking bounded buffer in Java?

UPDATED: I've updated the code based on the correct answer below. This works but creates a new problem (I will post a new question).

I'm creating a blocking bounded buffer class using semaphores, with multiple producers and consumers.

The goal is to use AtomicInteger values as the read and write pointers so that I don't have to synchronize internally. Overflow handling has been corrected to use CAS now.

But this doesn't work unless I wrap the AtomicInteger pointer accesses in synchronized blocks (see the comments in the code below). I'm not sure why. Scroll all the way down to see what I mean by "missing entries".

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

public class BoundedBuffer<T> {
    private static final int BUFFER_SIZE = Short.MAX_VALUE + 1;
    private AtomicReferenceArray<T> m_buffer = null;
    // Counts free slots; producers block on it when the buffer is full.
    private Semaphore m_full = new Semaphore(BUFFER_SIZE);
    // Counts filled slots; consumers block on it when the buffer is empty.
    private Semaphore m_empty = new Semaphore(0);
    private AtomicInteger m_writePointer = new AtomicInteger();
    private AtomicInteger m_readPointer = new AtomicInteger();

    public BoundedBuffer() {
        m_buffer = new AtomicReferenceArray<T>(BUFFER_SIZE);
    }

    // Returns the current index and advances it, wrapping to 0 after Short.MAX_VALUE.
    public static int safeGetAndIncrement(AtomicInteger i) {
        int oldValue = 0, newValue = 0;
        do {
            oldValue = i.get();
            newValue = (oldValue == Short.MAX_VALUE) ? 0 : (oldValue + 1);
        } while (!i.compareAndSet(oldValue, newValue));
        return oldValue;
    }

    public void add(T data) throws InterruptedException {
        m_full.acquire();
        synchronized (this) { // << Commenting this out doesn't work
            // CAS-based overflow handling
            m_buffer.set(safeGetAndIncrement(m_writePointer), data);
        }
        m_empty.release();
    }

    public T get() throws InterruptedException {
        T data = null;
        m_empty.acquire();
        synchronized (this) { // << Commenting this out doesn't work
            // CAS-based overflow handling
            data = m_buffer.get(safeGetAndIncrement(m_readPointer));
        }
        m_full.release();
        return data;
    }
}

The test program has:

8 producer threads, each putting about 10000 entries into the queue. Each entry is a string of the format "A:B", where A is the thread number (0..7) and B is a numerically increasing count from 0..9999.

4 consumer threads, consuming everything until a null is hit.

Once the producer threads have finished adding everything to the buffer, 4 nulls are added to the queue (to stop the consumers).
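The test setup described above can be sketched roughly as follows. This is not the original test program, which was not posted. As assumptions for illustration: the class name HarnessSketch and the STOP marker are hypothetical, the JDK's ArrayBlockingQueue stands in for BoundedBuffer so that the sketch is self-contained, and a stop marker object replaces null because ArrayBlockingQueue rejects null elements.

```java
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

public class HarnessSketch {
    // Hypothetical stop marker, compared by identity; the original test uses null.
    static final String STOP = new String("STOP");

    /** Runs the producers and consumers and returns every entry that was consumed. */
    static Set<String> run(int producers, int consumers, int perProducer) {
        // Stand-in for BoundedBuffer: a JDK queue with a small fixed capacity.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(64);
        Set<String> consumed = ConcurrentHashMap.newKeySet();
        Thread[] producerThreads = new Thread[producers];
        Thread[] consumerThreads = new Thread[consumers];

        for (int i = 0; i < producers; i++) {
            final int id = i;
            producerThreads[i] = new Thread(() -> {
                try {
                    for (int n = 0; n < perProducer; n++) {
                        queue.put(id + ":" + n); // entry format "A:B"
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producerThreads[i].start();
        }
        for (int i = 0; i < consumers; i++) {
            consumerThreads[i] = new Thread(() -> {
                try {
                    // Consume until this thread receives its stop marker.
                    for (String s = queue.take(); s != STOP; s = queue.take()) {
                        consumed.add(s);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumerThreads[i].start();
        }
        try {
            for (Thread t : producerThreads) t.join();
            // One stop marker per consumer, added only after all producers finish.
            for (int i = 0; i < consumers; i++) queue.put(STOP);
            for (Thread t : consumerThreads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        Set<String> consumed = run(8, 4, 10000);
        int missing = 0;
        for (int id = 0; id < 8; id++) {
            for (int n = 0; n < 10000; n++) {
                if (!consumed.contains(id + ":" + n)) {
                    System.out.println("Missing " + id + ":" + n);
                    missing++;
                }
            }
        }
        System.out.println(missing == 0 ? "Verified" : missing + " entries missing");
    }
}
```

Swapping the stand-in queue for the buffer under test (and the stop marker for null) should give a harness close to the one described, with the verification pass reporting any entry that was produced but never consumed.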

Output of the threads...

P:Data, 1:9654@1
P:Data, 5:1097@347
C:Data, 1:9654@1
P:Data, 4:5538@1
C:Data, 4:5538@1
C:Data, null@14466

Verification

When verifying whether all entries that were produced were consumed, a few appear to be missing (just before the ArrayIndexOutOfBoundsException is hit, which may just be a coincidence).

Verifying...
Missing 4:5537
Missing 5:1096
Verified
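One plausible explanation for the missing entries without the synchronized blocks, offered as a sketch rather than a confirmed diagnosis: claiming an index and writing to that slot are two separate steps, so a consumer can take a permit released by one producer and then read a slot that a slower producer has claimed but not yet filled. The snippet below replays one such interleaving step by step on a single thread. The class name RaceReplay, the item strings, and the buffer size of 4 are assumptions for illustration, and the semaphores are represented only by comments.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

public class RaceReplay {
    /** Replays one problematic interleaving and returns what the consumer reads. */
    static String replay() {
        AtomicReferenceArray<String> buffer = new AtomicReferenceArray<>(4);
        AtomicInteger writePointer = new AtomicInteger();
        AtomicInteger readPointer = new AtomicInteger();

        // Producer P1 claims slot 0 but is descheduled before writing to it.
        int p1Slot = writePointer.getAndIncrement();

        // Producer P2 claims slot 1, writes its item, and would now release m_empty.
        int p2Slot = writePointer.getAndIncrement();
        buffer.set(p2Slot, "P2-item");

        // A consumer acquires that permit and claims read slot 0,
        // which P1 has not filled yet, so it reads null.
        int consumerSlot = readPointer.getAndIncrement();
        String seen = buffer.get(consumerSlot);

        // P1 finally writes, but the read pointer has already moved past slot 0.
        buffer.set(p1Slot, "P1-item");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("Consumer read: " + replay()); // prints "Consumer read: null"
    }
}
```

In the real test a consumer that reads null this way would treat it as the stop signal, and the item written late would not be picked up by that read. Holding one lock around both the claim and the slot access, as the synchronized blocks do, rules out this particular interleaving.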


You need to handle overflows when incrementing the counter. For example, you can use the following method instead of getAndIncrement():

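public static int safeGetAndIncrement(AtomicInteger i) {
    int oldValue = 0;
    int newValue = 0; // declared outside the loop so the while condition can see it
    do {
        oldValue = i.get();
        // MAX_VALUE is the largest valid index; wrap back to 0 after it
        newValue = (oldValue == MAX_VALUE) ? 0 : (oldValue + 1);
    } while (!i.compareAndSet(oldValue, newValue));
    return oldValue;
}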

It uses the typical compare-and-swap retry loop and shouldn't noticeably harm performance, since getAndIncrement() was implemented the same way internally in the JDKs of that era (newer JDKs may use a hardware fetch-and-add instead).

Also, if MAX_VALUE is BUFFER_SIZE - 1 (the last valid index), you don't need the % BUFFER_SIZE operation.
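To illustrate the relation between the wrap limit and the buffer size, here is a minimal sketch of the same CAS loop with the size passed in as a parameter. The class name WrapCounter, the method name getAndIncrementWrapped, and the size parameter are assumptions made for this example; they are not part of the code above.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class WrapCounter {
    /** Returns the current value and advances it, wrapping to 0 after size - 1. */
    static int getAndIncrementWrapped(AtomicInteger counter, int size) {
        int oldValue;
        int newValue;
        do {
            oldValue = counter.get();
            newValue = (oldValue == size - 1) ? 0 : oldValue + 1;
        } while (!counter.compareAndSet(oldValue, newValue)); // retry if another thread won
        return oldValue;
    }

    public static void main(String[] args) {
        AtomicInteger pointer = new AtomicInteger();
        StringBuilder indices = new StringBuilder();
        for (int i = 0; i < 7; i++) {
            indices.append(getAndIncrementWrapped(pointer, 3)).append(' ');
        }
        System.out.println(indices.toString().trim()); // prints "0 1 2 0 1 2 0"
    }
}
```

Every returned value is already a valid index in 0..size - 1, so no modulo is needed at the call site. On Java 8 or later, counter.getAndUpdate(v -> v == size - 1 ? 0 : v + 1) should behave the same way with less code.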
