开发者

Using Spring threading and TaskExecutor, how do I know when a thread is finished?

Alright, possible a naive question here. I have a service that needs to log into multiple network devices, run a command on each and collect the results. For speed, rather than collect the information on each device in sequence, I need to access them all concurrently and consume the results after they are done.

Using the Spring framework and Jsch I'm quite easily able to query each device correctly. Where I am running into some confusion is in trying to rewire the beans to use TaskExecutor to accomplish this. What I can't figure out how to do is how to know when the thread is finished.

What I have so far is this:

public class RemoteCommand {

    private String user;
    private String host;
    private String password;
    private String command;
    private List<String> commandResults;
    private TaskExecutor taskExecutor;

    public RemoteCommand(String user, String host, String password, TaskExecutor taskExecutor) {

        setUser(user);
        setHost(host);
        setPassword(password);
        setTaskExecutor(taskExecutor);
    }

    /**
     * @param user the user to set
     */
    public void setUser(String user) {
        this.user = user;
    }

    /**
     * @return the user
     */
    public String getUser() {
        return user;
    }

    /**
     * @param host the host to set
     */
    public void setHost(String host) {
        this.host = host;
    }

    /**
     * @return the host
     */
    public String getHost() {
        return host;
    }

    /**
     * @param password the password to set
     */
    public void setPassword(String password) {
        this.password = password;
    }

    /**
     * @return the password
     */
    public String getPassword() {
        return password;
    }

    /**
     * @param command the command to set
     */
    private void setCommand(String command) {
        this.command = command;
    }

    /**
     * @return the command
     */
    private String getCommand() {
        return command;
    }

    /**
     * @param commandResults the commandResults to set
     */
    private void setCommandResults(List<String> commandResults) {
        this.commandResults = commandResults;
    }

    /**
     * @return the commandResults
     */
    public List<String> getCommandResults(String command) {
        taskExecutor.execute(new CommandTask(command) );

        return commandResults;
    }

    /**
     * @param taskExecutor the taskExecutor to set
     */
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    /**
     * @return the taskExecutor
     */
    public TaskExecutor getTaskExecutor() {
        return taskExecutor;
    }

    private class CommandTask implements Runnable {

        public CommandTask(String command) {
            setCommand(command);
            System.out.println("test: " + getCommand());
        }

     开发者_C百科   /**
         * 
         * @param command
         */
        public void run() {

            List<String> results = new LinkedList<String>();
            String command = getCommand();

            try {
                System.out.println("running");
                JSch jsch = new JSch();

                String user = getUser();
                String host = getHost();

                java.util.Properties config = new java.util.Properties(); 
                config.put("StrictHostKeyChecking", "no");

                host = host.substring(host.indexOf('@') + 1);
                Session session = jsch.getSession(user, host, 22);

                session.setPassword(getPassword());
                session.setConfig(config);
                session.connect();

                Channel channel = session.openChannel("exec");
                ((ChannelExec) channel).setCommand(command);

                channel.setInputStream(null);

                ((ChannelExec) channel).setErrStream(System.err);

                InputStream in = channel.getInputStream();

                channel.connect();
                byte[] tmp = new byte[1024];
                while (true) {
                    while (in.available() > 0) {
                        int i = in.read(tmp, 0, 1024);
                        if (i < 0)
                            break;
                        results.add(new String(tmp, 0, i));
                        System.out.print(new String(tmp, 0, i));
                    }
                    if (channel.isClosed()) {
                        //System.out.println("exit-status: "
                        //      + channel.getExitStatus());
                        break;
                    }
                    try {
                        Thread.sleep(1000);
                    } catch (Exception ee) {
                        ee.printStackTrace();
                    }
                }
                channel.disconnect();
                session.disconnect();
            } catch (Exception e) {
                System.out.println(e);
            }
            setCommandResults(results);
            System.out.println("finished running");
        }
    }
}

Within my junit test I have:

@Test
    public void testRemoteExecution() {

        remoteCommand = (RemoteCommand) applicationContext.getBean("remoteCommand");
        remoteCommand.getCommandResults("scripts/something.pl xxx.xxx.xxx.xxx");

            //List<String> results = remoteCommand.getCommandResults("scripts/something.pl xxx.xxx.xxx.xxx");
        //for (String line : results) {
        //  System.out.println(line.trim());
        //}
    }

My applicationContext.xml file:

    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
       <property name="corePoolSize" value="5" />
       <property name="maxPoolSize" value="10" />
       <property name="queueCapacity" value="25" />
    </bean>        

<!-- ******************** -->
<!--      Utilities       -->
<!-- ******************** -->

     <bean name="remoteCommand" class="com.xxx.ncc.sonet.utilities.RemoteCommand" scope="prototype">
        <description>Remote Command</description>
        <constructor-arg><value>${remote.user}</value></constructor-arg>
        <constructor-arg><value>${remote.host}</value></constructor-arg>
        <constructor-arg><value>${remote.password}</value></constructor-arg>
        <constructor-arg ref="taskExecutor" />
    </bean> 

I get as far as the first println in the run() method. Then the test exits cleanly with no errors. I never get to the second println at the bottom of that routine. I've looked at this thread here, which was very useful, but not implemented in a Spring specific fashion. I'm sure I'm missing something simple, or have completely run off the rails here. Any help is appreciated.


The TaskExecutor interface is a fire-and-forget interface, for use when you don't care when the task finishes. It's the simplest async abstraction that Spring offers.

There is , however, an enhanced interface, AsyncTaskExecutor, which provides additional methods, including submit() methods that return a Future, which let you wait on the result.

Spring provides the ThreadPoolTaskExecutor class, which implement both TaskExecutor and AsyncTaskExecutor.

In your specific case, I would re-implement the Runnable as a Callable, and return the commandResults from the Callable.call() method. The getCommandResults method can then be reimplemented as:

public List<String> getCommandResults(String command) {
   Future<List<String>> futureResults = taskExecutor.submit(new CommandTask(command));
   return futureResults.get();
}

This method will submit the task asynchronously, and then wait for it to complete before returning the results returned from the Callable.call() method. This also lets you get rid of the commandResults field.


public List<String> getCommandResults(String command) {
    FutureTask task = new FutureTask(new CommandTask(command))
    taskExecutor.execute(task);

    return task.get(); //or task.get(); return commandResults; - but it not a good practice
}
0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜