开发者

Java blocking queue containing only unique elements

sort of like a "blocking set". How can I implement a blocking queue where adding a member that is already in the开发者_运维技巧 set is ignored?


I wrote this class to solve a similar problem:

/**
 * Linked blocking queue with {@link #add(Object)} method, which adds only element, that is not already in the queue.
 */
public class SetBlockingQueue<T> extends LinkedBlockingQueue<T> {

    private Set<T> set = Collections.newSetFromMap(new ConcurrentHashMap<>());

    /**
     * Add only element, that is not already enqueued.
     * The method is synchronized, so that the duplicate elements can't get in during race condition.
     * @param t object to put in
     * @return true, if the queue was changed, false otherwise
     */
    @Override
    public synchronized boolean add(T t) {
        if (set.contains(t)) {
            return false;
        } else {
            set.add(t);
            return super.add(t);
        }
    }

    /**
     * Takes the element from the queue.
     * Note that no synchronization with {@link #add(Object)} is here, as we don't care about the element staying in the set longer needed.
     * @return taken element
     * @throws InterruptedException
     */
    @Override
    public T take() throws InterruptedException {
        T t = super.take();
        set.remove(t);
        return t;
    }
}


You can create a new class that composes a BlockingQueue, a Set, and a lock. When you put() you test against the set while holding a lock that prevents get() from running. When you get() you remove the item from the set so that it can be put() again in the future.


A blocking queue implementation backed by a linked hash set for predictable iteration order and constant time addition, removal and contains operations:

There you go.


You can override add and put methods of any implementation of BlockingQueue<T> to check first if the element is already within the queue, e.g.

@Override
public boolean add(T elem) {
    if (contains(elem))
        return true;
    return super.add(elem);
}


class BlockingSet extends ArrayBlockingQueue<E> {
   /*Retain all other methods except put*/
   public void put(E o) throws InterruptedException {
      if (!this.contains(o)){
         super.put(o);
      }
   }
}
0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜