开发者

Is there any (unbounded) fair blocking queue in java?

开发者_StackOverflow社区

Is there any implementation of blocking queue which guarantees fair take() operation if multiple consumers are removing element from the same queue. I checked LinkedBlockingQueue, LinkedTransferQueue and looks like both of them are unfair. ArrayBlockingQueue provides fair operation but its bounded.


We can implement an unbounded fair blocking queue using an unbounded queue like ConcurrentLinked queue and a fair Semaphore. The class below doesn't implement all methods from the BlockingQueue interface but just a few of them for demonstration purposes. The main() method is written as a test only.

public class FairBlockingQueue<T> {

    private final Queue<T> queue;
    private final Semaphore takeSemaphore;

    public FairBlockingQueue() {
        queue = new ConcurrentLinkedQueue<T>();
        takeSemaphore = new Semaphore(0, true);
    }

    public FairBlockingQueue(Collection<T> c) {
        queue = new ConcurrentLinkedQueue<T>(c);
        takeSemaphore = new Semaphore(c.size(), true);
    }

    public T poll() {
        if (!takeSemaphore.tryAcquire()) {
            return null;
        }
        return queue.poll();
    }

    public T poll(long millis) throws InterruptedException {
        if (!takeSemaphore.tryAcquire(millis, TimeUnit.MILLISECONDS)) {
            return null;
        }
        return queue.poll();
    }

    public T take() throws InterruptedException {
        takeSemaphore.acquire();
        return queue.poll();
    }

    public void add(T t) {
        queue.add(t);
        takeSemaphore.release();
    }

    public static void main(String[] args) throws Exception {
        FairBlockingQueue<Object> q = new FairBlockingQueue<Object>();
        Object o = q.poll();
        assert o == null;
        o = q.poll(1000);
        assert o == null;

        q.add(new Object());
        q.add(new Object());
        q.add(new Object());

        o = q.take();
        assert o != null;
        o = q.poll();
        assert o != null;
        o = q.poll(1000);
        assert o != null;

        o = q.poll();
        assert o == null;
    }
}


PriorityBlockingQueue


Fairness policy may be specified for SynchronousQueue:

a queue constructed with fairness set to true grants threads access in FIFO order

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜