开发者

Java虚拟线程(VirtualThread)使用详解

目录
  • 流程概要
  • 调度器继承
  • Continuation 类深度分析
    • 实现多个虚拟线程的 JVM 级别调度还需要什么?
  • 如果自己设计虚拟线程调度应该怎么做?
    • 总结

      VirtualThread 是 Java 实现轻量级并发(也称为纤程或协程)的关键。与传统的平台线程(直接映射到操作系统线程)不同,虚拟线程由 JVM 管理和调度,可以在少量平台线程上运行大量的虚拟线程,从而提高应用程序的吞吐量。

      流程概要

      核心组件:

      scheduler (Exechttp://www.devze.comutor): 每个虚拟线程都有一个调度器,它是一个 Executor 实例。

      这个调度器负责安排虚拟线程在载体线程(通常是平台线程)上执行。

      如果没有显式指定,它会使用默认的 ForkJoinPool (DEFAULT_SCHEDULER) 或者继承父虚拟线程的调度器。

      • VirtualThread.java
      // scheduler and continuation
      private final Executor scheduler;
      private final Continuation cont;
      private final Runnable runContinuation;

      在构造函数中,如果 schedulernull,会进行选择:

      • VirtualThread.java
      // ... existing code ...
      VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
          super(name, characteristics, /*bound*/ false);
          Objects.requireNonNull(task);
      
          // choose scheduler if not specified
          if (scheduler == null) {
              Thread parent = Thread.currentThread();
              if (parent instanceof VirtualThread vparent) {
                  scheduler = vparent.scheduler;
              } else {
                  scheduler = DEFAULT_SCHEDULER;
              }
          }
      
          this.scheduler = scheduler;
          this.cont = new VThreadContinuation(this, task);
          this.runContinuation = this::runContinuation;
      }
      // ... existing code ...

      cont (Continuation): 这是虚拟线程的核心。Continuation 是 JDK 内部的一个机制,允许代码执行被挂起(yield)和恢复。每个虚拟线程都包装了一个 Continuation 实例。

      VThreadContinuation 是一个内部类,它继承自 Continuation,并包装了用户提供的 Runnable 任务。

      • VirtualThread.java
      // ... existing code ...
      /**
       * The continuation that a virtual thread executes.
       */
      private static class VThreadContinuation extends Continuation {
          VThreadContinuation(VirtualThread vthread, Runnable task) {
              super(VTHREAD_SCOPE, wrap(vthread, task));
          }
          @Override
          protected void onPinned(Continuation.Pinned reason) {
          }
          private static Runnable wrap(VirtualThread vthread, Runnable task) {
              return new Runnable() {
                  @Hidden
                  @JvmtiHideEvents
                  public void run() {
                      vthread.notifyJvmtiStart(); // notify JVMTI
                      try {
                          vthread.run(task);
                      } finally {
                          vthread.notifyJvmtiEnd(); // notify JVMTI
                      }
                  }
              };
          }
      }
      // ... existing code ...

      runContinuation (Runnable): 这是一个 Runnable,其 run() 方法(即 this::runContinuation 指向的 VirtualThread#runContinuation() 方法)负责实际执行或继续执行虚拟线程的任务。它处理虚拟线程的挂载(mount)到载体线程、运行 Continuation、以及卸载(unmount)。

      状态管理: VirtualThread 内部维护了一系列状态常量(如 NEW, STARTED, RUNNING, PARKED, TERMINATED 等)和一个 volatile int state 字段来跟踪其生命周期。状态之间的转换是精心设计的,以处理各种场景,如启动、运行、暂停、阻塞和终止。

      • VirtualThread.java
      // ... existing code ...
      /*
       * Virtual thread state transitions:
       *
       *      NEW -> STARTED         // Thread.start, schedule to run
       *  STARTED -> TERMINATED      // failed to start
       *  STARTED -> RUNNING         // first run
       *  RUNNING -> TERMINATED      // done
       *
       *  RUNNING -> PARKING         // Thread parking with LockSupport.park
      // ... many more states ...
       *  YIELDED -> RUNNING         // continue execution after Thread.yield
       */
      private static final int NEW      = 0;
      private static final int STARTED  = 1;
      private static final int RUNNING  = 2;     // runnable-mounted
      
      // untimed and timed parking
      pri编程vate static final int PARKING       = 3;
      private static final int PARKED        = 4;     // unmounted
      // ... other state constants ...
      private static final int TERMINATED = 99;  // final state
      // ... existing code ...

      carrierThread (Thread): 表示当前承载该虚拟线程执行的平台线程。当虚拟线程被挂起(unmounted)时,它不占用平台线程。当它需要运行时,调度器会将其调度到某个可用的载体线程上执行。

      启动流程 (start() 方法):

      • VirtualThread.java
      // ... existing code ...
      @Override
      void start(ThreadContainer container) {
          if (!compareAndSetState(NEW, STARTED)) {
              throw new IllegalThreadStateException("Already started");
          }
      
          // bind thread to container
          assert threadContainer() == null;
          setThreadContainer(container);
      
          // start thread
          boolean addedToContainer = false;
          boolean started = false;
          try {
              container.add(this);  // may throw
              addedToContainer = true;
      
              // scoped values may be inherited
              inheritScopedValueBindings(container);
      
              // submit task to run thread, using externalSubmit if possible
              externalSubmitRunContinuationOrThrow();
              started = true;
          } finally {
              if (!started) {
                  afterDone(addedToContainer);
              }
          }
      }
      
      @Override
      public void start() {
          start(ThreadContainers.root());
      }
      // ... existing code ...
      • 当调用虚拟线程的 start() 方法时,它首先将状态从 NEW 原子地更新为 STARTED
      • 然后,它将自身添加到一个线程容器 (ThreadContainer) 中。
      • 最关键的一步是调用 externalSubmitRunContinuationOrThrow() (或类似的提交方法) 将 runContinuation 任务提交给其 scheduler 执行。

      执行与挂起 (runContinuation()yieldContinuation()):

      • VirtualThread.java
      // ... existing code ...
      @ChangesCurrentThread // allow mount/unmount to be inlined
      private void runContinuation() {
          // the carrier must be a platform thread
          if (Thread.currentThread().isVirtual()) {
              throw new WrongThreadException();
          }
      
          // set state to RUNNING
          int initialState = state();
          if (initialState == STARTED || initialState == UNPARKED
                  || initialState == UNblockED || initialState == YIELDED) {
              // newly started or continue after parking/blocking/Thread.yield
              if (!compareAndSetState(initialState, RUNNING)) {
                  return;
              }
              // consume permit when continuing after parking or blocking. If continue
              // after a timed-park or timed-wait then the timeout task is cancelled.
              if (initialState == UNPARKED) {
                  cancelTimeoutTask();
                  setParkPermit(false);
              } else if (initialState == UNBLOCKED) {
                  cancelTimeoutTask();
                  blockPermit = false;
              }
          } else {
              // not runnable
              return;
          }
      
          mount();
          try {
              cont.run();
          } finally {
              unmount();
              if (cont.isDone()) {
                  afterDone();
              } else {
                  afterYield();
              }
          }
      }
      // ... existing code ...
      • VirtualThread.java
      // ... existing code ...
      @Hidden
      private boolean yieldContinuation() {
          notifyJvmtiUnmount(/*hide*/true);
          try {
              return Continuation.yield(VTHREAD_SCOPE);
          } finally {
              notifyJvmtiMount(/*hide*/false);
          }
      }
      // ... existing code ...

      runContinuation():

      • 检查当前线程是否为平台线程(载体线程不能是虚拟线程)。
      • 原子地更新状态到 RUNNING
      • 调用 mount() 方法,将虚拟线程“挂载”到当前平台线程上。这包括设置 carrierThread 字段,并使 Thread.currentThread() 返回该虚拟线程实例。
      • 执行 cont.run(),这会实际运行或恢复 Continuation 中包装的用户任务。
      • 执行完毕或 Continuation 挂起后,调用 unmount() 方法,将虚拟线程从平台线程上“卸载”。这包括清除 carrierThread,并使 Thread.currentThread() 返回平台线程本身。
      • 根据 Continuation 是否完成 (isDone()),调用 afterDone()afterYield()
      • yieldContinuation(): 当虚拟线程需要挂起时(例如,执行了 LockSupport.park(),或者I/O操作会阻塞),它会调用 Continuation.yield(VTHREAD_SCOPE)。这会导致 Continuation 的执行暂停,runContinuation() 方法中的 cont.run() 调用会返回。此时,载体平台线程可以被释放去执行其他任务。
      • afterYield(): 当 yieldContinuation() 成功挂起后,此方法被调用。它根据虚拟线程挂起前的原因(如 PARKING, YIELDING, BLOCKING, WAITING)更新状态,并可能安排后续操作(如设置定时器唤醒、重新提交到调度器等)。

      Parking (park()parkNanos()):

      • VirtualThread.java
      // ... existing code ...
      @Override
      void park() {
          assert Thread.currentThread() == this;
      
          // complete immediately if parking permit available or interrupted
          if (getAndSetParkPermit(false) || interrupted)
              return;
      
          // park the thread
          boolean yielded = false;
          setState(PARKING);
          try {
              yielded = yieldContinuation();
          } catch (OutOfMemoryError e) {
              // park on carrier
          } finally {
              assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
              if (!yielded) {
                  assert state() == PARKING;
                  setState(RUNNING);
              }
          }
      
          // park on the carrier thread when pinned
          if (!yielded) {
              parkOnCarrierThread(false, 0);
          }
      }
      // ... existing code ...
      • 当虚拟线程调用 LockSupport.park()LockSupport.parkNanos() 时,会调用 VirtualThread 内部的 park()parkNanos() 方法。
      • 这些方法会尝试通过 yieldContinuation() 来挂起虚拟线程。
      • 如果 yieldContinuation() 成功(即虚拟线程没有被固定在载体线程上),虚拟线程的状态会变为 PARKEDTIMED_PARKED,并且载体线程被释放。
      • 如果 yieldContinuation() 失败(例如,虚拟线程在 JNI 调用中或 synchronized 块中被固定),它会退而求其次,在当前的载体线程上实际执行 U.park()(通过 parkOnCarrierThread 方法),此时载体线程会被阻塞,状态变为 PINNEDTIMED_PINNED

      调度 (submitRunContinuation 系列方法):

      • VirtualThread.java
      // ... existing code ...
      private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) {
          boolean done = false;
          while (!done) {
              try {
                  // Pin the continuation to prevent the virtual thread from unmounting
                  // when submitting a task. For the default scheduler this ensures that
                  // the carrier doesn't change when pushing a task. For other schedulers
                  // it avoids deadlock that could arise due to carriers and virtual
                  // threads contending for a lock.
                  if (currentThread().isVirtual()) {
                      Continuation.pin();
                      try {
                          scheduler.execute(runContinuation);
                      } finally {
                          Continuation.unpin();
                      }
                  } else {
                      scheduler.execute(runContinuation);
                  }
                  done = true;
              } catch (RejectedExecutionException ree) {
                  submitFailed(ree);
                  throw ree;
              } catch (OutOfMemoryError e) {
                  if (retryOnOOME) {
                      U.park(false, 100_000_000); // 100ms
                  } else {
                      throw e;
                  }
              }
          }
      }
      // ... existing code ...
      • 有多种提交 runContinuation 任务到调度器的方法,如 submitRunContinuation(), lazySubmitRunContinuation(), externalSubmitRunContinuation()
      • 这些方法处理了向调度器提交任务的细节,包括在特定情况下(如在虚拟线程内部提交任务给调度器)需要固定 Continuatio编程客栈n (Continuation.pin()) 以防止在提交过程中虚拟线程被意外卸载。

      总而言之,VirtualThread 的实现巧妙地结合了 Continuation 机制和 Executor 调度框架。Continuation 提供了挂起和恢复执行流的能力,使得虚拟线程在阻塞操作时可以释放底层的平台线程。调度器则负责在虚拟线程准备好运行时,将其分配给一个可用的平台线程执行。通过这种方式,少量的平台线程就可以支持大量并发的虚拟线程,大大降低了线程创建和上下文切换的开销。状态管理确保了虚拟线程在各种并发场景下的正确行为。

      调度器继承

      instanceof 模式匹配语法 (parent instanceof VirtualThread vparent)

      这是一种 Java 的语法特性,称为 模式匹配 (Pattern Matching) for instanceof。它从 Java 14 开始作为预览特性引入,并在 Java 16 中正式发布。

      在以前的 Java 版本中,如果想检查一个对象的类型,并且如果类型匹配则将其转换为该类型并使用,你通常会这样写:

      if (parent instanceof VirtualThread) {
          VirtualThread vparent = (VirtualThread) parent;
          // 然后使用 vparent
          scheduler = vparent.scheduler;
      }

      模式匹配 instanceof 简化了这个过程。

      if (parent instanceof VirtualThread vparent) 这行代码做了两件事:

      1. 类型检查: 它检查 parent 对象是否是 VirtualThread 的一个实例。
      2. 条件声明和赋值: 如果 parent 确实是 VirtualThread 的实例,那么它会声明一个新的局部变量 vparent (类型为 VirtualThread),并将 parent 自动转换 (cast) 为 VirtualThread 类型后赋值给 vparent。这个 vparent 变量只在 if 语句块为真(即类型匹配成功)的作用域内有效。

      这是一种更简洁、更安全的写法,避免了显式的类型转换和引入额外的变量声明步骤。

      这里的“父子关系”并不是指操作系统层面严格的父子进程或线程关系,而是指创建者与被创建者的关系

      Thread parent = Thread.currentThread();:这行代码获取的是当前正在执行 VirtualThread 构造函数的线程。这个线程就是新虚拟线程的“创建者”或“父”线程。

      逻辑:

      当创建一个新的 VirtualThread 时,可以显式地给它传递一个 scheduler

      如果调用者没有提供 scheduler (即 scheduler == null),那么虚拟线程的构造逻辑会尝试确定一个默认的调度器。

      这时,它会检查创建这个新虚拟线程的线程 (parent):

      • 如果 parent 本身也是一个 VirtualThread (parent instanceof VirtualThread vparent):那么新的虚拟线程将继承其创建者虚拟线程的调度器 (scheduler = vparent.scheduler;)。
      • 如果 parent 是一个平台线程 (platform thread) 或者其他非 VirtualThread 类型: 那么新的虚拟线程将使用默认的调度器 (scheduler = DEFAULT_SCHEDULER;),这个默认调度器通常是一个 ForkJoinPool

      这种设计体现了一种“上下文感知”的默认行为。如果你的代码已经在某个特定的虚拟线程(它使用着特定的调度器)中运行,当你从这个虚拟线程中再创建一个新的虚拟线程时,让新的虚拟线程默认使用与创建者相同的调度器通常是合理的。这有助于:

      • 资源管理: 如果你为一组相关的任务配置了特定的调度器(例如,具有特定线程池大小或优先级的调度器),那么从这个组内派生的新虚拟线程默认使用相同的调度器可以保持资源使用的一致性。
      • 行为一致性: 任务的执行特性(如并发级别)可以更容易地在相关的虚拟线程间保持一致。

      简单来说,如果一个虚拟线程A创建了另一个虚拟线程B,并且没有为B指定调度器,那么B就会默认使用A的调度器。如果是一个平台线程创建了虚拟线程B,并且没有为B指定调度器,那么B就会使用全局默认的调度器。

      Continuation 类深度分析

      Continuation 类是 Java 实现轻量级线程(虚拟线程)的基石。它代表了一种一次性(one-shot)的分界延续(delimited continuation)。简单来说,它封装了一段计算(一个 Runnable 任务),这段计算可以被挂起(yield),并在之后从挂起点恢复执行。

      核心能力与特性:

      封装计算单元:

      • 每个 Continuation 对象都关联一个 Runnable 任务 (this.target) 和一个 ContinuationScope (this.scope)。
      • ContinuationScope 用于界定 yield 操作的范围。当调用 Continuation.yield(scope) 时,执行会从当前 Continuation 向上回溯,直到找到匹配该 scopeContinuation 实例,然后挂起。

      执行与挂起 (run()yield()):

      run(): 这是启动或恢复 Continuation 执行的入口点。

      • 它首先会尝试“挂载”(mount)Continuation 到当前的载体线程(carrier thread)。挂载意味着将 Continuation 的执行上下文(主要是栈帧)与载体线程关联起来。
      • 通过 JLA.setContinuation(t, this) 将当前 Continuation 设置为载体线程的活动 Continuation
      • 调用本地方法 enterSpecial(this, isContinue, isVirtualThread) 来实际进入或恢复 Continuation 的执行。这个本地方法是 JVM 实现 Chttp://www.devze.comontinuation 魔法的核心,它处理栈的切换和管理。
      • enterSpecial 内部会调用 enter(),进而调用 target.run() 来执行用户代码。
      • Continuation 内部调用 Continuation.yield(scope) 时,enterSpecial (或其调用的更深层本地代码) 会保存当前执行状态(栈帧等),然后“返回”到 run() 方法中 enterSpecial 调用的地方。
      • run() 方法的 finally 块负责“卸载”(unmount)Continuation,清理状态,并将载体线程的活动 Continuation 恢复为其父 Continuation (如果存在)。

      yield(ContinuationScope scope) (静态方法):

      • 这是 Continuation 主动让出执行权的方式。
      • 它会找到当前线程上与指定 scope 匹配的最内层 Continuation
      • 然后调用该 Continuation 实例的 yield0(scope, child) 方法(这是一个非静态的内部方法,最终会触发本地代码 doYield()),将执行权交还给其父 Continuation 或调度器。
      • yieldInfo 字段用于在 yield 时传递信息,决定是彻底返回还是在父 Continuation 中继续 yield

      栈管理 (StackChunk):

      • Continuation 的执行栈不是直接使用平台线程的完整栈,而是由一系列 StackChunk 对象来管理。当 Continuation 挂起时,它的活动栈帧被保存在这些 StackChunk 中。恢复时,这些栈帧被重新加载。
      • private StackChunk tail; 字段指向 Continuation 栈的当前末端。
      • isEmpty() 方法检查所有 StackChunk 是否都为空,用于判断 Continuation 是否执行完毕。

      状态管理:

      • done: 标记 ContinuationRunnable 是否已经执行完毕。
      • mounted: 一个 volatile 标志,表示 Continuation 当前是否挂载在某个载体线程上。mount()unmount() 方法以及 compareAndSetMounted() 原子地更新此状态。
      • scopedValueCache: 用于支持 ScopedValue,在 Continuation 挂载和卸载时保存和恢复作用域值缓存。

      父子关系 (parent, child):

      • Continuation 可以形成一个层级结构。当一个 Continuation (父) 内部的某个点创建并运行另一个 Continuation (子) 时,它们之间就建立了父子关系。
      • JLA.getContinuation(currentCarrierThread()) 用于获取当前载体线程上活动的 Continuation
      • run() 方法中会处理 parentchild 的设置,确保在 yield 和恢复时能正确地在 Continuation 层级间导航。

      pinning (固定):

      • Pinned 枚举(NATIVE, MONITOR, CRITICAL_SECTION, EXCEPTION)定义了 Continuation 可能被“固定”在载体线程上而无法安全 yield 的原因。例如,如果 Continuation 的执行进入了本地方法(JNI),或者持有一个对象监视器锁(synchronized 块),它就可能被固定。
      • VThreadContinuation (在 VirtualThread.java 中定义) 的 onPinned() 方法是一个回调,当 Continuation 被固定时会被调用。虚拟线程的实现会根据这个信息决定是阻塞载体线程还是采取其他策略。

      与 JVM 的深度集成:

      • @IntrinsicCandidate 注解的本地方法如 doYield()enterSpecial() 表明这些方法的实现是由 JVM 高度优化的,它们直接操纵线程栈和执行状态。
      • jdk.internal.Access.JavaLangAccess (JLA) 和 jdk.internal.access.SharedSecrets 用于在 java.base 模块内部访问 java.lang.Thread 等类的包私有或私有成员,这是实现 Continuation 与线程状态紧密集成的关键。

      Continuation 提供的核心能力: 它提供了一种机制,使得一段Java代码的执行可以在不阻塞底层平台线程的情况下被暂停,其状态(主要是调用栈)被保存起来,之后可以在相同的或不同的平台线程上从暂停点恢复执行。这是实现用户态线程(如虚拟线程)的基础。

      实现多个虚拟线程的 JVM 级别调度还需要什么?

      仅仅有 Continuation 是不够的,还需要一个完整的框架来管理和调度它们,这正是 java.lang.VirtualThread 所做的事情。关键组件包括:

      虚拟线程的表示 (VirtualThread 类):

      • 每个虚拟线程实例内部包装一个 Continuation
      • 管理虚拟线程的生命周期状态(NEW, STARTED, RUNNING, PARKED, TERMINATED 等)。
      • 处理中断、加入 (join) 等线程操作。

      调度器 (Executor):

      • VirtualThread 需要一个调度器(通常是 ForkJoinPool,如 DEFAULT_SCHEDULER)来执行它的 Continuation
      • 调度器负责将准备好运行的虚拟线程(其 Continuation)提交给一个可用的平台线程(载体线程)来执行。
      • 当虚拟线程 yield 时,它会从载体线程上卸载,载体线程可以被调度器用于执行其他虚拟线程或任务。

      阻塞操作的适配:

      • 标准库中的阻塞操作(如 LockSupport.park(), Object.wait(), 大部分同步 I/O 操作)需要被适配,以便在虚拟线程中调用时,它们能够触发 Continuation.yield() 而不是阻塞载体线程。
      • 例如,VirtualThread.park() 方法会尝试 yieldContinuation()。如果成功,虚拟线程挂起,载体线程释放。如果失败(因为 Continuation 被固定),则会退化为在载体线程上实际 park

      与平台线程的交互(挂载/卸载):

      • VirtualThread.mount(): 当虚拟线程的 Continuation 开始在载体线程上运行时,需要将虚拟线程设置为 Thread.currentThread() 的返回值,并记录载体线程。
      • VirtualThread.unmount(): 当 Contphpinuation yield 或执行完毕时,需要从载体线程上卸载,恢复 Thread.currentThread() 指向载体线程本身。

      固定 (Pinning) 的处理:

      • 需要有机制检测 Continuation 何时被固定(例如,在 JNI 调用中或持有监视器锁)。
      • 当虚拟线程被固定时,如果它执行了阻塞操作,那么载体线程本身可能会被阻塞,因为 Continuation 无法 yield。这是虚拟线程的一个重要性能考量点。

      线程局部变量和作用域值:

      • 需要确保线程局部变量(ThreadLocal)对于虚拟线程按预期工作(即每个虚拟线程有自己的副本)。
      • ScopedValue 是一个更现代的替代方案,与虚拟线程和 Continuation 结合得更好。Continuation 类中的 scopedValueCache 字段就是为此服务的。

      如果自己设计虚拟线程调度应该怎么做?

      这是一个非常复杂的系统工程,深度依赖于 JVM 的底层支持。但从概念上讲,可以设想以下组件:

      MyContinuation:

      • 核心: 能够保存和恢复执行上下文(调用栈、程序计数器、寄存器)。这部分是最难的,需要 JVM 指令集层面的支持,或者像 libcontext 这样的库(但 Java 没有直接使用这个)。在 JDK 中,这是通过 StackChunk 和大量本地代码实现的。
      • 接口: void init(Runnable task), boolean resume(), void yield(), boolean isDone()
      • 状态: INITIAL, SUSPENDED, RUNNING, DONE

      MyVirtualThread:

      • 属性: ID, 名称, 状态 (NEW, RUNNABLE, BLOCKED, TERMINATED), 优先级 (可选)。
      • 包含: 一个 MyContinuation 实例,一个 Runnable 任务。
      • 方法: start(), interrupt(), join(), getState()

      MyScheduler (调度器):

      • 核心: 一个或多个平台线程(称为工作线程或载体线程)。
      • 队列: 一个或多个用于存放 MyVirtualThread 的就绪队列。

      调度循环:

      • 工作线程从就绪队列中取出一个 MyVirtualThread
      • 设置 Thread.currentThread() 指向这个 MyVirtualThread (逻辑上的)。
      • 调用 myVirtualThread.getContinuation().resume()
      • 如果 resume() 返回是因为 yield() 被调用:
      • 根据 yield 的原因(例如,等待 I/O,等待锁),将 MyVirtualThread 放入相应的等待结构中,或者如果只是普通的 yield,放回就绪队列。
      • 如果 resume() 返回是因为任务完成 (isDone() 为 true):
      • 更新 MyVirtualThread 状态为 TERMINATED,处理 join 等待。
      • 恢复 Thread.currentThread() 指向平台工作线程。
      • 工作线程继续从队列取下一个任务。

      同步原语和 I/O 适配:

      锁 (MyLock): 当 MyVirtualThread 尝试获取一个已被持有的锁时,它不应阻塞平台工作线程。而是:

      1. MyVirtualThread 放入该锁的等待队列。
      2. 调用其 MyContinuation.yield()
      3. 当锁被释放时,调度器将等待队列中的一个 MyVirtualThread 移回就绪队列。

      I/O: 对于非阻塞 I/O (NIO):

      1. 发起非阻塞 I/O 操作。
      2. MyVirtualThread 和一个回调(当 I/O 完成时调用)注册到 Selector
      3. 调用 MyContinuation.yield()
      4. Selector 检测到 I/O 事件完成时,执行回调,回调将对应的 MyVirtualThread 重新放入调度器的就绪队列。

      Pinning 处理:

      • 需要一种方法来标记代码段(如 JNI 调用,synchronized 块)是不可 yield 的。
      • 如果一个 MyVirtualThread 在这种不可 yield 的代码段中尝试执行一个会 yield 的操作(如获取 MyLock),调度器可能不得不阻塞当前平台工作线程,或者抛出异常,或者有其他备用策略。

      挑战:

      • 栈操作: 安全、高效地保存和恢复调用栈是最大的挑战,这需要 JVM 的深度配合。
      • 与现有 Java 生态的兼容性: 大量现有库依赖于 Thread.currentThread() 的行为和阻塞原语。
      • 调试和监控: 调试跨越多个 Continuation 片段的代码会更复杂。

      JDK 中的 ContinuationVirtualThread 在 JVM 层面解决了这些核心挑战。自己从头设计一个类似的系统将是一项艰巨的任务,但理解其基本原理有助于更好地使用这些高级并发特性。

      总结

      以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。

      0

      上一篇:

      下一篇:

      精彩评论

      暂无评论...
      验证码 换一张
      取 消

      最新开发

      开发排行榜