开发者

Return values from Java Threads

I have a Java Thread like the following:

   public class M开发者_Go百科yThread extends Thread {
        MyService service;
        String id;
        public MyThread(String id) {
            this.id = node;
        }
        public void run() {
            User user = service.getUser(id)
        }
    }

I have about 300 ids, and every couple of seconds - I fire up threads to make a call for each of the id. Eg.

for(String id: ids) {
    MyThread thread = new MyThread(id);
    thread.start();
}

Now, I would like to collect the results from each threads, and do a batch insert to the database, instead of making 300 database inserts every 2 seconds.

Any idea how I can accomplish this?


The canonical approach is to use a Callable and an ExecutorService. submitting a Callable to an ExecutorService returns a (typesafe) Future from which you can get the result.

class TaskAsCallable implements Callable<Result> {
    @Override
    public Result call() {
        return a new Result() // this is where the work is done.
    }
}

ExecutorService executor = Executors.newFixedThreadPool(300);
Future<Result> task = executor.submit(new TaskAsCallable());
Result result = task.get(); // this blocks until result is ready

In your case, you probably want to use invokeAll which returns a List of Futures, or create that list yourself as you add tasks to the executor. To collect results, simply call get on each one.


If you want to collect all of the results before doing the database update, you can use the invokeAll method. This takes care of the bookkeeping that would be required if you submit tasks one at a time, like daveb suggests.

private static final ExecutorService workers = Executors.newCachedThreadPool();

...

Collection<Callable<User>> tasks = new ArrayList<Callable<User>>();
for (final String id : ids) {
  tasks.add(new Callable<User>()
  {

    public User call()
      throws Exception
    {
      return svc.getUser(id);
    }

  });
}
/* invokeAll blocks until all service requests complete, 
 * or a max of 10 seconds. */
List<Future<User>> results = workers.invokeAll(tasks, 10, TimeUnit.SECONDS);
for (Future<User> f : results) {
  User user = f.get();
  /* Add user to batch update. */
  ...
}
/* Commit batch. */
...


Store your result in your object. When it completes, have it drop itself into a synchronized collection (a synchronized queue comes to mind).

When you wish to collect your results to submit, grab everything from the queue and read your results from the objects. You might even have each object know how to "post" it's own results to the database, this way different classes can be submitted and all handled with the exact same tiny, elegant loop.

There are lots of tools in the JDK to help with this, but it is really easy once you start thinking of your thread as a true object and not just a bunch of crap around a "run" method. Once you start thinking of objects this way programming becomes much simpler and more satisfying.


In Java8 there is better way for doing this using CompletableFuture. Say we have class that get's id from the database, for simplicity we can just return a number as below,

static class GenerateNumber implements Supplier<Integer>{

    private final int number;

    GenerateNumber(int number){
        this.number = number;
    }
    @Override
    public Integer get() {
        try {
            TimeUnit.SECONDS.sleep(1);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
        return this.number;
    }
}

Now we can add the result to a concurrent collection once the results of every future is ready.

Collection<Integer> results = new ConcurrentLinkedQueue<>();
int tasks = 10;
CompletableFuture<?>[] allFutures = new CompletableFuture[tasks];
for (int i = 0; i < tasks; i++) {
     int temp = i;
     CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()-> new GenerateNumber(temp).get(), executor);
     allFutures[i] = future.thenAccept(results::add);
 }

Now we can add a callback when all the futures are ready,

CompletableFuture.allOf(allFutures).thenAccept(c->{
   System.out.println(results); // do something with result
});


You need to store the result in a something like singleton. This has to be properly synchronized.

This not the best advice as it is not good idea to handle raw Threads.


You could create a queue or list which you pass to the threads you create, the threads add their result to the list which gets emptied by a consumer which performs the batch insert.


The simplest approach is to pass an object to each thread (one object per thread) that will contain the result later. The main thread should keep a reference to each result object. When all threads are joined, you can use the results.


public class TopClass {
     List<User> users = new ArrayList<User>();
     void addUser(User user) {
         synchronized(users) {
             users.add(user);
         }
     }
     void store() throws SQLException {
        //storing code goes here
     }
     class MyThread extends Thread {
            MyService service;
            String id;
            public MyThread(String id) {
                this.id = node;
            }
            public void run() {
                User user = service.getUser(id)
                addUser(user);
            }
        }
}


You could make a class which extends Observable. Then your thread can call a method in the Observable class which would notify any classes that registered in that observer by calling Observable.notifyObservers(Object).

The observing class would implement Observer, and register itself with the Observable. You would then implement an update(Observable, Object) method that gets called when Observerable.notifyObservers(Object) is called.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜