What is the best way to have a bounded queue with ScheduledThreadPoolExecutor?
The Sun Java (1.6) ScheduledThreadPoolExecutor, which is an extension of ThreadPoolExecutor, internally uses an implementation of DelayQueue, which is an unbounded queue. What I need is a ScheduledThreadPoolExecutor with a bounded queue, i.e. one that limits the number of tasks accumulating in the queue, so that once the limit is exceeded it starts rejecting further submissions and keeps the JVM from running out of memory.
Surprisingly, neither Google nor Stack Overflow pointed me to any results discussing this problem. Is there already something available that I am missing? If not, what is the best way to implement a ScheduledThreadPoolExecutor that provides this functionality?
As others have already pointed out, there isn't a ready-made way of doing this. Just make sure you use "composition" instead of "inheritance": create a new class which implements the necessary interface and delegates to the underlying ScheduledThreadPoolExecutor, performing whatever checks are required in the relevant methods.
You can also use the technique specified in this thread with a simple modification. Instead of using Semaphore#acquire, you can use Semaphore#tryAcquire and, depending on the boolean outcome, decide whether you need to call the rejection handler or not. Come to think of it, I personally feel that it was an oversight on the part of the library authors to directly subclass a specific executor rather than relying on composition to create a "schedulable" wrapper over a normal executor.
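The composition and Semaphore#tryAcquire ideas above could be combined roughly as follows. This is only a sketch under assumptions: the class name BoundedScheduler and the maxPending parameter are made up for illustration, it uses Java 8 lambdas, and it throws RejectedExecutionException directly instead of calling a configurable rejection handler.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper: delegates to a ScheduledThreadPoolExecutor (composition)
// and caps the number of tasks that are queued or running with a Semaphore.
final class BoundedScheduler {
    private final ScheduledThreadPoolExecutor delegate;
    private final Semaphore permits;

    BoundedScheduler(int poolSize, int maxPending) {
        this.delegate = new ScheduledThreadPoolExecutor(poolSize);
        this.permits = new Semaphore(maxPending);
    }

    ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        // tryAcquire never blocks: false means the limit has been reached.
        if (!permits.tryAcquire()) {
            throw new RejectedExecutionException("too many pending tasks");
        }
        try {
            return delegate.schedule(() -> {
                try {
                    command.run();
                } finally {
                    permits.release(); // free the slot once the task has run
                }
            }, delay, unit);
        } catch (RuntimeException e) {
            permits.release(); // the delegate refused the task, so give the permit back
            throw e;
        }
    }

    void shutdownNow() {
        delegate.shutdownNow();
    }
}
```

Note that in this sketch a permit is held until the task has finished, so the limit covers queued plus running tasks, and a task that is cancelled before it runs never returns its permit. A fuller version would also need to cover the other schedule* methods.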
How about handling it differently, i.e. delaying task submission depending on the queue size? The executor service exposes the queue via getQueue(). You can invoke size() on it and, depending on the limit you plan for the queue, either start rejecting tasks or start delaying task execution (increase the scheduled time, using the queue size as one of the factors).
All said, this is again not the best solution; just fyi, java provides delay queue to support work stealing.
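A minimal sketch of the reject-on-size variant, assuming a hypothetical helper named QueueSizeGuard and a caller-chosen maxQueued limit (neither is part of the JDK):

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: checks getQueue().size() before every submission.
final class QueueSizeGuard {
    private QueueSizeGuard() {
    }

    static ScheduledFuture<?> scheduleIfRoom(ScheduledThreadPoolExecutor executor, int maxQueued,
                                             Runnable task, long delay, TimeUnit unit) {
        // Check-then-act: with several submitting threads the limit can be
        // overshot slightly, so treat it as a soft bound.
        if (executor.getQueue().size() >= maxQueued) {
            throw new RejectedExecutionException("queue already holds " + maxQueued + " tasks");
        }
        return executor.schedule(task, delay, unit);
    }
}
```

The check and the submission are not atomic, so this only approximates a bounded queue. That may be enough if the goal is just to avoid running out of memory.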
The simplest workaround is to use the scheduled executor only to schedule tasks, not to actually execute them; the real work is handed to a separate executor. The scheduler has to explicitly check that executor's queue size and discard the task if the queue is above a threshold.
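One way this could look, as a sketch only: the class name SchedulingFrontEnd, the handOff method and the discard counter are invented for illustration, and a single timer thread is assumed so that the size check is not raced by other producers.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical front end: the scheduled executor only fires timers,
// a separate pool does the work, and ticks are dropped when that pool is backed up.
final class SchedulingFrontEnd {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final ThreadPoolExecutor workers;
    private final int threshold;
    private final AtomicInteger discarded = new AtomicInteger();

    SchedulingFrontEnd(int workerThreads, int threshold) {
        this.workers = new ThreadPoolExecutor(workerThreads, workerThreads,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        this.threshold = threshold;
    }

    ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) {
        // The timer thread does nothing but hand the task over, so it returns quickly.
        return timer.scheduleAtFixedRate(() -> handOff(task), initialDelay, period, unit);
    }

    // Discards the task when the worker queue has reached the threshold.
    void handOff(Runnable task) {
        if (workers.getQueue().size() >= threshold) {
            discarded.incrementAndGet();
        } else {
            workers.execute(task);
        }
    }

    int discardedCount() {
        return discarded.get();
    }

    void shutdownNow() {
        timer.shutdownNow();
        workers.shutdownNow();
    }
}
```

This bounds the backlog of work waiting to run, not the number of timers registered with the scheduler, so it fits best when a small, fixed set of periodic tasks is scheduled.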
Another option is to check the ScheduledThreadPoolExecutor queue size right in the scheduled task. If the queue is above the threshold, just return immediately. In this case the task finishes instantly and is removed from the queue, so overflow won't happen.
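A sketch of such a self-checking task, assuming a hypothetical wrapper class SelfSheddingTask and a caller-chosen threshold:

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper: skips its own work when the executor's queue is too long.
final class SelfSheddingTask implements Runnable {
    private final ScheduledThreadPoolExecutor executor;
    private final int threshold;
    private final Runnable work;

    SelfSheddingTask(ScheduledThreadPoolExecutor executor, int threshold, Runnable work) {
        this.executor = executor;
        this.threshold = threshold;
        this.work = work;
    }

    @Override
    public void run() {
        // By the time run() is called, this task has already left the queue.
        if (executor.getQueue().size() > threshold) {
            return; // shed load: finish immediately without doing the work
        }
        work.run();
    }

    // Convenience method: wraps the work and schedules it on the same executor.
    static ScheduledFuture<?> scheduleOn(ScheduledThreadPoolExecutor executor, int threshold,
                                         Runnable work, long delay, TimeUnit unit) {
        return executor.schedule(new SelfSheddingTask(executor, threshold, work), delay, unit);
    }
}
```

A task still occupies a queue slot until its delay expires, so this limits how much queued work actually runs rather than how many tasks can be submitted.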
ScheduledThreadPoolExecutor does not keep the queue in a field of its own; it calls super.getQueue(), which returns the queue held by ThreadPoolExecutor. You can use reflection to replace that queue like this:
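import java.lang.reflect.Field;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
    public BoundedScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler, int queueCapacity) {
        super(corePoolSize, handler);
        setMaximumPoolSize(corePoolSize);
        setKeepAliveTime(0, TimeUnit.MILLISECONDS);
        // Bounded queue whose add() calls the rejection handler when full
        // instead of throwing IllegalStateException.
        // Caution: a plain LinkedBlockingQueue does not order tasks by delay,
        // so scheduling delays should not be expected to be honored.
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(queueCapacity) {
            @Override
            public boolean add(Runnable r) {
                boolean added = offer(r);
                if (!added) {
                    getRejectedExecutionHandler().rejectedExecution(r, BoundedScheduledThreadPoolExecutor.this);
                }
                return added;
            }
        };
        try {
            // Swap the private workQueue field of ThreadPoolExecutor.
            // On Java 9+ this may need --add-opens java.base/java.util.concurrent=ALL-UNNAMED.
            Field workQueueField = ThreadPoolExecutor.class.getDeclaredField("workQueue");
            workQueueField.setAccessible(true);
            workQueueField.set(this, queue);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}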
If you really, really don't want to re-implement ScheduledThreadPoolExecutor then you could extend it, override all the schedule* methods and implement your own bounding of tasks. It would be fairly nasty though:
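private final Object scheduleMonitor = new Object();

@Override
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit)
{
    if (command == null || unit == null)
        throw new NullPointerException();
    synchronized (scheduleMonitor)
    {
        try
        {
            // Block the caller until the queue drops below the limit.
            while (getQueue().size() >= MAX_QUEUE_SIZE)
            {
                scheduleMonitor.wait();
            }
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("interrupted while waiting for queue space", e);
        }
        return super.schedule(command, delay, unit);
    }
}

// getTask() cannot be overridden from outside java.util.concurrent, and calling
// getTask() from itself would recurse forever. The protected beforeExecute hook,
// which runs after a worker has taken a task off the queue, is used here instead.
@Override
protected void beforeExecute(Thread t, Runnable r)
{
    super.beforeExecute(t, r);
    synchronized (scheduleMonitor)
    {
        scheduleMonitor.notifyAll();
    }
}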
And repeat for:
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
Note, this won't stop repeating tasks from taking the queue over the limit; it will only block newly scheduled tasks.
Another caveat is that I haven't checked for any deadlock issues from calling super.schedule while holding the lock on scheduleMonitor...