开发者

java8 Future异步调用实现方式

目录
  • 一、同步与异步调用概念
  • 二、Future实现异步调用
    • 2.1 future实现异步调用的方式
    • 2.2 使用CompletableFuture来实现异步调用
  • 三、流顺序执行、并行、并发–异步执行、并发–自定义异步执行比较
    • 总结

      一、同步与异步调用概念

      • 同步API:调用方在调用某个方法后,等待被调用方返回结果;调用方在取得被调用方的返回值后,再继续运行。调用方顺序执行,同步等待被调用方的返回值,这就是阻塞式调用。
      • 异步API:调用方在调用某个方法后,直接返回,不需要等待被调用方返回结果;被调用方开启一个线程处理任务,调用方可以同时去处理其他工作。调用方和被调用方是异步的,这就是非阻塞式调用。

      在Java种,Future用来完成异步工作任务,极大地提高了程序的运行效率。

      二、Future实现异步调用

      2.1 future实现异步调用的方式

      在Java8之前,直接使用Future以异步的方式执行一个耗时的操作。通过这种编程方式,调用方线程使用ExecutorService,以并发方式调用另一个线程,在执行耗时操作的同时,去执行一些其他的任务。

      代码示例:

      package com.mvp.test;
      
      import org.junit.Test;
      
      import java.util.Arrays;
      import java.util.concurrent.Callable;
      import java.util.concurrent.ExecutionException;
      import java.util.concurrent.ExecutorServiceandroid;
      import java.util.concurrent.Executors;
      import java.util.concurrent.Future;
      import java.util.concurrent.TimeUnit;
      import java.util.concurrent.TimeoutException;
      
      public class FutureUseTest {
          //创建Executor-Service,实现向线程池提交任务
          ExecutorService executor = Executors.newCachedThreadPool();
      
          @Test
          public void futureTest() {
              long start = System.nanoTime();
              //向 Executor-Service 提交一个Callable 对象
              Future<Double> future = executor.submit(new Callable<Double>() {
                  public Double call() {
                      //以异步方式在新的线程中执行耗时的操作
                      return DOSomeLongComputation(start);
                  }
              });
              //异步操作进行的同时,你可以做其他的事情
              doSomethingElse(start);
              Double result = 0.0;
              try {
                  //获取异步操作的结果,如果最终被阻塞,无法得到结果,那么在最多等待1秒钟之后退出
                  result = future.get(2, TimeUnit.SECONDS);
      //            result = future.get();
              } catch (ExecutionException ee) {
                  System.out.println("ExecutionException="  + Arrays.toString(ee.getStackTrace()));
                  // 计算抛出一个异常
              } catch (InterruptedException ie) {
                  System.out.println("InterruptedException="  + Arrays.toString(ie.getStackTrace()));
                  // 当前线程在等待过程中被中断
              } catch (TimeoutException te) {
                  System.out.println("TimeoutException="  + Arrays.toString(te.getStackTrace()));
                  // 在Future对象完成之前超过已过期
              }
              System.out.println("全部计算完成,耗时:"+ (System.nanoTime() - start) / 1_000_000 + " msecs");
              System.out.println("result=" + result);
          }
      
          public double doSomeLongComputation(Long start) {
              delayLong();
              System.out.println("异步执行一个长的计算,耗时:" + (System.nanoTime() - start) / 1_000_000 + " msecs");
              return 65000.00;
          }
          public void doSomethingElse(Long start) {
              delay();
              System.out.println("当前线程做别的计算,耗时:"+ (System.nanoTime() - start) / 1_000_000 + " msecs");
          }
      
          private void delay() {
              try {
                  Thread.sleep( (long) (Math.random() * 1000));
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              }
          }
      
          private void delayLong() {
              try {
                  Thread.sleep(1500);
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              }
          }
      }

      运行结果为:

      当前线程做别的计算,耗时:403 msecs

      异步执行一个长的计算,耗时:1506 msecs

      全部计算完成,耗时:1506 msecs

      result=65000.0

      2.2 使用CompletableFuture来实现异步调用

      在java中,引入了CompletableFuture,更为方便地实现异步调用。

      代码示例为:

      package com.mvp.test;
      
      import org.junit.Test;
      
      import java.util.Random;
      import java.util.concurrent.CompletableFuture;
      import java.util.concurrent.Future;
      
      public class CompletableFutureUsetTest {
      
          private double calculateAsyncPrice(String product) {
              delayLong();
              double price = new Random().nextDouble()* 1000 + 150;
              // System.out.println("calculate Price Of " + product + "is: " + price);
              return price;
          }
      
          private double calculatePrice(String product) {
              delay();
              double price = new Random().nextDouble()* 1000 + 150;
              // System.out.println("calculate Price Of " + product + "is: " + price);
              return price;
          }
      
          public Future<Double> getPriceAsync(String product) {
              // 创建 CompletableFuture对象,它会包含计算的结果
              CompletableFuture<Double> futurePrice = new CompletableFuture<>();
              // 在另一个线程中以异步方式执行计算
              new Thread( () -> {
        OiEMmHCZ          System.out.println("异步线程处理中");
                  try {
                      // 如果价格计算正常结束,完成Future操作,并设置商品价格
                      double price = calculateAsyncPrice(product);
                      // 设置Future的返回值,用以获得需长时间计算的任务的结果
                      futurePrice.complete(price);
                  } catch (Exception ex) {
                      // 若存在导致失败的异常,则强制这次Future操作异常结束,并抛出Future完成异常
                      futurePrice.completeExceptionally(ex);
                  }
              }).start();
              // 无需等待,直接返回 Future 对象
              return futurePrice;
          }
      
          public double getPriceDirect(Long start, String product) {
              double price = calculatePrice(product);
              System.out.println("当前线程去查询" + product + "的价格,耗时:"+ (System.nanoTime() - start) / 1_000_000 + " msecs");
              return price;
          }
      
          @Test
          public void completableFutureTest() {
              // 执行异步任务
              long startNanoTime = System.nanoTime();
              Future<Double> futurePrice = getPriceAsync("篮球");
              long returnFutureNanoTime = System.nanoTime();
              long invocationTime = ((returnFutureNanoTime - startNanoTime) / 1_000_000);
              System.out.println("调用getPriceAsyc方法直接返回,耗时: " + invocationTime + " msecs");
      
              // 执行同步任务
              double priceDirect = getPriceDirect(returnFutureNanoTime, "足球");
      
              double priceAsync = 0.0;
              try {
                  priceAsync = futurePrice.get();
      //            priceAsync = futurePrice.get(1, TimeUnit.SECONDS);
              } catch (Exception e) {
                  //throw new RuntimeException(e);
                  System.out.println("exception=" + e.toString());
              }
              System.out.printf("篮球和足球的总价格是: %.2f, futurePrice.get()耗时=%s msecs %n", priceAsync + priceDirect, (System.nanoTime() - returnFutureNanoTime) / 1_000_000);
      
              long retrievalTime = ((System.nanoTime() - startNanoTime) / 1_000_000);
              System.out.println("总耗时:" + retrievalTime + " msecs");
          }
      
          private void delay() {
              try {
                  //Thread.sleep( (long) (Math.random() * 1000));
                  Thread.sleep( 200);
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              }
          }
      
          private void delayLong() {
              try {
                  Thread.sleep( 1500);
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              }
          }
      }

      运行结编程果为:

      调用getPriceAsyc方法直接返回,耗时: 199 msecs

      异步线程处理中

      当前线程去查询足球的价格,耗时:201 msecs

      篮球和足球的总价格是: 914.33, futurePrice.get()耗时=1500 msecs

      总耗时:1704 msecs

      CompletableFuture类提供了大量精巧的工厂方法,使用这些方法能更容易地完成整个流程,不需担心实现的细节。

      例如,在采用supplyAsync方法后,可以用一行语句重写上例中的getPriceAsync方法,如下所示:

      package com.mvp.test;
      
      import org.junit.Test;
      
      import java.util.Random;
      import java.util.concurrent.CompletableFuture;
      import java.util.concurrent.Future;
      
      public class CompletableFutureSupplyAsyncTest {
      
          private double calculateAsyncPrice(String product) {
              delayLong();
              double price = new Random().nextDouble()* 1000 + 150;
              //System.out.println("calculate Price Of " + product + "is: " + price);
              return price;
          }
      
          private double calculatePrice(String product) {
              delay();
              double price = new Random().nextDouble()* 1000 + 150;
              //System.out.println("calculate Price Of " + product + "is: " + price);
              return price;
          }
      
          //使用工厂方法 supplyAsync 创建 CompletableFuture 对象
          public Future<Double> getPriceAsync(String product) {
              return CompletableFuture.supplyAsync(() -> calculateAsyncPrice(product));
          }
      
          public double getPriceDirect(Long start, String product) {
              double price = calculatePrice(product);
              System.out.println("当前线程去查询" + product + "的价格, 耗时:"+ (System.nanoTime() - start) / 1_000_000 + " msecs");
              return price;
          }
      
          @Test
          public void futureSupplyAsjsyncTest() {
              // 执行异步任务
              long startNanoTime = System.nanoTime();
              Future<Double> futurePrice = getPriceAsync("篮球");
              long returnFutureNanoTime = System.nanoTime();
              long invocationTime = ((returnFutureNanoTime - startNanoTime) / 1_000_000);
              System.out.println("调用getPriceAsyc方法直接返回,耗时: " + invocationTime + " msecs");
      
              // 执行同步任务
              long startSyncNanoTime = System.nanoTime();
              double priceDirect = getPriceDirect(startSyncNanoTime, "足球");
      
              double priceAsync = 0.0;
              try {
                  priceAsync = futurePrice.get();
                  // priceAsync = futurePrice.get(1, TimeUnit.SECONDS);
              } catch (Exception e) {
                  //throw new RuntimeException(e);
                  System.out.println("exception=" + e.toString());
              }
              System.out.printf("篮球和足球的总价格是: %.2f, futurePrice.get() 耗时:%s msecs %n", priceAsync + priceDirect, (System.nanoTime() - returnFutureNanoTime) / 1_000_000);
      
              long retrievalTime = ((System.nanoTime() - startNanoTime) / 1_000_000);
              System.out.println("总耗时:" + retrievalTime + " msecs");
          }
      
          private void delay() {
              try {
                  //Thread.sleep( (long) (Math.random() * 1000));
                  Thread.sleep( 200);
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              }
          }
      
          private void delayLong() {
              try {
                  Thread.sleep( 1500);
              } catch (InterruptedException e) {
                  throw new RuntimeException(e);
              }
          }
      }

      运行结果为:

      调用getPriceAsyc方法直接返回,耗时: 176 msecs

      当前线程去查询足球的价格, 耗时:204 msecs

      篮球和足球的总价格是: 997.99, futurePrice.get() 耗时:1502 msecs 

      总耗时:1681 msecs

      supplyAsync方法接受一个生产者(Supplier)作为参数,返回一个CompletableFuture对象(在完成异步执行后,该对象会读取异步方法的返回值)。

      异步方法会交由ForkJoinPool池中的某个执行器(Executor)运行,也可以使用supplyAsync方法的重载版本,传递第2个参数指定不同的执行器(Executor)执行异步方法。

      一般而言,向CompletableFuture的工厂方法传递可选参数,指定异步方法的执行器。

      三、流顺序执行、并行、并发–异步执行、并发–自定义异步执行比较

      对流顺序执行、并行、并发–异步执行、并发–自定义异步执行进行比较,代码如下:

      package com.mvp.test;
      
      import org.junit.Test;
      
      import java.util.Arrays;
      import java.util.List;
      import java.util.Random;
      import java.util.concurrent.CompletableFuture;
      import java.util.concurrent.Executor;
      import java.util.concurrent.Executors;
      import java.util.concurrent.ThreadFactory;
      import java.util.stream.Collectors;
      
      public class CompareParallelFutureUseTest {
          List<String> shopNames = Arrays.asList("北京华联", "华润", "沃尔玛", "大润发", "万果园", "一峰");
      
          private double calculatePrice(String product) {
              double price = new Random().nextDouble()* 1000 + 150;
              //System.out.println("calculate Price Of " + product + "is: " + price);
              return price;
          }
      
          public double getPrice(String product) {
              return calculatePrice(product);
          }
      
          /**
           * 使用流顺序计算
           * @param product 商品名称
           * @return 列表
           */
          public List<String> findPrices(String product) {
              return shopNames.stream()
                      .map(shopName -> String.format("%s 价格: %.2f", shopName, getPrice(product)))
                      .collect(Collectors.toList());
          }
      
          /**
           * 使用流并行计算
           * @param product 商品名称
           * @return 列表
           */
          public List<String> findPricesParallel(String product) {
              return shopNames.parallelStream()
                      .map(shopName -> String.format("%s 价格: %.2f", shopName, getPrice(product)))
                      .collect(Collectors.toList());
          }
      
          /**
           * 异步运算
           * @param product 商品名称
           * @return 列表
           */
          public List<String> fi编程客栈ndPricesFuture(String product) {
              List<CompletableFuture<String>> priceFutures = shopNames.stream()
                      .map(shopName -> CompletableFuture.supplyAsync(() -> String.format("%s 价格: %.2f", shopName, getPrice(product))))
                      .collect(Collectors.toList());
              //CompletableFuture类中的join方法 和 Future接口中的get方法 有相同的含义
              return priceFutures.stream()
                      .map(CompletableFuture::join)
                      .collect(Collectors.toList());
          }
      
          //创建一个线程池,其线程数目为100和商店数目二者中较小的一个值
          private final Executor executor1 = Executors.newFixedThreadPool(Math.min(shopNames.size(), 10),
                  new ThreadFactory() {
                      public Thread newThread(Runnable r) {
                          Thread t = new Thread(r);
                          // 使用守护线程。这种方式不会阻止程序的关停。
                          t.setDaemon(true);
                          return t;
                      }
              });
      
          /**
           * 异步运算:使用定制的执行器(调整线程池的大小)
           * @param product 商品名称
           * @return 列表
           */
          public List<String> findPricesFutureCustom(String product) {
              List<CompletableFuture<String>> priceFutures = shopNames.stream()
                      .map(shopName -> CompletableFuture.supplyAsync(() -> String.format("%s 价格: %.2f", shopName, getPrice(product)), executor1))
                      .collect(Collectors.toList());
      
              return priceFutures.stream()
                      .map(CompletableFuture::join)
                      .collect(Collectors.toList());
          }
      
          @Test
          public void futureCompareTest() {
              long start = System.nanoTime();
              System.out.println(findPrices("羽毛球"));
              System.out.println("使用流顺序计算 Done in " + (System.nanoTime() - start) / 1_000_000 + " msecs");
      
              start = System.nanoTime();
              System.out.println(findPricesParallel("羽毛球"));
              System.out.println("使用流并行计算 Done in " + (System.nanoTime() - start) / 1_000_000 + " msecs");
      
              start = System.nanoTime();
              System.out.println(findPricesFuture("羽毛球"));
              System.out.println("并发Future异步运算(默认执行器) Done in " + (System.nanoTime() - start) / 1_000_000 + " msecs");
      
              //并行和并发不相伯仲,究其原因都一样:它们内部采用的是同样的通用线程池,默认都使用固定数目的线程,具体线程数取决于
              // Runtime.getRuntime().availableProcessors() 的返回值。
              // 然而,CompletableFuture具有一定的优势,因为它允许你对执行器(Executor)进行配置,尤其是线程池的大小。
              start = System.nanoTime();
              System.out.println(findPricesFutureCustom("羽毛球"));
              System.out.println("并发Future异步运算(定制执行器:调整线程池的大小) Done in " + (System.nanoTime() - start) / 1_000_000 + " msecs");
          }
      }

      运行结果为:

      [北京华联 价格: 552.91, 华润 价格: 173.53, 沃尔玛 价格: 981.30, 大润发 价格: 339.54, 万果园 价格: 872.71, 一峰 价格: 338.87]

      使用流顺序计算 Done in 148 msecs

      [北京华联 价格: 475.23, 华润 价格: 991.62, 沃尔玛 价格: 469.81, 大润发 价格: 1140.04, 万果园 价格: 199.57, 一峰 价格: 210.05]

      使用流并行计算 Done in 5 msecs

      [北京华联 价格: 723.78, 华润 价格: 546.76, 沃尔玛 价格: 979.16, 大润发 价格: 402.02, 万果园 价格: 770.86, 一峰 价格: 601.99]

      并发Future异步运算(默认执行器) Done in 5 msecs

      [北京华联 价格: 854.24, 华润 价格: 1000.75, 沃尔玛 价格: 1103.58, 大润发 价格: 355.49, 万果园 价格: 849.84, 一峰 价格: 1051.99]

      并发Future异步运算(定制执行器:调整线程池的大小) Done in 4 msecs

      从运行结果可以看出,流并行计算、异步运算、自定义执行器异步运算的效率比流顺序计算要高很多。

      总结

      以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。

      0

      上一篇:

      下一篇:

      精彩评论

      暂无评论...
      验证码 换一张
      取 消

      最新开发

      开发排行榜