To implement graceful shutdown check locking on async calls, or handle exceptions?
I'm developing a Java app that for much of the time, including the point of shutdown, is having to deal with a flood of incoming asynchronous calls from a foreign framework. During normal operation these incoming calls then need to be dispatched to another framework, again asynchronously.
At the moment I'm having my module be a "good" citizen and do some locking around a shutdown flag which, once set, will gracefully cease the dispatch of any further outgoing calls.
The troubling thing is that because both incoming and outgoing calls are asynchronous, I'm having to make each "worker" task perform two sets of locking (see below) to do the same shutdown flag check (EDIT: It's been pointed out to me in another question that using Semaphores only one acquire/release is needed for each worker). It works, but there are many of these worker tasks to handle and I worry about the cumulative slowdown in performance. Profiling will come soon once the framework is expanded a little but regardless of the result it'd be good to follow best practices.
An alternative is to simply do no shutdown flag check locking and handle the anticipated exceptions that are generated when the external frameworks are shutdown before the async calls have finished processing. I should add that there are no detrimental operational effects if this approach is taken. Both methods will result in a clean shutdown.
Your ideas on which is the better practice, please? Heavy-handed locking with no exceptions, versus no locking but a barrage of exceptions.
With locks, the worker task code looks something like this:
final private ReentrantReadWriteLock shutdownLock = new ReentrantReadWriteLock();
private boolean isShutdown;
private void workerTask()
{
try
{
shutdownLock.readLock().lock();
if (isShutdown)
return;
executeAsynchronously(new Runnable()
{
@Override
final public void run()
{
try
{
shutdownLock.readLock().lock();
if (isShutdown)
return;
// Do something here.
}
finally
{
shutdownLock.readLock().unlock();
}
}
});
}
finally
{
shutdownLock.readLock().unlock();
}
}
The shutdown method requests the shutdownLock.writeLock(), then sets the isShutdown flag.
An alternative without locking and anticipating the shutdown-generated exceptions looks something like this:
volatile private boolean isShutdown;
private void workerTask()
{
try
{
executeAsynchronously(new Runnable()
{
@Override
final public void run()
{
try
{
// Do something here.
}
catch (final FrameworkRuntimeException exception)
{
if ((! isShutdown) || (exception.type !=
FrameworkRuntimeException.FrameworkIsDisposed))
throw exception;
}
}
});
}
catch (final FrameworkRuntimeException exception)
{
if ((! isShutdown) || (exception.type !=
FrameworkRuntimeException.FrameworkIsDisposed))
开发者_Go百科 throw exception;
}
}
The shutdown method for this implementation sets the volatile isShutdown flag to true.
Thanks in advance for any feedback,
Russ
EDIT: It's been helpfully pointed out to me in another question that I could use a Semaphore to avoid the double locking in the first approach, so it wouldn't be so heavy-handed after all, but the question still stands.
In general I would favour the approach where you check for shutdown, then execute the task. If you optimistically and then throw away exceptions that you 'know' are due to shutdown then you run the risk of misclassifying an error and missing out on a real problem.
As far as simplifying the code goes, you can get rid of all the locks and just make sure that your executeAsynchronously
method uses an ExecutorService
- then your shutdown
method just calls shutdown
on the service, the task creation can be skipped if isShutdown
returns true and if you need to wait for tasks to finish before returning you can use the helpful awaitTermination
method.
Ok so I think this here should work and not impede too much of a runtime overhead (note it's 4.30 in the morning here, so better double check ;) ). Also note that throwing in some good try{} finally{} code blocks would be a pretty good idea, but omitted for clarity.
public static final AtomicInteger activeConnections = new AtomicInteger();
public static volatile boolean shutdown = false;
public static void shutdown() {
shutdown = true;
while (activeConnections.get() > 0) {
synchronized(activeConnections) {
try {
activeConnections.wait();
}
catch(InterruptedException e) {
}
}
}
// proceed shutdown
}
public static void run() {
if (shutdown) return;
activeConnections.incrementAndGet();
if (shutdown) {
leave();
return;
}
// do stuff
leave();
}
private static void leave() {
int outstandingConnections = activeConnections.decrementAndGet();
if (shutdown && outstandingConnections == 0) {
synchronized(activeConnections) {
activeConnections.notifyAll();
}
}
}
As soon as the shutdown flag is set, no new thread starts working. Every thread increments an integer while communicating with the external framework and decrements it when it's finished. The shutdown may only proceed as soon as no thread is communicating anymore - note that since the shutdown flag is set at first no new thread will start anymore.
That way you get the pretty lightweight AtomicInteger (that's implemented as a CAS loop, you can't get much lower overhead) and the volatile memory barrier.
Now I'm still standing to my first comment and say that it's simpler, more efficient and shorter to catch the exceptions, but I liked the problem :)
if you use spring below code works for gracefull shutdown. You may change the retry numbers.
package com.avea.vpspg.test.schedulers;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;
import com.avea.vpspg.core.VasProvLogger;
@Component
class ContextClosedHandler implements ApplicationListener<ContextClosedEvent> , ApplicationContextAware,BeanPostProcessor{
private ApplicationContext context;
public Logger logger = XProvLogger.getInstance().x;
public void onApplicationEvent(ContextClosedEvent event) {
Map<String, ThreadPoolTaskScheduler> schedulers = context.getBeansOfType(ThreadPoolTaskScheduler.class);
for (ThreadPoolTaskScheduler scheduler : schedulers.values()) {
scheduler.getScheduledExecutor().shutdown();
try {
scheduler.getScheduledExecutor().awaitTermination(20000, TimeUnit.MILLISECONDS);
if(scheduler.getScheduledExecutor().isTerminated() || scheduler.getScheduledExecutor().isShutdown())
logger.info("Scheduler "+scheduler.getThreadNamePrefix() + " has stoped");
else{
logger.info("Scheduler "+scheduler.getThreadNamePrefix() + " has not stoped normally and will be shut down immediately");
scheduler.getScheduledExecutor().shutdownNow();
logger.info("Scheduler "+scheduler.getThreadNamePrefix() + " has shut down immediately");
}
} catch (IllegalStateException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Map<String, ThreadPoolTaskExecutor> executers = context.getBeansOfType(ThreadPoolTaskExecutor.class);
for (ThreadPoolTaskExecutor executor: executers.values()) {
int retryCount = 0;
while(executor.getActiveCount()>0 && ++retryCount<51){
try {
logger.info("Executer "+executor.getThreadNamePrefix()+" is still working with active " + executor.getActiveCount()+" work. Retry count is "+retryCount);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(!(retryCount<51))
logger.info("Executer "+executor.getThreadNamePrefix()+" is still working.Since Retry count exceeded max value "+retryCount+", will be killed immediately");
executor.shutdown();
logger.info("Executer "+executor.getThreadNamePrefix()+" with active " + executor.getActiveCount()+" work has killed");
}
}
@Override
public void setApplicationContext(ApplicationContext context)
throws BeansException {
this.context = context;
}
@Override
public Object postProcessAfterInitialization(Object object, String arg1)
throws BeansException {
return object;
}
@Override
public Object postProcessBeforeInitialization(Object object, String arg1)
throws BeansException {
if(object instanceof ThreadPoolTaskScheduler)
((ThreadPoolTaskScheduler)object).setWaitForTasksToCompleteOnShutdown(true);
if(object instanceof ThreadPoolTaskExecutor)
((ThreadPoolTaskExecutor)object).setWaitForTasksToCompleteOnShutdown(true);
return object;
}
精彩评论