开发者

How can I make a Java PriorityBlockingQueue that preserves FIFO behavior?

I'm trying to create a Priority Blocking Queue in Java that maintains FIFO order for elements with the same priority. The Oracle doc provides some help with that, but I'm still very tangled up.

I should note that the following topics are all very new to me: Generics, Interfaces as Types, and static nested classes. All of these come into play in the following class definition. Generics, especially, are confusing, and I'm sure I've totally messed up with them here.

I have included comments to identify the compiler errors I am currently getting.

Several specific questions:

  1. Is it okay to have the class represent the queued event object, with the actual queue being a static class member?

  2. Was it reasonable to have included Oracle's FIFO event "wrapper" as a static nested class?

  3. Am I at least on the right track here, doing it all in one outer class?

Here is the class that I've written:

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class FIFOPBQEvent {

/**
 * First we define a static nested class which, when instantiated,
 * encapsulates the "guts" of the event - a FIFOPBQEvent - along with
 * a sequence number that assures FIFO behavior of events of like priority.
 *  
 * The following is lifted ALMOST verbatim (I added "static" and some 
 * comments) from Oracle documentation on adding FIFO-ness to a Priority
 * Blocking Queue: 
 * http://download.oracle.com/javase/6/docs/api/java/util/concurrent/PriorityBlockingQueue.html
 * As the Oracle doc points out:
 * 
 * "A static nested class interacts with the instance members of its outer 
 * class (and other classes) just like any other top-level class. In 
 * effect, a static nested class is behaviorally a top-level class that 
 * has been nested in another top-level class for packaging convenience."
 *
 */
static class FIFOEntry<E extends Comparable<? super E>> implements
        Comparable<FIFOEntry<E>> {
    final static AtomicLong seq = new AtomicLong();
    final long seqNum;  // instance
    final E entry;

    public FIFOEntry(E entry) {
        seqNum = seq.getAndIncrement();
        this.entry = entry;
    }

    public E getEntry() {
        return entry;
    }
    /** Here is implementation of Comparable */
    public int compareTo(FIFOEntry<E> other) {
        int res = entry.compareTo(other.entry);
        if (res == 0 && other.entry != this.entry)
            res = (seqNum < other.seqNum ? -1 : 1);
        return res;
    }
}

/**
 * Now we declare a single (static) PBQ of FIFO entries into which 
 * PBQFIFOEvents will be added and removed.
 */

/** FLAGGED AS ERROR BY COMPILER */
// Bound mismatch: The type FIFOPBQEvent is not a valid substitute for the
// bounded parameter <E extends Comparable<? super E>> of the type 
// FIFOPBQEvent.FIFOEntry<E>

private static PriorityBlockingQueue<FIFOEntry<FIFOPBQEvent>> theQueue =
    PriorityBlockingQueue<FIFOEntry<FIFOPBQEvent>>();

/** 
 * And here are the "guts" of our event: the i.d. and state of the GUI widget 
 */
private ConsoleObject obj = ConsoleObject.UNDEFINED_OBJ; // widget that was affected
private ObjectState state = ObjectState.UNDEFINED_STATE; // the widget's new state

/** 
 * Constructor specifying the class variables 
 */
public FIFOPBQEvent(ConsoleObject theObj, ObjectState theState) {
    obj = theObj;
    state = theState;
}

/**
 * Event queuing ("sending") and dequeuing ("receiving")
 */
public void sendEvent() {

    /** FLAGGED AS ERROR BY COMPILER */
    // The method put(FIFOPBQEvent.FIFOEntry<FIFOPBQEvent>) in the type 
    // PriorityBlockingQueue<FIFOPBQEvent.FIFOEntry<FIFOPBQEvent>> is not 
    // applicable for the arguments (FIFOPBQEvent)

    theQueue.put(this);
}

public static FIFOPBQEvent receiveEvent() {

    /** FLAGGED AS ERROR BY COMPILER */
    // Type mismatch: cannot convert from FIFOPBQEvent.FIFOEntry<FIFOPBQEvent> 
    // to FIFOPBQEvent

    FIFOPBQEvent event = theQueue.take();
    return event;
}

/**
 * ConsoleEvent accessors
 */
public ConsoleObject getObj() {
    return this.obj;
}

public ObjectState getState() {
    r开发者_运维知识库eturn this.state;
}

/**
 * And for the first time, enums instead of public static final ints.
 */
public enum ConsoleObject {
    UNDEFINED_OBJ,
    RESERVED,

    /** Console keys */
    RESET,
    DISPLAY_MAR,
    SAVE,
    INSERT,
    RELEASE,
    START,
    SIE,
    SCE,

    /** Console toggle switches */
    POWER,
    PARITY_CHECK,
    IO_CHECK,
    OVERFLOW_CHECK,
    SENSE_SWITCH_1,
    SENSE_SWITCH_2,
    SENSE_SWITCH_3,
    SENSE_SWITCH_4
}

public enum ObjectState {
    UNDEFINED_STATE,

    /** Toggle switches */
    OFF,
    ON,

    /** Console keys */
    PRESSED,
}
}


The first error is the more significant error. It occurs because the FIFOPBQEvent class doesn't implement Comparable, which it must to be considered as the generic type for the FIFOEntry nested class. This is because you restrict E and say that it extends Comparable<...>. Basically, your FIFOPBQEvent class must be comparable to provide the priority for the queue (presumably based on the event type).

To fix the error, you need to:

  1. Change the header of your class to:

    public class FIFOPBQEvent implements Comparable<FIFOPBQEvent> {
    
  2. add a compareTo method in the FIFOPBQEvent class; something like:

    public int compareTo (FIFOPBQEvent other) {
        // TODO - compare this and other for priority
        return 0;
    }
    

Then you need to wrap your entry in your sendEvent method:

public void sendEvent () {
    theQueue.put(new FIFOEntry<FIFOPBQEvent> (this));
}

The last, minor, error is simply that you aren't unwrapping the FIFOEntry object. To fix this, change receiveEvent to:

public static FIFOPBQEvent receiveEvent () {
    FIFOPBQEvent event = theQueue.take ().getEntry ();
    return event;
}


Let's step through your code.

static class FIFOEntry<E extends Comparable<? super E>> implements 
  Comparable<FIFOEntry<E>> {

This defines the class FIFOEntry which takes a generic parameter. You have constrained the type of generic parameter to "Any object that implements Comparable of itself".

private static PriorityBlockingQueue<FIFOEntry<FIFOPBQEvent>> theQueue =
  PriorityBlockingQueue<FIFOEntry<FIFOPBQEvent>>();

Your declaration of PriorityBlockingQueue is not incorrect here, but your definition of FIFOEntry<FIFOPBQEvent> is incorrect. This is because of the above point - you have restricted the type of FIFOEntry to anything that implements Comparable of itself i.e. it should be

class FIFOPBQEvent implements Comparable<FIFOPBQEvent> {

Your next problem is -

public void sendEvent() {
  theQueue.put(this);
}

The type of this is FIFOPBQEvent but the queue only accepts FIFOEntry objects. To match your Queue signature it should be:

public void sendEvent() {
  // Requires FIFOPBQEvent to be Comparable
  theQueue.put(new FIFOEntry<FIFOPBQEvent>(this));     
}

You have the same problem on receiveEvent() too - your Queue signature says that the Queue contains FIFOEntry objects and you are trying to pull out FIFOPBQEvents.


Taking @101100's recommendation, I have reworked the design, decoupling the queue from the events. That seems to make it much simpler and easier to understand (and reuse), but sadly I'm still unclear on some concepts. What follows here is the PriorityFIFOEventQueue (I've omitted the Event class for brevity). And I've noted where I still need some help:

package ibm1620;

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/**
 * This class represents a Priority Blocking Queue of FIFO entries.  Instantiating
 * it creates a queue.  It provides instance methods for sending and receiving
 * entries, a.k.a. events of type E, on the queue.
 */

The following is flagged with diagnostic: "The type PriorityFIFOEventQueue must implement the inherited abstract method Comparable>.compareTo (PriorityFIFOEventQueue)"

I'm pretty sure I don't want to compare queues !! Still not sure what I need here.

public class PriorityFIFOEventQueue<E extends Comparable<? super E>> implements Comparable<PriorityFIFOEventQueue<E>> {

/**
 * FIFOEntry is a static nested class that wraps an Entry and provides support for
 * keeping the Entries in FIFO sequence.
 */
private static class FIFOEntry<E> implements Comparable<FIFOEntry<E>> {

    /**
     * There's going to be one atomic seq per ... queue?  runtime?
     */

    final static AtomicLong seq = new AtomicLong();

    /**
     * FIFOEntry is simply an entry of type E, and a sequence number
     */
    final long seqNum; 
    final E    entry;

    /**
     * FIFOEntry() constructor
     */
    FIFOEntry(E entry) {
        seqNum = seq.getAndIncrement();
        this.entry = entry;
    }

    /**
     * Accessor method for entry
     */
    E getEntry() {
        return entry;
    }

    /**
     * Here is implementation of Comparable that is called directly by PBQ.
     * We first invoke E's comparator which compares based on priority. 
     * If equal priority, order by sequence number (i.e. maintain FIFO).
     * */

In the following, the line containing "compareTo" is flagged, and the diagnostic is "The method compareTo(E) is undefined for the type E". Apparently I haven't told the compiler that the "other" FIFOEntry implements Comparable.

    public int compareTo(FIFOEntry<E> other) {
        int res = entry.compareTo(other.entry); // priority-based compare
        if (res == 0 && other.entry != this.entry)
            res = (seqNum < other.seqNum ? -1 : 1); // FIFO compare
        return res;
    }
}

/**
 * Here is the PriorityBlockingQueue of FIFOEntries
 */

private PriorityBlockingQueue<FIFOEntry<E>> theQueue = 
    new PriorityBlockingQueue<FIFOEntry<E>>();

/**
 * Event queuing ("sending") and dequeuing ("receiving")
 */
public void sendEvent(E event) {
    theQueue.put(new FIFOEntry<E>(event));
}

/**
 * pollEvent() 
 * Will remove and return a ConsoleEvent from the queue, or return
 * null if queue is empty.
 */
public E pollEvent() {
    E event = null;
    FIFOEntry<E> aFIFOEvent = theQueue.poll();
    if (aFIFOEvent != null) {
        event = aFIFOEvent.getEntry();
        say("pollEvent() -> "+event);
    }
    else {
    }
    return event;
}

/**
 * receiveEvent() 
 * Will block if queue is empty.  Takes a FIFOEvent off the queue,
 * unwraps it and returns the 'E' payload.
 * 
 * @return
 */
public E receiveEvent() {
    E event = null;
    try {
        FIFOEntry<E> aFIFOEvent = theQueue.take();
        if (aFIFOEvent != null) {
            event = aFIFOEvent.getEntry();
            say("receiveEvent() -> "+event);
        }

    } catch (InterruptedException e) {
        say("InterruptedException in receiveEvent()");
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    return event;
}

// for console debugging
static void say(String s) {
    System.out.println("ConsoleEvent: " + s);
}

}


Here is an actual replacement for PriorityBlockingQueue which maintains FIFO ordering for items with equal priority. It does all the wrapping/unwrapping transparently for the user.

This code was written for a 1.4 JVM and usese the j.u.c. backport. Using it in a newer JVM and adding generics should be straightforward.

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.NoSuchElementException;

import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.PriorityBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong;

/**
 * A queue with all properties of a {@link PriorityBlockingQueue} but which additionally 
 * returns items with equal priority 
 * in a FIFO order.
 * In this respect, {@link PriorityBlockingQueue} explicitly gives no return order guarantees for equal priority elements.
 * 
 * This queue only accepts {@link Comparable} items. A custom {@link Comparator} cannot
 * be specified at construction time.
 * 
 *
 */
public final class FIFOPriorityBlockingQueue implements BlockingQueue {
    private final PriorityBlockingQueue q;

    public FIFOPriorityBlockingQueue() {
        q = new PriorityBlockingQueue();
    }

    public FIFOPriorityBlockingQueue(int initialCapacity) {
        q = new PriorityBlockingQueue(initialCapacity);
    }

    public boolean isEmpty() {
        return q.isEmpty();
    }

    public boolean addAll(Collection c) {
    if (c == null)
        throw new NullPointerException();
    if (c == this)
        throw new IllegalArgumentException();
    boolean modified = false;
    Iterator e = c.iterator();
    while (e.hasNext()) {
        if (add(e.next()))
        modified = true;
    }
    return modified;
    }

    /**
     * Always returns <code>null</code> as this {@link BlockingQueue} only accepts
     * {@link Comparable} objects and doesn't allow setting an own {@link Comparator} 
     * @return
     */
    public Comparator comparator() {
        return null;
    }

    public boolean containsAll(Collection c) {
        Iterator e = c.iterator();
        while (e.hasNext())
            if(!contains(e.next()))
            return false;

        return true;
    }

    public int size() {
        return q.size();
    }

    public int remainingCapacity() {
        return q.remainingCapacity();
    }

    public boolean removeAll(Collection c) {
        boolean modified = false;
        Iterator e = iterator();
        while (e.hasNext()) {
            if(c.contains(e.next())) {
            e.remove();
            modified = true;
            }
        }
        return modified;
    }

    public boolean retainAll(Collection c) {
        boolean modified = false;
        Iterator e = iterator();
        while (e.hasNext()) {
            if(!c.contains(e.next())) {
            e.remove();
            modified = true;
            }
        }
        return modified;
    }

    public Object remove() {
        return ((FIFOEntry)q.remove()).entry;
    }

    public Object element() {
        return ((FIFOEntry)q.element()).entry;
    }

    public boolean add(Object e) {
        return q.add(new FIFOEntry((Comparable)e));
    }

    public boolean offer(Object e) {
        return q.offer(new FIFOEntry((Comparable)e));
    }

    public void put(Object e) {
        q.put(new FIFOEntry((Comparable)e));
    }

    public boolean offer(Object e, long timeout, TimeUnit unit) {
        return q.offer(new FIFOEntry((Comparable)e), timeout, unit);
    }

    public Object poll() {
        return ((FIFOEntry)q.poll()).entry;
    }

    public Object take() throws InterruptedException {
        return ((FIFOEntry)q.take()).entry;
    }

    public Object poll(long timeout, TimeUnit unit) throws InterruptedException {
        return ((FIFOEntry)q.poll(timeout, unit)).entry;
    }

    public Object peek() {
        return ((FIFOEntry)q.peek()).entry;
    }

    /**
     * If more than one equal objects are held by the queue, remove() will
     * remove any one of them, not necessarily the first or last inserted.
     */
    public boolean remove(Object o) {
        return q.remove(new FIFOEntry((Comparable)o));
    }

    public boolean contains(Object o) {
        return q.contains(new FIFOEntry((Comparable)o));
    }

    public Object[] toArray() {
        Object[] a = q.toArray();
        for (int i = 0; i < a.length; i++) { // unpacking
            a[i] = ((FIFOEntry)a[i]).entry;
        }
        return a;
    }

    public String toString() {
        return q.toString(); // ok, because each FIFOEntry.toString returns the toString of the entry 
    }

    public int drainTo(Collection c) {
        ArrayList tmp = new ArrayList(size());
        int n = q.drainTo(tmp);
        for (Iterator it = tmp.iterator(); it.hasNext();) {
            FIFOEntry en = (FIFOEntry) it.next();
            c.add(en.entry);
        }
        return n;
    }



    public int drainTo(Collection c, int maxElements) {
        ArrayList tmp = new ArrayList(size());
        int n = q.drainTo(tmp, maxElements);
        for (Iterator it = tmp.iterator(); it.hasNext();) {
            FIFOEntry en = (FIFOEntry) it.next();
            c.add(en.entry);
        }
        return n;
    }

    public void clear() {
        q.clear();
    }



    public Object[] toArray(Object[] a) {
        Object[] res = q.toArray(a);
        for (int i = 0; i < res.length; i++) { // unpacking
            res[i] = ((FIFOEntry)res[i]).entry;
        }
        return res;
    }



    public Iterator iterator() {
        final Iterator it = q.iterator();
        return new Iterator() {
            public void remove() throws UnsupportedOperationException, IllegalStateException, ConcurrentModificationException {
                it.remove();
            }

            public Object next() throws NoSuchElementException, ConcurrentModificationException {
                return ((FIFOEntry)it.next()).entry;
            }

            public boolean hasNext() {
                return it.hasNext();
            }
        };
    }

    public int hashCode() {
        return q.hashCode();
    }

    public boolean equals(Object obj) {
        return q.equals(obj);
    }


    /**
     * Wrapping class which adds creation ordering to a {@link Comparable} object.
     * Based on the code in the javadoc for {@link PriorityBlockingQueue}
     * 
     *
     */
    private static class FIFOEntry implements Comparable {
        private final static AtomicLong seq = new AtomicLong();
        private final long seqNum;
        private final Comparable entry;
        public FIFOEntry(Comparable entry) {
            seqNum = seq.getAndIncrement();
            this.entry = entry;
        }

        public int compareTo(Object other) {
            FIFOEntry o = (FIFOEntry) other;
            int res = entry.compareTo(o.entry);
            if (res == 0 && o.entry != this.entry)
                res = (seqNum < o.seqNum ? -1 : 1);
            return res;
        }
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + ((entry == null) ? 0 : entry.hashCode());
            return result;
        }
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            FIFOEntry other = (FIFOEntry) obj;
            if (entry == null) {
                if (other.entry != null)
                    return false;
            } else if (!entry.equals(other.entry))
                return false;
            return true;
        }

        public String toString() {
            return entry.toString();
        }
    }

}
0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜