
Java: Design Ideas for Stopping Callable Thread

I am writing a program that does some batch processing. The batch elements can be processed independently of each other and we want to minimize overall processing time. So, instead of looping through each element in the batch one at a time, I am using an ExecutorService and submitting Callable objects to it:

    public void process(Batch batch)
    {
        ExecutorService execService = Executors.newCachedThreadPool();
        CopyOnWriteArrayList<Future<BatchElementStatus>> futures = new CopyOnWriteArrayList<Future<BatchElementStatus>>();

        for (BatchElement element : batch.getElement())
        {
            Future<BatchElementStatus> future = execService.submit(new ElementProcessor(batch.getID(),
                    element));
            futures.add(future);
        }

        boolean done = false;

        while (!done)
        {
            for (Future<BatchElementStatus> future : futures)
            {
                try
                {
                    if (future.isDone())
                    {
                        futures.remove(future);
                    }
                }
                catch (Exception e)
                {
                    System.out.println(e.getMessage());
                }

                if (futures.size() == 0)
                {
                    done = true;
                }
            }
        }
    }

We want to allow the batch processing to be cancelled. Because I'm not using a loop, I can't just check at the top of each iteration whether a cancel flag has been set.

We are using a JMS topic that both the BatchProcessor and the ElementProcessor listen to; a message on it tells them the batch has been cancelled.

The ElementProcessor call() method has a number of steps. Processing can be safely stopped after some of them, but there is a point of no return. The class has this basic design:

public class ElementProcessor implements Callable<BatchElementStatus>, MessageListener
{
    private boolean cancelled = false;

    public void onMessage(Message msg)
    {
        // get message object
        cancelled = true;
    }

    public BatchElementStatus call()
    {
        String status = SUCCESS;

        if (!cancelled)
        {
            doSomethingOne();
        }
        else
        {
            doRollback();
            status = CANCELLED;
        }            

        if (!cancelled)
        {
            doSomethingTwo();
        }
        else
        {
            doRollback();
            status = CANCELLED;
        }            

        if (!cancelled)
        {
            doSomethingThree();
        }
        else
        {
            doRollback();
            status = CANCELLED;
        }            

        if (!cancelled)
        {
            doSomethingFour();
        }
        else
        {
            doRollback();
            status = CANCELLED;
        }

        // After this point, we cannot cancel or pause the processing

        doSomethingFive();
        doSomethingSix();

        return new BatchElementStatus("SUCCESS");
    }

}

I'm wondering if there's a better way to check if the batch/element has been cancelled other than wrapping method calls/blocks of code in the call method in the if(!cancelled) statements.

Any suggestions?


I don't think you can do much better than what you are currently doing, but here is an alternative:

public BatchElementStatus call() {
    return callMethod(1);
}

private BatchElementStatus callMethod(int methodCounter) {
    if (cancelled) {
       doRollback();
       return new BatchElementStatus("FAIL");
    }
    switch (methodCounter) {
       case 1 : doSomethingOne(); break;
       case 2 : doSomethingTwo(); break;
       ...
       case 5 : doSomethingFive();
                doSomethingSix();
                return new BatchElementStatus("SUCCESS");
    }
    return callMethod(methodCounter + 1);
}          

Also, you want to make cancelled volatile, since onMessage will be called from another thread. But you probably don't want to use onMessage and cancelled anyway (see below).
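If you do keep the flag, here is a minimal sketch of how the repeated checks could collapse into one loop once the flag is volatile. The class name SteppedTask, the trace() helper and the string statuses are hypothetical stand-ins, not part of the original code; the real steps would be doSomethingOne() and so on.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;

/** Hypothetical sketch: the cancellable steps live in a list and are checked in one loop. */
final class SteppedTask implements Callable<String> {
    // volatile so that a write from the listener thread is visible to the worker thread
    private volatile boolean cancelled = false;
    private final StringBuilder trace = new StringBuilder();

    /** Stand-in for onMessage(): called from another thread to request cancellation. */
    void cancel() {
        cancelled = true;
    }

    @Override
    public String call() {
        // Placeholder steps; each one only records that it ran.
        List<Runnable> cancellableSteps = Arrays.<Runnable>asList(
                () -> trace.append("1 "),
                () -> trace.append("2 "),
                () -> trace.append("3 "),
                () -> trace.append("4 "));

        for (Runnable step : cancellableSteps) {
            if (cancelled) {
                trace.append("rollback"); // stand-in for doRollback()
                return "CANCELLED";
            }
            step.run();
        }

        // Point of no return: no cancellation checks from here on.
        trace.append("5 6");
        return "SUCCESS";
    }

    String trace() {
        return trace.toString();
    }
}
```

Unlike the code in the question, this sketch returns as soon as it sees the flag, so a cancelled element never reaches the final two steps.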

Other minor points:

1) CopyOnWriteArrayList<Future<BatchElementStatus>> futures should just be an ArrayList. Using a concurrent collection misleads the reader into thinking that futures is accessed from many threads.

2) while (!done) should be replaced by while (!futures.isEmpty()), and done removed.

3) You should probably just call future.cancel(true) instead of "messaging" the cancellation. You would then check if (Thread.interrupted()) instead of if (cancelled). If you want to kill all futures, just call execService.shutdownNow(); your tasks have to handle interrupts for this to work.
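A rough sketch of the interrupt-based approach from the last point, assuming the task polls Thread.interrupted() between units of work. The class name CancelByInterruptDemo, the latches and the rolledBack flag are illustrative only and exist just to make the demo deterministic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical demo: cancel a running task with Future.cancel(true). */
final class CancelByInterruptDemo {

    /** Returns true if the task saw the interrupt, rolled back, and the future reports cancelled. */
    static boolean cancelRunningTask() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean rolledBack = new AtomicBoolean(false);
        try {
            Future<String> future = pool.submit(() -> {
                started.countDown();
                try {
                    // Poll the interrupt flag between units of work.
                    while (!Thread.interrupted()) {
                        Thread.yield(); // stand-in for one unit of real work
                    }
                    rolledBack.set(true); // stand-in for doRollback()
                    return "CANCELLED";   // discarded: a cancelled future never exposes its result
                } finally {
                    finished.countDown();
                }
            });

            started.await();       // make sure the task is actually running
            future.cancel(true);   // true = interrupt the worker thread
            boolean finishedInTime = finished.await(5, TimeUnit.SECONDS);
            return finishedInTime && future.isCancelled() && rolledBack.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow(); // would interrupt any task still running
        }
    }

    public static void main(String[] args) {
        System.out.println(cancelRunningTask());
    }
}
```

Note that calling get() on a cancelled future throws CancellationException, so any status the task wants to report after a cancel has to travel by some other route.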

EDIT:

Instead of your while (!done) { for (... futures) { ... } } loop, you should use an ExecutorCompletionService. It does what you are trying to do, and it probably does it a lot better. There is a complete example in the API documentation.
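For illustration, a minimal sketch of that approach. The element type is simplified to String, the lambda stands in for ElementProcessor, and the pool size of 4 is an arbitrary assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical demo: collect results in completion order without polling isDone(). */
final class CompletionServiceDemo {

    static List<String> processAll(List<String> elements) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletionService<String> completion = new ExecutorCompletionService<>(pool);
        try {
            for (String element : elements) {
                // Stand-in for new ElementProcessor(batchId, element)
                completion.submit(() -> element + ":SUCCESS");
            }

            List<String> statuses = new ArrayList<>();
            for (int i = 0; i < elements.size(); i++) {
                // take() blocks until the next task has finished, whichever one that is
                statuses.add(completion.take().get());
            }
            return statuses;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for results", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("an element failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(processAll(List.of("a", "b", "c")));
    }
}
```

The results arrive in completion order, not submission order, so the returned list may be ordered differently from the input.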


Future has a cancel(boolean) method that will interrupt the running thread if true is passed in.

So replace the if (!cancelled) checks with if (!Thread.interrupted()), and return when you get an interrupt (you currently don't).

Note that Thread.interrupted() resets the interrupted flag to false (so if (Thread.interrupted() && Thread.interrupted()) will be false). If you don't want to reset it, use Thread.currentThread().isInterrupted(), which keeps the flag set for subsequent checks.

Alternatively, you can set the flag again with Thread.currentThread().interrupt();
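The difference between the two checks can be seen in a small sketch. The class and method names are made up for the demonstration.

```java
/** Hypothetical demo of isInterrupted() versus Thread.interrupted(). */
final class InterruptFlagDemo {

    /** Returns {peek, firstCheck, secondCheck} as observed on the current thread. */
    static boolean[] observe() {
        Thread.currentThread().interrupt();                     // set the flag on ourselves

        boolean peek = Thread.currentThread().isInterrupted(); // reads the flag, leaves it set
        boolean firstCheck = Thread.interrupted();              // reads the flag and clears it
        boolean secondCheck = Thread.interrupted();             // flag was already cleared

        return new boolean[] { peek, firstCheck, secondCheck };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(observe()));
    }
}
```

Here peek and firstCheck are true and secondCheck is false, and the thread is left with its flag cleared.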

Besides that, use this inside the waiting while loop:

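// Note: it.remove() needs futures to be an ArrayList;
// CopyOnWriteArrayList iterators throw UnsupportedOperationException on remove().
for (Iterator<Future<BatchElementStatus>> it = futures.iterator(); it.hasNext();) {
    Future<BatchElementStatus> future = it.next();
    try
    {
        if (future.isDone())
        {
            it.remove(); // removing through the iterator avoids a ConcurrentModificationException
        }
    }
    catch (Exception e)
    {
        System.out.println(e.getMessage());
    }
}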


// Do this check outside the inner for loop but inside the while
// (or simply make it the while condition).
if (futures.isEmpty())
{
    done = true;
}


Your ElementProcessor can extend java.util.concurrent.FutureTask, which its documentation describes as:

A cancellable asynchronous computation. This class provides a base implementation of Future, with methods to start and cancel a computation, query to see if the computation is complete, and retrieve the result of the computation.

The FutureTask class is an implementation of Future that implements Runnable, and so may be executed by an Executor.

FutureTask has a cancel method which you can override to do some cancel-specific operations. Also, a FutureTask that is cancelled before it starts will not be executed at all, so you don't always have to check the status yourself.
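A minimal sketch of that idea, assuming a String result instead of BatchElementStatus. The class name CancellableElementTask and the cleanedUp flag are hypothetical; the flag stands in for whatever cancel-specific work is needed.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical FutureTask subclass that runs extra cleanup when it is cancelled. */
final class CancellableElementTask extends FutureTask<String> {
    private volatile boolean cleanedUp = false;

    CancellableElementTask(Callable<String> work) {
        super(work);
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = super.cancel(mayInterruptIfRunning);
        if (cancelled) {
            cleanedUp = true; // stand-in for cancel-specific operations such as a rollback
        }
        return cancelled;
    }

    boolean cleanedUp() {
        return cleanedUp;
    }

    /** Returns {isCancelled, cleanedUp, callableRan} for a task cancelled before it runs. */
    static boolean[] cancelBeforeRun() {
        AtomicBoolean ran = new AtomicBoolean(false);
        CancellableElementTask task = new CancellableElementTask(() -> {
            ran.set(true);
            return "SUCCESS";
        });
        task.cancel(false);
        task.run(); // no-op: a cancelled FutureTask does not invoke its callable
        return new boolean[] { task.isCancelled(), task.cleanedUp(), ran.get() };
    }
}
```

A task that is already running when cancel(true) is called still has to notice the interrupt itself; only the not-yet-started case is handled for free.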
