Why can't I construct a ThreadPoolExecutor backed by a DelayQueue?
I'm trying to create a ThreadPoolExecutor:
// Thingy implements Delayed and Runnable
ExecutorService executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, new DelayQueue<Thingy>());
The compiler is saying "cannot find symbol":
symbol : constructor ThreadPoolExecutor(int,int,long,java.util.concurrent.TimeUnit,java.util.concurrent.DelayQueue<Thingy>)
but I don't understand: DelayQueue implements BlockingQueue, so shouldn't I be able to use this constructor?
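This is a generics problem. You can't pass a DelayQueue<Thingy>: the ThreadPoolExecutor constructor is declared to take a BlockingQueue<Runnable>, and it does not accept queues of subtypes of Runnable. Note that DelayQueue<Runnable> will not compile either, because DelayQueue requires its element type to extend Delayed.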
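The invariance rule behind that compiler error can be sketched as follows. The Thingy class here is a hypothetical stand-in for the one in the question, and InvarianceDemo is a name made up for this illustration; the rejected assignment is left commented out so the rest compiles.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the question's Thingy: both Delayed and Runnable.
class Thingy implements Delayed, Runnable {
    @Override public void run() { }
    @Override public long getDelay(TimeUnit unit) { return 0; }
    @Override public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

class InvarianceDemo {
    // Returns how many elements are visible through the wildcard view.
    static int countThroughWildcardView() {
        DelayQueue<Thingy> queue = new DelayQueue<>();
        queue.add(new Thingy());

        // Fine: DelayQueue<Thingy> implements BlockingQueue<Thingy>.
        BlockingQueue<Thingy> sameElementType = queue;

        // Fine: a wildcard view cannot be used to insert arbitrary Runnables.
        BlockingQueue<? extends Runnable> readOnlyView = queue;

        // Does not compile: generics are invariant, so a BlockingQueue<Thingy>
        // is not a BlockingQueue<Runnable>, even though Thingy is a Runnable.
        // BlockingQueue<Runnable> rejected = queue;

        // If that assignment were allowed, this line would put a plain
        // Runnable into a queue that may only hold Thingy instances:
        // rejected.add(() -> System.out.println("not a Thingy"));

        return readOnlyView.size();
    }

    public static void main(String[] args) {
        System.out.println(countThroughWildcardView());
    }
}
```

The executor needs to insert Runnables into its work queue, which is why its constructor asks for exactly BlockingQueue<Runnable> rather than a wildcard type.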
RunnableScheduledFuture is both Runnable and Delayed, but a DelayQueue of them still cannot be cast to BlockingQueue<Runnable>. See why at The Java Tutorials.
Have a look at ScheduledThreadPoolExecutor; it can schedule commands to run after a given delay, or to execute periodically.
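As a minimal sketch of that suggestion, the following schedules one task on a ScheduledThreadPoolExecutor and reports how long it actually waited. The class and method names (ScheduledDemo, runAfterDelay) are made up for the illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ScheduledDemo {
    // Schedules one task after delayMillis and returns the nanoseconds that
    // elapsed between scheduling it and the moment it ran.
    static long runAfterDelay(long delayMillis) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        try {
            long start = System.nanoTime();
            // The task just records the time at which it was executed.
            Callable<Long> task = System::nanoTime;
            ScheduledFuture<Long> future =
                    executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            return future.get() - start;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("waited (ns): " + runAfterDelay(50));
    }
}
```

For periodic work the same executor offers scheduleAtFixedRate and scheduleWithFixedDelay.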
While this is an old question, I wanted to post my answer as I was searching for just this solution recently. It is possible to use a DelayQueue behind a ThreadPoolExecutor; it just takes a bit of wrapper code. The trick is to get the DelayQueue to present itself as a BlockingQueue<Runnable>.
I started by defining an interface DR that extends both Runnable and Delayed. Note that the static methods here create instances of DR (the Impl class is not shown).
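import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public interface DR extends Delayed, Runnable {

    // Wraps r as a DR; Impl is the implementing class (not shown).
    public static DR make( Runnable r )
    {
        if (r instanceof DR)
        {
            return (DR)r;
        }
        Impl impl = new Impl(r);
        if (r instanceof Delayed)
        {
            // Carry over the delay of a Runnable that is already Delayed.
            impl.expires = ((Delayed) r).getDelay( TimeUnit.MILLISECONDS );
        }
        return impl;
    }

    // Wraps r with an explicit delay; r itself is returned only if it is
    // already a DR whose current delay in milliseconds equals expires.
    public static DR make( Runnable r, long expires )
    {
        if (r instanceof DR)
        {
            if (expires == ((DR)r).getDelay( TimeUnit.MILLISECONDS ))
            {
                return (DR)r;
            }
        }
        return new Impl(r, expires);
    }
}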
Implementations should override public int compareTo(Delayed o), public boolean equals(Object o), and public int hashCode().
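Since the Impl class is not shown, here is one way such an implementation could look. This is only a sketch under assumptions: the names DelayedRunnable and DelayedRunnableDemo are hypothetical, the delay is taken to be relative milliseconds from construction, and equality is based on the wrapped Runnable so that a freshly made wrapper can be used to find or remove a queued one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical implementation of a Delayed Runnable wrapper (not the answer's Impl).
final class DelayedRunnable implements Delayed, Runnable {
    private final Runnable task;
    private final long deadlineNanos; // absolute deadline on the System.nanoTime() clock

    DelayedRunnable(Runnable task, long delayMillis) {
        this.task = task;
        this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override public void run() { task.run(); }

    // Remaining delay; zero or negative once the deadline has passed.
    @Override public long getDelay(TimeUnit unit) {
        return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    // Orders by deadline so the DelayQueue head is the task that expires first.
    @Override public int compareTo(Delayed other) {
        if (other instanceof DelayedRunnable) {
            return Long.signum(deadlineNanos - ((DelayedRunnable) other).deadlineNanos);
        }
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }

    // Assumption: two wrappers are equal when they wrap the same Runnable.
    @Override public boolean equals(Object o) {
        return o instanceof DelayedRunnable && task.equals(((DelayedRunnable) o).task);
    }

    @Override public int hashCode() { return task.hashCode(); }
}

class DelayedRunnableDemo {
    // Queues a slow and a fast task, then runs them in the order they expire.
    static List<String> drainInOrder() {
        DelayQueue<DelayedRunnable> queue = new DelayQueue<>();
        List<String> log = new ArrayList<>();
        queue.add(new DelayedRunnable(() -> log.add("slow"), 60));
        queue.add(new DelayedRunnable(() -> log.add("fast"), 10));
        try {
            queue.take().run(); // blocks until the earliest deadline passes
            queue.take().run();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(drainInOrder());
    }
}
```

Storing an absolute deadline rather than the raw delay matters because getDelay must shrink as time passes; a DelayQueue only releases an element once its getDelay is zero or negative.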
Create a class that extends DelayQueue. This class adds a single method that presents the DelayQueue as a BlockingQueue. The object returned simply wraps the DelayQueue and uses the make methods of the DR interface to convert from Runnable to DR where necessary.
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
// WrappedIterator is not part of the JDK; assumed here to be Apache Jena's.
import org.apache.jena.util.iterator.WrappedIterator;

public class DelayedBlockingQueue extends DelayQueue<DR> {

    // Presents this DelayQueue<DR> as a BlockingQueue<Runnable>.
    public BlockingQueue<Runnable> asRunnableQueue() {
        return new BlockingQueue<Runnable>(){
            DelayedBlockingQueue dbq = DelayedBlockingQueue.this;

            public boolean add(Runnable e) {
                return dbq.add( DR.make( e ));
            }

            // Converts every Runnable in coll to a DR.
            private List<DR> makeList( Collection<? extends Runnable> coll)
            {
                return coll.stream().map( r -> DR.make( r ) ).collect( Collectors.toList() ) ;
            }

            public boolean addAll(Collection<? extends Runnable> arg0) {
                return dbq.addAll(makeList( arg0 ) );
            }

            public void clear() {
                dbq.clear();
            }

            public boolean contains(Object o) {
                if (o instanceof Runnable) {
                    return dbq.contains( DR.make( (Runnable)o ) );
                }
                return false;
            }

            public boolean containsAll(Collection<?> arg0) {
                List<DR> lst = new ArrayList<DR>();
                for (Object o : arg0)
                {
                    if (o instanceof Runnable)
                    {
                        lst.add( DR.make( (Runnable)o ) );
                    }
                    else {
                        return false;
                    }
                }
                return dbq.containsAll( lst );
            }

            public int drainTo(Collection<? super Runnable> c, int maxElements) {
                return dbq.drainTo( c, maxElements );
            }

            public int drainTo(Collection<? super Runnable> c) {
                return dbq.drainTo( c );
            }

            public Runnable element() {
                return dbq.element();
            }

            public void forEach(Consumer<? super Runnable> arg0) {
                dbq.forEach( arg0 );
            }

            public boolean isEmpty() {
                return dbq.isEmpty();
            }

            public Iterator<Runnable> iterator() {
                return WrappedIterator.create( dbq.iterator() ).mapWith( dr -> (Runnable)dr );
            }

            public boolean offer(Runnable e, long timeout, TimeUnit unit) throws InterruptedException {
                return dbq.offer( DR.make( e ), timeout, unit );
            }

            public boolean offer(Runnable e) {
                return dbq.offer( DR.make( e ) );
            }

            public Runnable peek() {
                return dbq.peek();
            }

            public Runnable poll() {
                return dbq.poll();
            }

            public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
                return dbq.poll( timeout, unit );
            }

            public void put(Runnable e) throws InterruptedException {
                dbq.put( DR.make(e) );
            }

            public int remainingCapacity() {
                return dbq.remainingCapacity();
            }

            public Runnable remove() {
                return dbq.remove();
            }

            public boolean remove(Object o) {
                if (o instanceof Runnable)
                {
                    return dbq.remove( DR.make( (Runnable)o) );
                }
                return false;
            }

            public boolean removeAll(Collection<?> arg0) {
                List<DR> lst = new ArrayList<DR>();
                for (Object o : arg0)
                {
                    if (o instanceof Runnable)
                    {
                        lst.add( DR.make( (Runnable)o ) );
                    }
                }
                return dbq.removeAll( lst );
            }

            public boolean retainAll(Collection<?> arg0) {
                return dbq.retainAll( arg0 );
            }

            public int size() {
                return dbq.size();
            }

            public Runnable take() throws InterruptedException {
                return dbq.take();
            }

            public Object[] toArray() {
                return dbq.toArray();
            }

            public <T> T[] toArray(T[] arg0) {
                return dbq.toArray( arg0 );
            }
        };
    }
}
To use the solution, create the DelayedBlockingQueue and use the asRunnableQueue() method to pass the runnable queue to the ThreadPoolExecutor constructor.
DelayedBlockingQueue queue = new DelayedBlockingQueue();
ThreadPoolExecutor execService = new ThreadPoolExecutor( 1, 5, 30, TimeUnit.SECONDS, queue.asRunnableQueue() );
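For a self-contained way to try this out, here is a trimmed-down sketch of the same idea. It is not the code above: the adapter (hypothetical name RunnableDelayQueue) extends AbstractQueue so that only the core methods need writing, and DelayedTask is a hypothetical stand-in for DR and Impl. The demo also prestarts the core thread, because ThreadPoolExecutor.execute() hands a task directly to a newly created core thread while fewer than corePoolSize threads are running, which would bypass the queue and its delay.

```java
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical task type: a Runnable with a deadline (not the answer's DR/Impl).
final class DelayedTask implements Delayed, Runnable {
    private final Runnable body;
    private final long deadlineNanos;

    DelayedTask(Runnable body, long delayMillis) {
        this.body = body;
        this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }
    @Override public void run() { body.run(); }
    @Override public long getDelay(TimeUnit unit) {
        return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }
    @Override public int compareTo(Delayed other) {
        if (other instanceof DelayedTask) {
            return Long.signum(deadlineNanos - ((DelayedTask) other).deadlineNanos);
        }
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

// Hypothetical adapter; AbstractQueue supplies add, remove, contains and the bulk methods.
final class RunnableDelayQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
    private final DelayQueue<DelayedTask> delegate = new DelayQueue<>();

    // Plain Runnables get a zero delay, so they become available immediately.
    private static DelayedTask wrap(Runnable r) {
        return (r instanceof DelayedTask) ? (DelayedTask) r : new DelayedTask(r, 0);
    }
    @Override public boolean offer(Runnable r) { return delegate.offer(wrap(r)); }
    @Override public boolean offer(Runnable r, long timeout, TimeUnit unit) { return delegate.offer(wrap(r)); }
    @Override public void put(Runnable r) { delegate.put(wrap(r)); }
    @Override public Runnable take() throws InterruptedException { return delegate.take(); }
    @Override public Runnable poll() { return delegate.poll(); }
    @Override public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.poll(timeout, unit);
    }
    @Override public Runnable peek() { return delegate.peek(); }
    @Override public int size() { return delegate.size(); }
    @Override public int remainingCapacity() { return Integer.MAX_VALUE; }
    @Override public int drainTo(Collection<? super Runnable> c) { return delegate.drainTo(c); }
    @Override public int drainTo(Collection<? super Runnable> c, int max) { return delegate.drainTo(c, max); }
    @Override public Iterator<Runnable> iterator() {
        Iterator<DelayedTask> it = delegate.iterator();
        return new Iterator<Runnable>() {
            @Override public boolean hasNext() { return it.hasNext(); }
            @Override public Runnable next() { return it.next(); }
            @Override public void remove() { it.remove(); }
        };
    }
}

class DelayedPoolDemo {
    // Submits a slow task before a fast one and returns the order they ran in.
    static List<String> runTwoTasks() {
        List<String> log = new CopyOnWriteArrayList<>();
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, new RunnableDelayQueue());
        // Start the worker first so that every task goes through the queue.
        pool.prestartAllCoreThreads();
        pool.execute(new DelayedTask(() -> log.add("slow"), 300));
        pool.execute(new DelayedTask(() -> log.add("fast"), 20));
        pool.shutdown(); // already queued tasks still run after shutdown()
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runTwoTasks());
    }
}
```

The same prestart caveat applies to the DelayedBlockingQueue usage above: without prestartAllCoreThreads(), the first tasks submitted (up to corePoolSize of them) run immediately regardless of their delay.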