开发者

JDK21对虚拟线程的几种用法实践指南

目录
  • 一、参考官方文档
  • 二、什么是虚拟线程
  • 三、几种用法
    • 1、Thread.ofVirtual().start()
    • 2、Executors.newVirtualThreadPerTaskExecutor()
    • 3、信号量进行限制并发
    • 4、官网更多其他用法
  • 四、监控对内存的使用情况
    • 五、兼容ThreadLocal
      • 1、传统平台线程 + ThreadLocal 的关系
      • 2、虚拟线程(Virtual Thread) + ThreadLocal 的关系
      • 3、两者的变化
      • 4、怎么兼容线程模式下ThreadLocal
    • 六、一些虚拟线程相关的检查代码
      • 1、检查自己的spingboot版本
      • 2、检查自己是否默认开启全局的虚拟线程
      • 3、监控虚拟线程存活数量
    • 七、实战
      • 总结

        一、参考官方文档

        Virtual Threads

        Virtual threads are lightweight threads that reduce the effort of writing, maintaining, and debugging high-throughput concurrent applications.

        二、什么是虚拟线程

        目标:用少量平台线程(Platform Threads)支撑海量并发任务(如 100 万请求),提升吞吐量。

        类型说明
        平台线程(PlatformThread)JVM 映射到操作系统线程(OS Thread),创建成本高(默认几百 KB 栈空间),数量受限(通常几千)
        虚拟线程(VirtualThread)JVM 内部轻量线程,由 Thread.ofVirtual() 创建,不直接绑定 OS 线程,可创建百万级
        平台线程 = 真实员工(数量少、成本高)
        虚拟线程 = 临时工(数量多、任务来了再分配真实员工干活)

        适用场景:

        场景说明
        高并发 I/O 任务如 HTTP 请求、数据库查询、Redis 调用(阻塞多、CPU 少)
        Web 服务器处理请求Tomcat、Netty、Spring WebFlux 等每请求一线程模型
        批量处理任务如 10 万条数据同步、日志处理
        调用多个外部服务并行调用订单、用户、库存服务并聚合结果

        不适用场景:

        场景说明
        CPU 密集型任务如图像处理、加密解密、大数据计算 → 用平台线程池(ForkJoinPool
        长时间运行的无限循环虚拟线程调度器可能“饿死”其他任务
        JNI / Native 代码虚拟线程会被挂起,直到 native 方法返回(阻塞平台线程)
        持有 synchronized 块太久会阻塞平台线程,降低并发能力

        三、几种用法

        你的需求推荐用法
        学习虚拟线程Thread.ofVirtual().start()
        高并发 Web 请求、批量 I/OnewVirtualThreadPerTaskExecutor()
        CPU 密集型任务(如计算)❌ 不要用虚拟线程,用 ForkJoinPool

        1、Thread.ofVirtual().start()

        —— 最基础的创建方式

        Thread vt = Thread.ofVirtual()
            .name("worker")
            .start(() -> {
                System.out.println("Hello from " + Thread.currentThread());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {}
                System.out.println("Done");
            });
        
        vt.join(); // 等待完成

        适用场景:

        • 学习虚拟线程原理
        • 单个、简单的并发任务
        • 不需要任务管理或资源回收

        不适用场景:

        • 批量任务(如 10 万个请求)
        • 需要统一管理生命周期
        • 生产环境高并发服务

        建议:

        不推荐用于生产环境。它没有资源池管理,无法限制并发,容易造成内存溢出。

        2、Executors.newVirtualThreadPerTaskExecutor()

        —— 最推荐的生产级用法

        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < 10_000; i++) {
                executor.submit(() -> {
                    Thread.sleep(Duration.ofSeconds(1));
                    System.out.println("Task " + Thread.currentThread());
                    return "result";
                });
            }
        } // 自动等待所有任务完成

        参考官方:

        try (ExecutorService myExecutor = Executors.newVirtualThreadPerTaskExecutor()) {
            Future<?> future = myExecutor.submit(() -> System.out.println("Running thread"));
            future.get();
            System.out.println("Task completed");
            // ...

        适用场景 :

        场景说明
        Web 请求处理每个 HTTP 请求启动一个虚拟线程(Spring Boot 6+ 默认)
        批量 I/O 操作如读取 10 万个文件、调用外部 API
        消息队列消费每条消息一个虚拟线程处理
        Redis / DB 批量查询配合 multiGetpipeline 使用

        优势 :

        • 自动管理虚拟线程生命周期
        • try-with-resources 自动 close() 并等待所有任务完成
        • 无需担心资源泄漏

        用官方文档的话来说总结几点:

        JDK21对虚拟线程的几种用法实践指南

        • 这个线程池会为每个提交的任务新建一个虚拟线程。

        • 特别适合 服务器场景批量并发任务

        • 你可以轻松提交成千上万个任务,不会像传统线程那样资源紧张。

        建议:

        在生产环境中最应该使用的虚拟线程方式!尤其适合:高并发 + I/O 密集 + 短任务

        3、信号量进行限制并发

        其实有同学会想到,虚拟线程能不能像平台线程那样子进行池化,关于这个的官方解释,在官方文档中提到了很多次,多次提及两者并不应该是一个概念,你可以将平台线程池视为从队列中提取任务并处理它们的“工人”,将虚拟线程视为任务本身。

        JDK21对虚拟线程的几种用法实践指南

        如果想要限制并发,和平台线程类似的线程池那样,可以使用信号量进行限制:

        Semaphore sem = new Semaphore(10);
        ...
        Result foo() {
            sem.acquire();
            try {
                return callLimitedService();
            } finally {
                sem.release();
            }
        }

        可能同学会认为用信号量去限制的话和线程池还是很大差距,下面官方文档也给了提示:

        仅仅用信号量阻塞一些虚拟线程,可能看起来与向固定线程池提交任务有着本质的不同,但实际上并非如此。向线程池提交任务会将其排队等待稍后执行,而信号量(或任何其他用于此目的的阻塞同步构造)在内部会创建一个线程队列,这些线程因信号量而被阻塞,该队列反映了等待池线程执行的任务队列。由于虚拟线程就是任务,因此最终结构是等价的。

        JDK21对虚拟线程的几种用法实践指南

        4、官网更多其他用法

        Thread.Builder builder = Thread.ofVirtual().name("MyThread");
        Runnable task = () -> {
            System.out.println("Running thread");
        };
        Thread t = builder.start(task);
        System.out.println("Thread t name: " + t.getName(编程));
        t.join();

        The following example creates and starts two virtual threads with:

        Thread.Builder builder = Thread.ofVirtual().name("worker-", 0);
        Runnable task = () -> {
            System.out.println("Thread ID: " + Thread.currentThread().threadId());
        };
        
        // name "worker-0"
        Thread t1 = builder.start(task);   
        t1.join();
        System.out.println(t1.getName() + " terminated");
        
        // name "worker-1"
        Thread t2 = builder.start(task);   
        t2.join();  
        System.out.println(t2.getName() + " terminated");

        输出:

        Thread ID: 21
        worker-0 terminated
        Thread ID: 24
        worker-1 terminated

        希望同时向不同的服务发起多个出站调用:

        void handle(Request request, Response response) {
            var url1 = ...
            var url2 = ...
         
            try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
                var future1 = executor.submit(() -> fetchURL(url1));
                var future2 = executor.submit(() -> fetchURL(url2));
                response.send(future1.get() + future2.get());
            } catch (ExecutionException | InterruptedException e) {
                response.fail(e);
            }
        }
         
        String fetchURL(URL url) throws IOException {
            try (var in = url.openStream()) {
                return new String(in.readAllBytes(), StandardCharsets.UTF_8);
            }
        }

        四、监控对内存的使用情况

        模拟10w虚拟线程

        import Java.lang.management.ManagementFactory;
        import java.lang.management.MemoryMXBean;
        import java.lang.management.MemoryUsage;
        import java.util.concurrent.ExecutorService;
        import java.util.concurrent.Executors;
        
        public class VirtualThreadTest {
        
            // 方法一:查看当前 JVM 内存使用情况
            public static void printMemoryUsage(String phase) {
                MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
                MemoryUsage heapMemory = memoryMXBean.getHeapwww.devze.comMemoryUsage();
                System.out.printf("[%s] Heap Memory Used: %d MB / %d MB%n",
                        phase,
                        heapMemory.getUsed() / (1024 * 1024),
                        heapMemory.getMax() / (1024 * 1024));
            }
        
            // 方法二:创建虚拟线程执行任务
            public static void runVirtualThreads(int taskCount) throws InterruptedException {
                ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
                for (int i = 0; i < taskCount; i++) {
                    int finalI = i;
                    executor.submit(() -> {
                        System.out.printf("虚拟线程[%s] 执行任务 %d%n",
                                Thread.currentThread(),
                                finalI);
                        try {
                            Thread.sleep(1000); // 模拟 IO 阻塞
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                }
                executor.shutdown();
                while (!executor.isTerminated()) {
                    Thread.sleep(100);
                }
            }
        
            // 方法三:循环查看内存(模拟 GC 回收效果)
            public static void monitorMemory(int seconds) throws InterruptedException {
                for (int i = 0; i < seconds; i++) {
                    printMemoryUsage("监控中");
                    System.gc(); // 建议 GC,可能不会马上执行
                    Thread.sleep(1000);
                }
            }
        
            // 主方法测试
            public static void main(String[] args) throws Exception {
                // 1. 初始内存
                printMemoryUsage("初始");
        
                // 2. 创建 1 万个虚拟线程任务
                runVirtualThreads(100_000);
        
                // 3. 任务执行后内存
                printMemoryUsage("任务执行后");
        
                // 4. 持续观察 GC 是否回收虚拟线程
                monitorMemory(10);
            }
        }
        

        JDK21对虚拟线程的几种用法实践指南

        可以看到快速创建10万的虚拟线程也能很快的将内存进行回收,可能大家会想,不是说虚拟线程是创建一个线程就回收内存了吗,为啥你这里统一进行释放呢,可以看到我这里进行编程客栈阻塞,模拟创建10万线程大概会占用多少内存

        JDK21对虚拟线程的几种用法实践指南

        五、兼容ThreadLocal

        在我们现阶段大多数用户信息都是通过ThreadLocal进行传递过来,每个线程绑定一个用户

        1、传统平台线程 + ThreadLocal 的关系

        下面我们回顾一下传统的ThreadLocal和线程的关系:

        • 每个 Thread 对象内部有一个 ThreadLocal.ThreadLocalMap 成员
        • ThreadLocal.set(value) → 当前线程的 map 中存入 <ThreadLocal 实例, value>
        • ThreadLocal.get() → 从当前线程的 map 中取出对应值
        • 生命周期与线程绑定:线程存活 → ThreadLocal 副本存在
        • 实际并发 100 请求 → 最多 100 个副本(但线程池通常只有 200 个线程,会复用)

        关系图如下:

        平台线程 T1  → 有自己的 ThreadLocalMap → 存储 BaseContext.value
        平台线程 T2  → 有自己的 ThreadLocalMap → 存储 BaseContext.value
        平台线程 T3  → 有自己的 ThreadLocalMap → 存储 BaseContext.value

        2、虚拟线程(Virtual Thread) + ThreadLocal 的关系

        • JVM 实现的轻量级线程
        • 多对一映射到“载体线程”(Carrier Thread,即平台线程)
        • 栈在堆上分配,约 1KB
        • 可创建 数十万甚至百万个
        • 每个 HTTP 请求一个虚拟线程(高并发)
        • 虚拟线程也是 java.lang.Thread 的子类
        • 所以它也有自己的 ThreadLocalMap
        • set() / get() 语法完全兼容

        关系图如下:

        载体线程 C1(平台线程) 
           ├─ 虚拟线程 VT1 → 有自己的 ThreadLocalMap → 存储 BaseContext.value
           ├─ 虚拟线程 VT2 → 有自己的 ThreadLocalMap → 存储 BaseContext.value
           └─ 虚拟线程 VT3 → 有自己的 ThreadLocalMap → 存储 BaseContext.value
        
        载体线程 C2
           ├─ 虚拟线程 VT10001 → 有自己的 ThreadLocalMap → 存储 BaseContext.value
           └─ 虚拟线程 VT10002 → 有自己的 ThreadLocalMap → 存储 BaseContext.value

        3、两者的变化

        • 传统:200 个线程 → 最多 200 个 ThreadLocal 副本
        • 虚拟线程:10 万个并发请求 → 10 万个 ThreadLocal 副本

        传统的线程的话会进行线程池排队,不会频繁创建线程,从而不会导致线程爆炸。要是某个虚拟线程执行什么任务。

        4、怎么兼容线程模式下ThreadLocal

        • 不全局使用虚拟线程(比如不设置 -Dspring.threads.virtual.enabled=true
        • 只在特定地方手动创建虚拟线程
        • 在这些虚拟线程中使用 ThreadLocal 传递上下文(如用户信息)
        // 获取当前用户信息(来自平台线程的 ThreadLocal)
        String currentUser = BaseContext.getCurrentId(); // 假设这是从 ThreadLocal 拿的
        
        // 在虚拟线程中使用时,显式传入
        Thread.startVirtualThread(() -> {
            // 显式使用传入的上下文
            processUserOrder(currentUser, orderId);
        });

        六、一些虚拟线程相关的检查代码

        1、检查自己的spiwww.devze.comngboot版本

        import org.springframework.core.SpringVersion;
        
        public class SpringVersionCheck {
            public static void main(String[] args) {
                System.out.println("Spring Framework 版本: " + SpringVersion.getVersion());
            }
        }

        2、检查自己是否默认开启全局的虚拟线程

        import org.springframework.boot.SpringApplication;
        import org.springframework.boot.autoconfigure.SpringBootApplication;
        import org.springframework.context.ConfigurableApplicationContext;
        import org.springframework.scheduling.annotation.EnableAsync;
        
        @SpringBootApplication
        @EnableAsync
        public class App {
            public static void main(String[] args) {
                ConfigurableApplicationContext ctx = SpringApplication.run(App.class, args);
        
                // 测试异步方法
                AsyncService service = ctx.getBean(AsyncService.class);
                service.asyncTask();
            }
        }
        
        import org.springframework.scheduling.annotation.Async;
        import org.springframework.stereotype.Service;
        
        @Service
        
        class AsyncService {
        
            @Async
        
            public void asyncTask() {
        
                Thread thread = Thread.currentThread();
        
                System.out.println("线程类型: " + (thread.isVirtual() ? "虚拟线程" : "平台线程"));
        
                System.out.println("线程名称: " + thread.getName());
        
            }
        
        }

        3、监控虚拟线程存活数量

        import java.lang.management.ManagementFactory;
        import java.lang.management.ThreadMXBean;
        
        public class VirtualThreadMonitor {
            public static void main(String[] args) {
                ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
        
                // 获取当前存活的线程总数(包括平台线程和虚拟线程)
                long totalThreadCount = threadBean.getThreadCount();
        
                // 初始化虚拟线程计数器
                long virtualThreadCount = 0;
        
                // 遍历当前所有线程,检查哪些是虚拟线程
                for (Thread thread : Thread.getAllStackTraces().keySet()) {
                    if (thread.isVirtual()) {  // 如果线程是虚拟线程
                        virtualThreadCount++;
                    }
                }
        
                // 输出结果
                System.out.println("当前存活 虚拟线程数: " + virtualThreadCount);
                System.out.println("当前存活 总线程数: " + totalThreadCount);
            }
        }
        

        七、实战

        新增conf配置类,让spring帮忙管理他的生命周期

        import lombok.extern.slf4j.Slf4j;
        import org.springframework.context编程客栈.annotation.Bean;
        import org.springframework.context.annotation.Configuration;
        import org.springframework.scheduling.annotation.AsyncConfigurer;
        import org.springframework.scheduling.annotation.EnableAsync;
        
        import java.util.concurrent.Executor;
        import java.util.concurrent.ExecutorService;
        import java.util.concurrent.Executors;
        
        @Slf4j
        @Configuration
        @EnableAsync  // 启用 @Async 支持
        public class VirtualThreadConfig implements AsyncConfigurer {
        
            private ExecutorService createVirtualThreadExecutor(String prefix) {
                return Executors.newThreadPerTaskExecutor(
                        Thread.ofVirtual()
                                .name(prefix, 0)
                                .uncaughtExceptionHandler((thread, ex) ->
                                        log.error("虚拟线程 [{}] 执行异常", thread.getName(), ex))
                                .factory()
                );
            }
        
            /**
             * 用于单个用户收藏数据的异步回写(实时场景)
             */
            @Bean("favoriteRedisWriterVirtualThreadExecutor")
            public Executor favoriteUserWriteVirtualThreadExecutor() {
                log.info("初始化虚拟线程池:favoriteUserWriteVirtualThreadExecutor");
                return createVirtualThreadExecutor("vt-fav-user-write-");
            }
        
            /**
             * 用于批量同步收藏数据到 Redis(定时任务场景)
             */
            @Bean("favoriteBATchWriteVirtualThreadExecutor")
            public Executor favoriteBatchWriteVirtualThreadExecutor() {
                log.info("初始化虚拟线程池:favoriteBatchWriteVirtualThreadExecutor");
                return createVirtualThreadExecutor("vt-fav-batch-write-");
            }
        
            @Override
            public Executor getAsyncExecutor() {
                return createVirtualThreadExecutor("vt-default-async-");
            }
        }

        在定时任务中,我们需要对收藏数据进行全量重写到 Redis,以保证缓存一致性。由于数据量较大(3200+ 条记录),采用同步方式逐条写入会导致任务执行时间过长,影响系统响应性。为了提升任务执行效率,我们引入 虚拟线程(Virtual Threads),将每条 Redis 写入操作提交到独立的虚拟线程中并发执行。这样可以在不增加平台线程负担的前提下,显著提高 I/O 密集型操作的吞吐量,缩短全量刷新时间。

        JDK21对虚拟线程的几种用法实践指南

        JDK21对虚拟线程的几种用法实践指南

        JDK21对虚拟线程的几种用法实践指南

        JDK21对虚拟线程的几种用法实践指南

        至此就简练的实现异步虚拟线程的改造方案

        总结

        到此这篇关于JDK21对虚拟线程的几种用法实践指南的文章就介绍到这了,更多相关JDK21对虚拟线程内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

        0

        上一篇:

        下一篇:

        精彩评论

        暂无评论...
        验证码 换一张
        取 消

        最新开发

        开发排行榜