开发者

Java中线程执行状态检测的四种可靠方法

目录
  • 线程执行状态检测的挑战
  • 方法一:使用 Thread.join()等待线程完成
  • 方法二:通过共享变量或回调传递执行状态
  • 方法三:使用 Future 和 Callable 获取执行结果和异常
  • 线程池拒绝策略的选择依据
  • 方法四:使用 CompletableFuture 实现异步任务监控
  • CompletableFuture 多级异常处理
  • 组合多个 CompletableFuture
  • Future 与 CompletableFuture 的关键区别
    • Future.get() vs Future.getNow()
    • CompletableFuture 的线程模型
    • 线程池泄漏风险
  • 使用 UncaughtExceptionHandler 捕获线程未处理异常
    • 线程池配置最佳实践
      • 实际场景示例
        • 总结

          在多线程开发中,开发者常需监控子线程状态。当主线程创建并启动多个工作线程后,如何判断这些任务是否成功完成?工作线程因未处理异常终止时,主线程往往无法直接感知。这个问题关系到系统的可靠性和错误处理能力,需要一套完善的方案来解决。

          线程执行状态检测的挑战

          在 Java 多线程编程中,主线程启动工作线程后,默认情况下无法直接获知工作线程的执行结果。这是由于线程的独立性特征决定的:

          Java中线程执行状态检测的四种可靠方法

          那么,如何让主线程知道工作线程是否执行成功呢?下面介绍四种python从基础到高级的解决方案。

          方法一:使用 Thread.join()等待线程完成

          最基础的方法是调用线程的 join()方法,使主线程等待工作线程执行完毕。

          public class JoinExample {
              public static void main(String[] args) {
                  Thread thread = new Thread(() -> {
                      try {
                          System.out.println("工作线程开始执行");
                          // 模拟任务执行
                          Thread.sleep(2000);
                          System.out.println("工作线程执行完成");
                      } catch (InterruptedException e) {
                          System.err.println("工作线程被中断:" + e.getMessage());
                          Thread.currentThread().interrupt();
                      } catch (Exception e) {
                          System.err.println("工作线程执行异常:" + e.getMessage());
                      }
                  });
          
                  try {
                      thread.start();
                      // 主线程等待工作线程执行完成,支持设置超时
                      // thread.join(3000); // 等待最多3秒
                      thread.join();
          
                      // 检查线程状态
                      if (!thread.isAlive()) {
                          System.out.println("工作线程已结束执行");
                          // 但仍无法知道是成功还是失败
                      }
                  } catch (InterruptedException e) {
                      System.err.println("主线程等待过程中被中断");
                      Thread.currentThread().interrupt();
                  }
              }
          }
          

          局限性

          • 只能知道线程是否执行完毕,无法获取执行结果或异常信息
          • 支持基础的超时设置(通过join(long)),但功能有限(如超时后需手动检查线程状态,无法中途取消)
          • 对于多个线程,需要逐个 join,代码繁琐

          方法二:通过共享变量或回调传递执行状态

          通过在线程间共享对象,可以传递执行状态信息:

          class TaskResult {
              private volatile boolean success = false;
              private volatile String message = "";
              private volatile Exception exception = null;
              private volatile boolean completed = false; // 增加完成标记
          
              // 使用synchronized保证设置结果的原子性
              public synchronized void setSuccess(String message) {
                  if (!completed) { // 避免重复设置
                      this.success = true;
                      this.message = message;
                      this.completed = true;
                  }
              }
          
              public synchronized void setFailure(Exception e, String message) {
                  if (!completed) { // 避免重复设置
                      this.success = false;
                      this.exception = e;
                      this.message = message;
                      this.completed = true;
                  }
              }
          
              public boolean isSuccess() {
                  return success;
              }
          
              public boolean isCompleted() {
                  return completed;
              }
          
              public String getMessage() {
                  return message;
              }
          
              public Exception getException() {
                  return exception;
              }
          }
          
          public class SharedObjectExample {
              public static void main(String[] args) {
                  // 创建共享结果对象
                  TaskResult result = new TaskResult();
          
                  Thread thread = new Thread(() -> {
                      try {
                          System.out.println("工作线程开始执行");
                          // 模拟任务执行
                          Thread.sleep(2000);
          
                          // 设置执行成功
                          result.setSuccess("任务顺利完成");
                      } catch (InterruptedException e) {
                          result.setFailure(e, "线程被中断");
                          Thread.currentThread().interrupt();
                      } catch (Exception e) {
                          result.setFailure(e, "执行异常:" + e.getMessage());
                      }
                  });
          
                  try {
                      thread.start();
                      thread.join(); // 等待工作线程执行完成
          
                      // 检查执行结果
                      if (!result.isCompleted()) {
                          System.out.println("任务尚未完成");
                      } else if (result.isSuccess()) {
                          System.out.println("工作线程执行成功:" + result.getMessage());
                      } else {
                          System.out.println("工作线程执行失败:" + result.getMessage());
                          if (result.getException() != null) {
                              System.out.println("异常信息:" + result.getException().toString());
                          }
                      }
                  } catch (InterruptedException e) {
                      System.err.println("主线程等待过程中被中断");
                      Thread.currentThread().interrupt();
                  }
              }
          }
          

          优点

          • 可以传递更详细的执行结果和异常信息
          • 适用于简单的线程状态监控

          缺点

          • 需要自行处理线程安全问题
          • 代码结构较为松散
          • 扩展多个线程时较为复杂

          方法三:使用 Future 和 Callable 获取执行结果和异常

          Java 5 引入的 Future 接口提供了更优雅的异步任务处理方式:

          import java.util.concurrent.*;
          
          public class FutureExample {
              public static void main(String[] args) {
                  // 避免使用Executors工厂方法,直接配置ThreadPoolExecutor
                  ThreadPoolExecutor executor = new ThreadPoolExecutor(
                      1, // 核心线程数
                      1, // 最大线程数
                      0L, TimeUnit.MILLISECONDS, // 空闲线程保留时间
                      new LinkedblockingQueue<>(10), // 有界队列,防止OOM
                      new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
                  );
          
                  // 创建Callable任务
                  Callable<String> task = () -> {
                      System.out.println("工作线程开始执行");
                      // 模拟任务执行
                      Thread.sleep(2000);
                      // 任务可以有返回值
                      return "任务执行结果";
          
                      // 如果任务失败,直接抛出异常
                      // throw new RuntimeException("任务执行失败");
                  };
          
                  // 提交任务并获取Future
                  Future<String> future = executor.submit(task);
          
                  try {
                      // 等待任务完成并获取结果,可以设置超时
                      String result = future.get(3, TimeUnit.SECONDS);
                      System.out.pphprintln("工作线程执行成功,结果:" + result);
                  } catch (TimeoutException e) {
                      System.err.println("任务执行超时");
                      future.cancel(true); // 尝试取消任务
                  } catch (ExecutionException e) {
                      System.err.println("任务执行异常:" + e.getCause().getMessage());
                      // getCause()获取任务中抛出的原始异常
                  } catch (InterruptedException e) {
                      System.err.println("主线程等待过程中被中断");
                      Thread.currentThread().interrupt();
                  } finally {
                      // 记得关闭线程池
                      executor.shutdown()编程客栈;
                  }
              }
          }
          

          优点

          • 可以获取线程执行结果或异常
          • 支持超时设置
          • 可以取消任务执行

          使用 Future 处理多个任务

          import java.util.ArrayList;
          import java.util.List;
          import java.util.concurrent.*;
          
          public class MultipleFuturesExample {
              public static void main(String[] args) {
                  // 创建线程池 - 根据任务类型配置参数
                  ThreadPoolExecutor executor = new ThreadPoolExecutor(
                      3, // 核心线程数 - 根据任务特性选择
                      3, // 最大线程数
                      60L, TimeUnit.SECONDS, // 空闲线程保留时间
                      new ArrayBlockingQueue<>(10), // 有界队列,避免OOM
                      new ThreadPoolExecutor.AbortPolicy() // 任务拒绝时抛出异常
                  );
          
                  // 创建多个任务
                  List<Callable<String>> tasks = new ArrayList<>();
                  for (int i = 0; i < 3; i++) {
                      final int taskId = i;
                      tasks.add(() -> {
                          System.out.println("任务" + taskId + "开始执行");
                          // 模拟不同执行时间
                          Thread.sleep(1000 + taskId * 1000);
          
                          // 模拟可能的失败
                          if (taskId == 1) {
                              throw new RuntimeException("任务" + taskId + "执行失败");
                          }
          
                          return "任务" + taskId + "执行结果";
                      });
                  }
          
                  try {
                      // 提交所有任务并获取Future列表
                      List<Future<String>> futures = executor.invokeAll(tasks);
          
                      // 处理每个任务的结果 - 确保一个任务的异常不会影响其他任务处理
                      for (int i = 0; i < futRQJzssnaures.size(); i++) {
                          Future<String> future = futures.get(i);
                          try {
                              // 检查任务是否被取消
                              if (future.isCancelled()) {
                                  System.err.println("任务" + i + "被取消");
                                  continue;
                              }
          
                              String result = future.get();
                              System.out.println("任务" + i + "执行成功:" + result);
                          } catch (ExecutionException e) {
                              System.err.println("任务" + i + "执行异常:" + e.getCause().getMessage());
                          } catch (CancellationException e) {
                              System.err.println("任务" + i + "被取消");
                          }
                      }
                  } catch (InterruptedException e) {
                      System.err.println("主线程等待过程中被中断");
                      Thread.currentThread().interrupt();
                  } finally {
                      // 关闭线程池
                      shutdownAndAwaitTermination(executor);
                  }
              }
          
              // 安全关闭线程池的标准方法
              private static void shutdownAndAwaitTermination(ExecutorService pool) {
                  pool.shutdown();
                  try {
                      if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                          pool.shutdownNow();
                          if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                              System.err.println("线程池无法终止");
                          }
                      }
                  } catch (InterruptedException e) {
                      pool.shutdownNow();
                      Thread.currentThread().interrupt();
                  }
              }
          }
          

          线程池拒绝策略的选择依据

          在配置线程池时,选择合适的拒绝策略非常重要:

          • CallerRunsPolicy:将任务回退到调用者线程执行。当系统负载过高时,能起到自动降速的作用,因为提交任务的线程会被迫自己执行任务,暂时无法提交新任务。适用于不能丢弃任务且需要自动调节提交速率的场景。

          • AbortPolicy:直接抛出 RejectedExecutionException 异常。适用于需要明确知道任务被拒绝并进行特殊处理的场景,比如重要业务任务。

          • DiscardPolicy:静默丢弃任务,不做任何处理。适用于任务可以安全丢弃且无需通知的场景,如统计类非关键任务。

          • DiscardOldestPolicy:丢弃队列中等待最久的任务,然后尝试重新提交当前任务。适用于新任务比旧任务更重要的场景,如实时监控数据。

          // 选择示例
          // 1. 关键业务任务 - 不能丢失,需要感知拒绝
          new ThreadPoolExecutor(cores, maxThreads, keepAliveTime,
              timeUnit, queue, new ThreadPoolExecutor.AbortPolicy());
          
          // 2. 高负载任务 - 防止系统崩溃,允许降速
          new ThreadPoolExecutor(cores, maxThreads, keepAliveTime,
              timeUnit, queue, new ThreadPoolExecutor.CallerRunsPolicy());
          
          // 3. 非关键统计任务 - 可以安全丢弃
          new ThreadPoolExecutor(cores, maxThreads, keepAliveTime,
              timeUnit, queue, new ThreadPoolExecutor.DiscardPolicy());
          

          方法四:使用 CompletableFuture 实现异步任务监控

          Java 8 引入的 CompletableFuture 提供了更强大的异步编程能力,特别适合复杂的任务依赖场景:

          import java.util.concurrent.*;
          import java.util.function.Supplier;
          
          public class CompletableFutureExample {
              public static void main(String[] args) {
                  // 创建线程池 - 适合IO密集型任务的线程池配置
                  ThreadPoolExecutor executor = new ThreadPoolExecutor(
                      Runtime.getRuntime().availableProcessors() * 2, // IO密集型任务可使用更多线程
                      Runtime.getRuntime().availableProcessors() * 2,
                      60L, TimeUnit.SECONDS,
                      new LinkedBlockingQueue<>(100),
                      new ThreadFactory() {
                          @Override
                          public Thread newThread(Runnable r) {
                              Thread t = new Thread(r, "async-task-" + System.currentTimeMillis());
                              t.setDaemon(false); // 非守护线程
                              return t;
                          }
                      }
                  );
          
                  // 创建任务
                  Supplier<String> task = () -> {
                      try {
                          System.out.println("工作线程开始执行,ID:" + Thread.currentThread().getId());
                          // 模拟任务执行
                          Thread.sleep(2000);
          
                          // 模拟可能的异常
                          if (Math.random() < 0.3) {
                              throw new RuntimeException("任务随机失败");
                          }
          
                          return "任务执行结果";
                      } catch (InterruptedException e) {
                          // 保留完整的异常栈信息
                          Thread.currentThread().interrupt();
                          throw new IllegalStateException("任务被中断", e); // 使用标准异常
                      }
                  };
          
                  // 创建CompletableFuture并指定执行线程池
                  CompletableFuture<String> future = CompletableFuture
                      .supplyAsync(task, executor)
                      .thenApply(result -> {
                          // 对结果进行处理 - 默认在任务线程执行
                          return "处理后的" + result;
                      })
                      .exceptionally(ex -> {
                          // 异常处理
                          System.err.println("任务异常:" + ex.getCause().getMessage());
                          return "默认结果";
                      });
          
                  // 添加完成回调 - 确保回调在主线程执行
                  ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
                  future.whenCompleteAsync((result, ex) -> {
                      if (ex == null) {
                          System.out.println("任务完成,结果:" + result);
                      } else {
                          System.err.println("任务异常:" + ex.getMessage());
                      }
                  }, callbackExecutor);
          
                  try {
                      // 等待任务完成
                      String result = future.get(5, TimeUnit.SECONDS);
                      System.out.println("最终结果:" + result);
                  } catch (Exception e) {
                      System.err.println("获取结果异常:" + e.getMessage());
                  } finally {
                      // 关闭线程池
                      shutdownAndAwaitTermination(executor);
                      shutdownAndAwaitTermination(callbackExecutor);
                  }
              }
          
              // 安全关闭线程池的标准方法
              private static void shutdownAndAwaitTermination(ExecutorService pool) {
                  pool.shutdown();
                  try {
                      if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                          pool.shutdownNow();
                          if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                              System.err.println("线程池无法终止");
                          }
                      }
                  } catch (InterruptedException e) {
                      pool.shutdownNow();
                      Thread.currentThread().interrupt();
                  }
              }
          }
          

          CompletableFuture 多级异常处理

          CompletableFuture 的一大强项是处理复杂的任务链,下面是处理多级异常的示例:

          public void processWithErrorHandling() {
              CompletableFuture<String> future = CompletableFuture
                  .supplyAsync(() -> {
                      // 步骤1:获取原始数据
                      if (Math.random() < 0.2) {
                          throw new RuntimeException("获取数据失败");
                      }
                      return "原始数据";
                  })
                  .thenApply(data -> {
                      // 步骤2:处理数据
                      if (data.contains("错误")) {
                          throw new IllegalArgumentException("数据格式错误");
                      }
                      return data + "已处理";
                  })
                  .thenApply(processed -> {
                      // 步骤3:格式化结果
                      if (Math.random() < 0.1) {
                          throw new RuntimeException("格式化失败");
                      }
                      return "[" + processed + "]";
                  })
                  .exceptionally(ex -> {
                      // 捕获任何步骤的异常
                      Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
                      log.error("处理链异常: {}", cause.getMessage());
          
                      // 根据异常类型返回不同的默认值
                      if (cause instanceof IllegalArgumentException) {
                          return "[格式错误默认值]";
                      }
                      return "[系统错误默认值]";
                  });
          
              // 还可以添加完成回调处理最终结果
              future.whenComplete((result, ex) -> {
                  if (ex == null) {
                      System.out.println("处理完成: " + result);
                  } else {
                      System.err.println("处理失败: " + ex.getMessage());
                      // 注意:此处不能返回新值
                  }
              });
          }
          

          exceptionally 与 whenComplete 对比

          • exceptionally: 可以恢复异常并返回替代结果。用于类似 try-catch-return 的场景。
          • whenComplete: 无法修改结果,只能执行最终操作。类似 finally 块,常用于日志记录或监控。

          多个异常处理器可以组合使用,形成精细的异常处理链:

          CompletableFuture<String> future = CompletableFuture
              .supplyAsync(this::fetchData)
              .exceptionally(ex -> {
                  // 处理fetchData阶段异常
                  log.warn("数据获取失败,使用缓存数据");
                  return getCachedData();
              })
              .thenApply(this::processData)
              .exceptionally(ex -> {
                  // 处理processData阶段异常
                  log.warn("数据处理失败,使用简化处理");
                  return getSimpleProcessing();
              });
          

          组合多个 CompletableFuture

          import java.util.concurrent.*;
          import java.util.stream.Collectors;
          import java.util.stream.IntStream;
          import java.util.List;
          
          public class MultipleCompletableFuturesExample {
              public static void main(String[] args) {
                  // 创建自定义线程池 - CPU密集型任务配置
                  ThreadPoolExecutor executor = new ThreadPoolExecutor(
                      Runtime.getRuntime().availableProcessors(), // CPU密集型任务核心线程数=CPU核数
                      Runtime.getRuntime().availableProcessors(),
                      0L, TimeUnit.MILLISECONDS,
                      new LinkedBlockingQueue<>()
                  );
          
                  try {
                      // 创建多个CompletableFuture任务
                      List<CompletableFuture<String>> futures = IntStream.range(0, 5)
                          .mapToObj(i -> CompletableFuture.supplyAsync(() -> {
                              try {
                                  System.out.println("任务" + i + "开始执行");
                                  Thread.sleep(1000 + i * 500);
                                  // 模拟可能的失败
                                  if (i == 2) {
                                      throw new RuntimeException("任务" + i + "执行失败");
                                  }
                                  return "任务" + i + "执行成功";
                              } catch (InterruptedException e) {
                                  Thread.currentThread().interrupt();
                                  // 直接抛出原始异常,CompletableFuture会自动包装
                                  throw new IllegalStateException("任务中断", e);
                              }
                          }, executor)
                          .exceptionally(ex -> "任务异常:" + ex.getCause().getMessage()))
                          .collect(Collectors.toList());
          
                      // 等待所有任务完成
                      CompletableFuture<Void> allFutures = CompletableFuture.allOf(
                          futures.toArray(new CompletableFuture[0])
                      );
          
                      // 设置超时
                      try {
                          allFutures.get(10, TimeUnit.SECONDS);
                      } catch (TimeoutException e) {
                          System.err.println("部分任务执行超时");
                      }
          
                      // 使用Stream API简化结果收集 - 替代原来的循环
                      List<String> results = futures.stream()
                          .map(future -> {
                              if (future.isDone() && !future.isCancelled() && !future.isCompletedExceptionally()) {
                                  try {
                                      return future.getNow("任务未完成");
                                  } catch (Exception e) {
                                      return "获取结果异常:" + e.getMessage();
                                  }
                              } elseRQJzssna if (future.isCancelled()) {
                                  return "任务已被取消";
                              } else if (future.isCompletedExceptionally()) {
                                  return "任务异常完成";
                              } else {
                                  return "任务未完成";
                              }
                          })
                          .collect(Collectors.toList());
          
                      // 打印所有结果
                      System.out.println("所有任务执行结果:");
                      for (int i = 0; i < results.size(); i++) {
                          System.out.println(i + ": " + results.get(i));
                      }
          
                  } catch (Exception e) {
                      System.err.println("主线程异常:" + e.getMessage());
                  } finally {
                      executor.shutdown();
                  }
              }
          }
          

          CompletableFuture 优点

          • 支持任务组合和链式调用(如流水线、并行执行)
          • 更灵活的异常处理机制
          • 可添加任务完成回调
          • 支持多任务协调(allOf、anyOf 等)

          Java中线程执行状态检测的四种可靠方法

          Future 与 CompletableFuture 的关键区别

          Future.get() vs Future.getNow()

          • get(): 阻塞方法,会等待任务完成或超时
          • getNow(defaultValue): 非阻塞方法,若任务未完成立即返回默认值

          CompletableFuture 的线程模型

          • 默认使用ForkJoinPool.commonPool(),适合计算密集型任务
          • 自定义线程池的最佳实践:
            • IO 密集型任务:线程数 = CPU 核心数 * (1 + 平均等待时间/平均计算时间)
            • CPU 密集型任务:线程数 = CPU 核心数 + 1
            • 总是使用有界队列防止 OOM
            • 明确设置拒绝策略

          线程池泄漏风险

          • 未正确关闭线程池会导致应用无法正常退出
          • 生产环境必须使用try-finally模式确保线程池关闭

          使用 UncaughtExceptionHandler 捕获线程未处理异常

          对于直接使用 Thread 的场景,可以设置 UncaughtExceptionHandler 来捕获线程中未被捕获的异常:

          public class ExceptionHandlerExample {
              public static void main(String[] args) {
                  // 设置默认的未捕获异常处理器
                  Thread.setDefaultUncaughtExceptionHandler((thread, throwable) -> {
                      System.err.println("线程" + thread.getName() + "发生未捕获异常:" + throwable.getMessage());
                  });
          
                  // 创建并启动可能抛出异常的线程
                  Thread thread = new Thread(() -> {
                      System.out.println("工作线程开始执行");
                      // 模拟未捕获的异常
                      throw new RuntimeException("发生了一个未处理的异常");
                  });
          
                  // 也可以为单个线程设置异常处理器
                  thread.setUncaughtExceptionHandler((t, e) -> {
                      System.err.println("线程" + t.getName() + "的专属异常处理器捕获到异常:" + e.getMessage());
                  });
          
                  thread.start();
          
                  try {
                      // 让主线程等待一会,确保能看到异常处理结果
                      Thread.sleep(1000);
                  } catch (InterruptedException e) {
                      Thread.currentThread().interrupt();
                  }
              }
          }
          

          注意:UncaughtExceptionHandler 只能捕获线程中未被 try-catch 处理的异常,主要用于记录异常信息,且无法区分任务是成功还是主动失败(如返回错误码)。特别说明,只有未检查异常(RuntimeException 及其子类)才会触发该处理器,受检异常必须在代码中显式处理。

          线程池配置最佳实践

          根据任务类型选择合适的线程池配置至关重要:

          // CPU密集型任务线程池 - 线程数接近CPU核心数
          ExecutorService cpuIntensivePool = new ThreadPoolExecutor(
              Runtime.getRuntime().availableProcessors(),
              Runtime.getRuntime().availableProcessors(),
              0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<>(1000), // 合理大小的队列
              new ThreadPoolExecutor.CallerRunsPolicy() // 避免任务丢失
          );
          
          // IO密集型任务线程池 - 更多的线程数应对IO等待
          ExecutorService ioIntensivePool = new ThreadPoolExecutor(
              Runtime.getRuntime().availableProcessors() * 2,
              Runtime.getRuntime().availableProcessors() * 4,
              60L, TimeUnit.SECONDS,
              new LinkedBlockingQueue<>(1000),
              new ThreadPoolExecutor.AbortPolicy() // 队列满时拒绝新任务
          );
          

          避开 Executors 工厂方法的原因

          • newFixedThreadPoolnewSingleThreadExecutor 使用无界队列(LinkedBlockingQueue),可能导致 OOM
          • newCachedThreadPool 允许创建无限线程,可能导致 OOM
          • newScheduledThreadPool 同样允许无限任务堆积

          实际场景示例

          以下是一个在实际项目中常见的 Web 后台任务监控示例:

          import java.util.ArrayList;
          import java.util.List;
          import java.util.concurrent.*;
          import java.util.stream.Collectors;
          
          public class WebBackgroundTasksExample {
          
              // 模拟数据处理服务
              static class DataProcessingService {
                  public List<String> processLargeData(List<String> data) {
                      // 创建线程池 - 根据任务特性选择合适配置
                      ThreadPoolExecutor executor = new ThreadPoolExecutor(
                          Math.min(data.size(), Runtime.getRuntime().availableProcessors() * 2),
                          Math.min(data.size(), Runtime.getRuntime().availableProcessors() * 2),
                          0L, TimeUnit.MILLISECONDS,
                          new ArrayBlockingQueue<>(Math.max(10, data.size())), // 有界队列
                          new ThreadFactory() {
                              @Override
                              public Thread newThread(Runnable r) {
                                  Thread t = new Thread(r, "data-processor-" + System.currentTimeMillis());
                                  t.setUncaughtExceptionHandler((thread, ex) ->
                                      System.err.println("线程" + thread.getName() + "未捕获异常: " + ex.getMessage())
                                  );
                                  return t;
                              }
                          },
                          new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
                      );
          
                      try {
                          // 为每条数据创建处理任务
                          List<CompletableFuture<String>> futures = data.stream()
                              .map(item -> CompletableFuture.supplyAsync(() -> {
                                  try {
                                      // 模拟复杂处理
                                      if (item.contains("error")) {
                                          throw new RuntimeException("处理数据项 '" + item + "' 时出错");
                                      }
                                      Thread.sleep(500); // 模拟处理时间
                                      return item.toUpperCase(); // 处理结果
                                  } catch (InterruptedException e) {
                                      Thread.currentThread().interrupt();
                                      throw new IllegalStateException("处理被中断", e);
                                  }
                              }, executor)
                              .exceptionally(ex -> {
                                  // 记录错误,但允许继续处理其他数据
                                  System.err.println("数据处理异常: " + ex.getMessage());
                                  return "ERROR_" + item;
                              }))
                              .collect(Collectors.toList());
          
                          // 等待所有任务完成,但最多等待10秒
                          CompletableFuture<Void> allFutures = CompletableFuture.allOf(
                              futures.toArray(new CompletableFuture[0])
                          );
          
                          try {
                              allFutures.get(10, TimeUnit.SECONDS);
                          } catch (TimeoutException e) {
                              System.err.println("处理超时,将返回已完成部分的结果");
                          }
          
                          // 使用Stream API收集结果
                          return IntStream.range(0, data.size())
                              .mapToObj(i -> {
                                  CompletableFuture<String> future = futures.get(i);
                                  String item = data.get(i);
          
                                  if (future.isDone() && !future.isCompletedExceptionally() && !future.isCancelled()) {
                                      try {
                                          return future.getNow("处理失败");
                                      } catch (Exception e) {
                                          return "处理出错: " + e.getMessage();
                                      }
                                  } else if (future.isCancelled()) {
                                      return item + "_已取消";
                                  } else if (future.isCompletedExceptionally()) {
                                      return item + "_异常完成";
                                  } else {
                                      return item + "_处理未完成";
                                  }
                              })
                              .collect(Collectors.toList());
          
                      } finally {
                          // 确保关闭线程池
                          shutdownAndAwaitTermination(executor);
                      }
                  }
          
                  // 安全关闭线程池的标准方法
                  private void shutdownAndAwaitTermination(ExecutorService pool) {
                      pool.shutdown();
                      try {
                          if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                              pool.shutdownNow();
                              if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                                  System.err.println("线程池无法终止");
                              }
                          }
                      } catch (InterruptedException e) {
                          pool.shutdownNow();
                          Thread.currentThread().interrupt();
                      }
                  }
              }
          
              public static void main(String[] args) {
                  // 准备测试数据
                  List<String> testData = new ArrayList<>();
                  testData.add("item1");
                  testData.add("error_item");
                  testData.add("item3");
                  testData.add("item4");
                  testData.add("another_error");
          
                  // 处理数据
                  DataProcessingService service = new DataProcessingService();
                  List<String> results = service.processLargeData(testData);
          
                  // 输出结果
                  System.out.println("处理结果:");
                  for (int i = 0; i < results.size(); i++) {
                      System.out.println(testData.get(i) + " -> " + results.get(i));
                  }
              }
          }
          

          总结

          方法支持返回值异常处理方式任务组合能力异步回调支持线程池集成度支持超时支持任务取消适用场景
          Thread.join()无(需共享变量)仅顺序等待是(基础)简单等待工作线程完成
          共享变量/对象是(手动)手动设置异常状态复杂否(除非配合 join)简单的状态传递
          Future/Callableget()抛出异常有限(需批量处理)高(线程池)是(cancel())需要获取执行结果和异常
          CompletableFutureexceptionally 链式强(allOf/anyOf)是(回调)复杂的任务链和依赖关系
          UncaughtExceptionHandler仅捕获未检查异常有限全局异常监控

          在实际开发中,根据业务场景的复杂度和需求选择合适的方法。对于简单任务,Thread.join()或共享变量就足够了;对于需要返回值和异常处理的场景,Future/Callable 是不错的选择;而对于复杂任务链和依赖关系,CompletableFuture 则是最佳方案。无论选择哪种方式,都要确保正确处理异常、设置合理超时,并妥善管理线程资源。

          以上就是Java中线程执行状态检测的四种可靠方法的详细内容,更多关于Java线程执行状态检测的资料请关注编程客栈(www.devze.com)其它相关文章!

          0

          上一篇:

          下一篇:

          精彩评论

          暂无评论...
          验证码 换一张
          取 消

          最新开发

          开发排行榜