虚拟线程 | 异步编排 | 学习文档
虚拟线程 | 异步编排 | 学习文档
1.1 一个真实的场景
假设你在写一个电商商品详情页的接口,需要聚合以下数据:
用户信息(需要查数据库,耗时约 50ms)商品库存(调用库存服务,耗时约 80ms)商品价格(调用价格服务,耗时约 60ms)用户评论(查数据库,耗时约 40ms)最笨的写法(串行):
User user = fetchUser(userId); // 等 50msInventory inv = fetchInventory(skuId); // 再等 80msPrice price = fetchPrice(skuId); // 再等 60msList<Comment> cs = fetchComments(skuId); // 再等 40ms// 总耗时:50 + 80 + 60 + 40 = 230ms聪明的写法(并行):
同时发起4个请求 → 等最慢的那个完成 → 总耗时约 80ms(取最大值)并行的方式比串行快了将近 3 倍。这就是并发编程的核心价值所在:让多件事同时发生,而不是排队等待。
1.2 问题的本质
在 Java 中,最自然的并发方式是”多线程”。但传统线程有个根本问题:
一个请求 = 一个线程线程在等待 IO(数据库、网络)时 = 什么都不做,但还在占用资源想象一下:你开了一家餐厅,每个顾客来了就分配一个服务员专门负责他。但顾客点完菜后需要等 10 分钟做饭,服务员就站在旁边发呆,什么都不做。
这个服务员就是”线程”,发呆等待就是”IO 阻塞”。一台服务器通常只能同时跑几千个线程,所以高并发下,资源很快就耗尽了。
虚拟线程的出现,就是为了解决这个问题。
二、Java 并发模型的演进史
了解历史,才能理解为什么每个技术会出现。
阶段 1:原始线程时代(Java 1.0,1996年)
// 最原始的多线程写法Thread t = new Thread(() -> { System.out.println("我是一个线程");});t.start();t.join(); // 等线程结束问题:直接创建线程开销大,无法管理线程数量,容易写出各种并发 bug(死锁、竞态条件等)。
阶段 2:线程池时代(Java 5,2004年)
Java 5 引入了 java.util.concurrent 包,Doug Lea 大神贡献的杰作:
// 线程池:复用线程,控制线程数量ExecutorService pool = Executors.newFixedThreadPool(10);Future<String> future = pool.submit(() -> "执行结果");String result = future.get(); // 阻塞等待结果进步:线程可以复用了,不用每次都创建销毁。
残余问题:线程数量仍然有限,IO 阻塞期间线程仍然是浪费的。
阶段 3:异步回调时代(Java 8,2014年)
// CompletableFuture:不等结果了,告诉它"完成后做什么"CompletableFuture.supplyAsync(() -> fetchUser(id)) .thenApply(user -> processUser(user)) .thenAccept(result -> System.out.println(result));进步:线程不需要阻塞等待了,任务完成后自动触发下一步。
新问题:代码可读性变差,调试困难,错误处理很麻烦。
阶段 4:响应式编程时代(Spring WebFlux,2017年)
// Reactor:声明式数据流处理Mono.just(userId) .flatMap(id -> userService.findById(id)) .flatMap(user -> orderService.findLatest(user.getId())) .subscribe(order -> handleOrder(order));进步:吞吐量极高,支持背压(防止下游被压垮)。
新问题:学习曲线陡峭,代码风格完全不同,调试极其困难,全栈必须改造。
阶段 5:虚拟线程时代(JDK 21,2023年)
// 虚拟线程:用同步代码的写法,获得异步的性能try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { Future<User> user = executor.submit(() -> fetchUser(userId)); Future<Order> order = executor.submit(() -> fetchOrder(orderId)); return merge(user.get(), order.get()); // 阻塞但不浪费资源!}革命性进步:同步代码风格,异步级别的吞吐量。普通开发者也能写出高性能代码。
三、虚拟线程是什么?原理从零讲起
3.1 传统线程的问题(图解)
传统线程模型:
请求1 ─── [线程1] ─── 查DB(等50ms)─── 返回 请求2 ─── [线程2] ─── 查DB(等50ms)─── 返回 请求3 ─── [线程3] ─── 查DB(等50ms)─── 返回 ... 请求1000 ─ [线程1000] ─ 等待中(线程数不够了!报错!)
线程在等待 IO 期间:占着内存(约1MB/线程),但什么都不做3.2 虚拟线程的解决思路
虚拟线程引入了一个新的层级:
虚拟线程模型:
请求1 ─── [虚拟线程1] ─── 查DB(挂起,让出载体线程) 请求2 ─── [虚拟线程2] ─── 查DB(挂起,让出载体线程) ... 请求100万 ─ [虚拟线程100万] ─ 等待中(堆内存里排队,很便宜)
底层真正执行的: [载体线程1(平台线程)] ─── 执行虚拟线程1 → 虚拟线程1等IO了 → 切换执行虚拟线程5 → ... [载体线程2(平台线程)] ─── 执行虚拟线程2 → 虚拟线程2等IO了 → 切换执行虚拟线程7 → ... [载体线程3(平台线程)] ─── 执行虚拟线程3 → ... [载体线程4(平台线程)] ─── 执行虚拟线程4 → ... (只需要和 CPU 核数相当的载体线程)3.3 关键概念:Continuation(续体)
当虚拟线程遇到阻塞时,JVM 会:
- 把当前虚拟线程的调用栈快照(叫做 Continuation)序列化到堆内存
- 释放载体线程,让它去执行其他虚拟线程
- IO 完成后,把 Continuation 从堆内存反序列化,恢复执行
- 恢复执行时,可能在不同的载体线程上继续(但虚拟线程的标识没变)
这一切对开发者完全透明——你写的代码和普通线程代码完全一样。
3.4 通俗类比
想象一个图书馆管理员(载体线程)和很多读者(虚拟线程):
- 传统模型:每个读者配一个专属管理员,读者翻书(IO等待)时,管理员在旁边发呆
- 虚拟线程模型:只有几个管理员,哪个读者需要帮助就服务谁;读者自己翻书的时候,管理员去帮其他读者
资源利用率大幅提升!
四、虚拟线程 vs 平台线程:全面对比
| 对比维度 | 平台线程(传统线程) | 虚拟线程(JDK 21) |
|---|---|---|
| 底层实现 | 1:1 对应 OS 线程 | N |
| 内存占用 | 每个约 1~8MB | 每个约 1KB(初始),按需增长 |
| 最大数量 | 数百到数千(受 OS 限制) | 数百万(受堆内存限制) |
| IO 阻塞时 | OS 线程挂起,资源浪费 | 自动卸载,载体线程继续工作 |
| CPU 密集任务 | 正常 | 和平台线程相同,无优势 |
| 创建成本 | 高(需 OS 系统调用) | 极低(纯 JVM 堆分配) |
| 代码风格 | 同步,直观 | 同步,直观(和平台线程一样!) |
| 调试难度 | 容易 | 基本一样(jstack/jcmd 支持) |
| synchronized 兼容性 | 完全支持 | JDK 21/23 有 Pinning 问题(见踩坑章节) |
| ThreadLocal 兼容性 | 完全支持 | 支持但不推荐(有内存泄漏风险) |
| 适用场景 | CPU 密集、少量并发 | IO 密集、高并发 |
五、虚拟线程的使用方式(完整代码)
5.1 最简单的创建方式
// 方式1:Thread.ofVirtual() ── 直接创建Thread vt = Thread.ofVirtual() .name("my-virtual-thread") // 给虚拟线程命名,便于调试 .start(() -> { System.out.println("我是虚拟线程:" + Thread.currentThread()); System.out.println("是虚拟线程吗?" + Thread.currentThread().isVirtual()); // true });vt.join(); // 等待执行完成
// 方式2:Thread.startVirtualThread() ── 最简写法Thread.startVirtualThread(() -> System.out.println("快速创建虚拟线程"));5.2 生产推荐:每任务一虚拟线程
// newVirtualThreadPerTaskExecutor:每个任务创建一个新虚拟线程// 注意:这里不叫"线程池",虚拟线程不需要池化!try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
// 提交任务,和普通线程池用法完全一样 Future<String> result1 = executor.submit(() -> fetchFromDatabase(1L)); Future<Integer> result2 = executor.submit(() -> callRemoteAPI(userId));
// .get() 虽然是"阻塞"的,但背后是虚拟线程挂起,不浪费资源 String dbResult = result1.get(); Integer apiResult = result2.get();
System.out.println("DB 结果: " + dbResult + ", API 结果: " + apiResult);
} // try-with-resources 自动关闭 executor,等待所有任务完成5.3 Spring Boot 3.2+ 集成(推荐)
# application.yml:一行配置,全局生效# 所有 Tomcat/Jetty 请求处理线程自动切换为虚拟线程spring: threads: virtual: enabled: true// 如果需要手动配置(Spring Boot < 3.2 或需要定制)@Configurationpublic class VirtualThreadConfig {
// 配置 Tomcat 使用虚拟线程处理 HTTP 请求 @Bean public TomcatProtocolHandlerCustomizer<?> tomcatVirtualThreadCustomizer() { return protocolHandler -> protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor()); }
// 配置 @Async 注解使用虚拟线程 @Bean(name = "taskExecutor") public AsyncTaskExecutor virtualThreadAsyncExecutor() { return new TaskExecutorAdapter(Executors.newVirtualThreadPerTaskExecutor()); }}// 配置完后,@Async 方法自动运行在虚拟线程上@Servicepublic class OrderService {
@Async // 这个方法会在虚拟线程上异步执行 public CompletableFuture<Void> sendOrderConfirmationEmail(Long orderId) { emailService.send(orderId); // 阻塞调用,但底层是虚拟线程,不浪费资源 return CompletableFuture.completedFuture(null); }}5.4 并行聚合多个 IO 任务(最常用场景)
// 场景:商品详情页需要聚合多个服务的数据@GetMapping("/product/{id}")public ProductDetailVO getProductDetail(@PathVariable Long id) throws Exception {
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
// 同时发起多个请求(并行执行) Future<Product> product = executor.submit(() -> productService.get(id)); Future<Inventory> inventory = executor.submit(() -> inventoryService.get(id)); Future<Price> price = executor.submit(() -> priceService.get(id)); Future<List<Comment>> comments = executor.submit(() -> commentService.getTop(id, 10));
// 等待所有结果(每个 .get() 内部是虚拟线程挂起,不占用真实线程资源) // 总耗时 ≈ max(各服务耗时),而不是 sum(各服务耗时) return ProductDetailVO.builder() .product(product.get()) .inventory(inventory.get()) .price(price.get()) .comments(comments.get()) .build(); }}5.5 结合 ScopedValue 传递上下文(推荐替代 ThreadLocal)
// ScopedValue 是专门为虚拟线程设计的上下文传递工具// 优点:不可变,自动回收,线程安全,不会内存泄漏
public class RequestContext { // 声明 ScopedValue public static final ScopedValue<UserInfo> CURRENT_USER = ScopedValue.newInstance(); public static final ScopedValue<String> TRACE_ID = ScopedValue.newInstance();}
// 在请求入口处设置上下文@Componentpublic class RequestFilter implements Filter { @Override public void doFilter(ServletRequest req, ServletResponse res, FilterChain chain) { UserInfo user = extractUserFromToken(req); String traceId = generateTraceId();
// 绑定上下文,在当前作用域内的所有(虚拟)线程都可以读取 ScopedValue .where(RequestContext.CURRENT_USER, user) .where(RequestContext.TRACE_ID, traceId) .run(() -> chain.doFilter(req, res)); }}
// 在业务代码中读取(任何地方,无需传参)@Servicepublic class OrderService { public void createOrder(OrderRequest req) { UserInfo user = RequestContext.CURRENT_USER.get(); // 直接读取 String traceId = RequestContext.TRACE_ID.get(); log.info("[{}] 用户 {} 创建订单", traceId, user.getId()); }}六、⚠️ 虚拟线程踩坑大全(12个坑)
这是本文最重要的章节,每个坑都在生产环境中真实出现过。
坑 1:给虚拟线程建线程池(最常见错误)
❌ 错误代码:
// 错误!虚拟线程不应该池化ExecutorService wrongPool = Executors.newFixedThreadPool(100, Thread.ofVirtual().factory() // 用虚拟线程工厂创建固定线程池);为什么错? 线程池的意义是”复用”线程,避免频繁创建销毁的开销。但虚拟线程创建成本极低(相当于 new Object()),根本不需要复用。池化反而带来了不必要的竞争和复杂度。
✅ 正确写法:
// 正确:每个任务一个新虚拟线程ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();坑 2:synchronized 块导致 Pinning(JDK 21/23)
❌ 问题代码:
// 在 synchronized 块内执行阻塞 IOpublic synchronized UserInfo getUser(long id) { // 虚拟线程被"钉"在载体线程上! // 此时 IO 阻塞会让整个载体线程阻塞,退化为平台线程行为 return jdbcTemplate.queryForObject("SELECT * FROM user WHERE id=?", id);}原因:JDK 21/23 中,虚拟线程在 synchronized 块内执行时无法卸载(称为 Pinning)。一旦发生 IO 阻塞,连带把载体线程也阻塞了。
如何检测:添加 JVM 启动参数 -Djdk.tracePinnedThreads=full,当发生 Pinning 时,会在控制台打印警告。
✅ 正确写法:
// 将 synchronized 替换为 ReentrantLockprivate final ReentrantLock lock = new ReentrantLock();
public UserInfo getUser(long id) { lock.lock(); try { // 现在虚拟线程可以正常卸载,不会 Pin 住载体线程 return jdbcTemplate.queryForObject("SELECT * FROM user WHERE id=?", id); } finally { lock.unlock(); // 必须在 finally 中释放! }}备注:JDK 24 已基本修复 synchronized Pinning 问题,如果你用的是 JDK 24+,这个问题影响不大。
坑 3:ThreadLocal 在虚拟线程中导致内存泄漏
❌ 问题代码:
// 假设用虚拟线程处理每个请求static ThreadLocal<byte[]> threadLocalCache = new ThreadLocal<>();
void handleRequest(Request req) { // 每个虚拟线程都存了一个大对象 threadLocalCache.set(new byte[1024 * 1024]); // 1MB processRequest(req); // 如果忘记 remove(),这 1MB 会跟着虚拟线程存在直到被 GC}// 100万虚拟线程 × 1MB = 1TB 内存??系统早崩了问题所在:平台线程数量少(几百个),ThreadLocal 泄漏影响有限。虚拟线程数量可达百万,ThreadLocal 泄漏会快速耗尽堆内存。
✅ 解决方案 1:始终调用 remove()
void handleRequest(Request req) { threadLocalCache.set(new byte[1024]); try { processRequest(req); } finally { threadLocalCache.remove(); // 必须清理!放在 finally 确保执行 }}✅ 解决方案 2:改用 ScopedValue(推荐)
// ScopedValue 在作用域结束后自动清理,从根本上避免泄漏static final ScopedValue<UserContext> USER_CTX = ScopedValue.newInstance();
ScopedValue.where(USER_CTX, new UserContext(userId)) .run(() -> processRequest(req));// 作用域结束后 UserContext 自动释放,不需要手动 remove坑 4:虚拟线程遇到 CPU 密集任务反而变慢
❌ 错误认知:
// 误以为虚拟线程万能,把 CPU 密集任务也改成虚拟线程ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
// 图片压缩是纯 CPU 操作,没有 IO 等待executor.submit(() -> compressImage(largeImage));executor.submit(() -> compressImage(largeImage));executor.submit(() -> compressImage(largeImage));// ... 提交 1000 个任务为什么变慢? CPU 密集任务不会阻塞,虚拟线程无法发挥”卸载后让出载体线程”的优势。1000 个虚拟线程同时占用 CPU,反而增加了调度开销,比用 N_cpu 个平台线程更低效。
✅ 正确做法:CPU 密集任务用平台线程池
// 根据任务类型分别使用不同的执行器@Configurationpublic class ExecutorConfig {
// IO 密集任务:用虚拟线程 @Bean("ioExecutor") public ExecutorService ioExecutor() { return Executors.newVirtualThreadPerTaskExecutor(); }
// CPU 密集任务:用固定大小的平台线程池 @Bean("cpuExecutor") public ExecutorService cpuExecutor() { int cores = Runtime.getRuntime().availableProcessors(); return Executors.newFixedThreadPool(cores + 1); }}坑 5:数据库连接池成为新瓶颈
问题描述:
虚拟线程:可以开 100万 个数据库连接池(HikariCP 默认):最多 10 个连接
结果:100万虚拟线程 → 争抢 10 个 DB 连接 → 连接池 timeout 堆积 → 请求全部超时失败❌ 错误认知: “用了虚拟线程,DB 查询就变快了”
实际情况: 虚拟线程只是不浪费等待时间,但 DB 连接数仍然是瓶颈。
✅ 解决方案:在应用层限制并发数
// 用 Semaphore 限制同时访问 DB 的并发数@Componentpublic class DatabaseAccessGuard {
// 和 HikariCP 的 maximumPoolSize 保持一致 private static final int DB_POOL_SIZE = 50; private final Semaphore semaphore = new Semaphore(DB_POOL_SIZE);
public <T> T executeWithGuard(Supplier<T> dbOperation) { semaphore.acquireUninterruptibly(); // 排队等待信号量(虚拟线程在此挂起,不浪费资源!) try { return dbOperation.get(); } finally { semaphore.release(); // 释放信号量 } }}
// 使用@Repositorypublic class UserRepository { @Autowired private DatabaseAccessGuard guard;
public User findById(Long id) { return guard.executeWithGuard(() -> jdbcTemplate.queryForObject("SELECT * FROM user WHERE id=?", User.class, id) ); }}坑 6:在虚拟线程中调用 native 方法导致 Pinning
问题代码:
// 某些依赖 native 代码的操作(如某些加密库、压缩库)// 在调用 native 方法期间,虚拟线程同样会被 Pin 住void doEncrypt(byte[] data) { // nativeEncrypt 是 native 方法 byte[] result = nativeEncrypt(data); // Pin!如果这里有 IO,载体线程阻塞 saveToDb(result); // 这个 IO 在 native 调用范围外,是安全的}解决方案: 将 native 调用和 IO 操作分离,不要在 native 调用的范围内执行 IO 阻塞。如果无法避免,将这些任务放到专用的平台线程池执行。
@Bean("nativeExecutor")public ExecutorService nativeOperationExecutor() { // 需要调用 native 方法的任务,用专用平台线程池 return Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() );}坑 7:用 isVirtual() 做业务逻辑判断
❌ 错误代码:
void processTask() { if (Thread.currentThread().isVirtual()) { // 走"虚拟线程优化路径" doNonBlockingOperation(); } else { // 走"普通线程路径" doBlockingOperation(); }}为什么错? isVirtual() 是运维/调试工具,不应该出现在业务逻辑中。这样的代码把”线程实现细节”泄漏到了业务层,将来迁移或重构非常困难。
✅ 正确做法: 业务代码不感知底层是虚拟线程还是平台线程,通过依赖注入或配置来决定。
坑 8:虚拟线程数量无上限导致 OOM
问题描述:
// 每个请求都提交大量子任务,没有限制@GetMapping("/batch")public List<Result> processBatch(@RequestBody List<Long> ids) { // ids 可能有 10万 个! try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { return ids.stream() .map(id -> executor.submit(() -> processOne(id))) .map(f -> f.get()) .toList(); // 同时创建了 10万 个虚拟线程 // 每个线程的栈 + Continuation 也是内存 // 10万 × 几KB = 几百MB,多几个并发请求就 OOM }}✅ 解决方案:分批处理 + 限制并发
@GetMapping("/batch")public List<Result> processBatch(@RequestBody List<Long> ids) { // 用 Semaphore 限制同时执行的虚拟线程数 Semaphore throttle = new Semaphore(200); // 最多 200 个并发
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { List<Future<Result>> futures = ids.stream() .map(id -> executor.submit(() -> { throttle.acquire(); // 排队 try { return processOne(id); } finally { throttle.release(); } })) .toList();
return futures.stream() .map(f -> { try { return f.get(); } catch (Exception e) { return Result.failed(e); } }) .toList(); }}坑 9:忘记处理虚拟线程的中断
❌ 问题代码:
// 在虚拟线程中执行长时间任务,但没有处理中断try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { executor.submit(() -> { while (true) { processNextItem(); // 没有检查 Thread.currentThread().isInterrupted() // 如果外部取消了这个 executor,这个任务会一直跑 } });}✅ 正确写法:
executor.submit(() -> { while (!Thread.currentThread().isInterrupted()) { // 检查中断标志 try { processNextItem(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // 恢复中断标志,让外层感知 break; // 退出循环 } }});坑 10:在虚拟线程中使用 BlockingQueue 不当
❌ 问题: LinkedBlockingQueue.take() 虽然会挂起虚拟线程(这是正确的),但如果生产者速度远慢于消费者,会有大量虚拟线程”无用等待”。
✅ 建议: 使用响应式/异步队列(如 Reactor 的 Sinks),或合理设置消费者数量。
坑 11:错误理解”虚拟线程不需要关心并发问题”
// ❌ 错误认知:虚拟线程会自动处理并发static int counter = 0;
void increment() { counter++; // 这仍然是非原子操作!多个虚拟线程并发执行仍然有竞态条件}虚拟线程只改变了线程的”调度方式”,没有改变”并发安全”的本质规则。 共享变量、锁、原子操作、happens-before 关系,这些概念在虚拟线程中完全相同。
// ✅ 正确:使用原子类或锁AtomicInteger counter = new AtomicInteger(0);void increment() { counter.incrementAndGet(); // 原子操作,线程安全}坑 12:在旧版 JDBC 驱动中遇到兼容性问题
问题描述: 某些旧版 JDBC 驱动(如 MySQL Connector/J 5.x)内部使用了 synchronized 块,在虚拟线程场景下会导致 Pinning 问题。
解决方案:
- 升级到支持虚拟线程的驱动版本(MySQL Connector/J 9.x+)
- 添加 JVM 参数检测:
-Djdk.tracePinnedThreads=full,观察 Pinning 发生的位置 - 如果无法升级驱动,将 DB 操作提交到专用平台线程池
// 添加以下 JVM 参数,检测 Pinning 事件// -Djdk.tracePinnedThreads=full// 发生 Pinning 时,控制台会输出类似:// Thread[#28,ForkJoinPool-1-worker-1,5,CarrierThreads]// jdk.internal.misc.Unsafe.park(Native Method)// java.util.concurrent.locks.LockSupport.park(LockSupport.java:211)// ... (你的业务代码)七、CompletableFuture 异步编排从零讲起
虚拟线程让单个任务的”同步等待”不再浪费资源,但当你需要编排多个任务之间的关系(A 完成后做 B,A 和 B 同时做完后做 C),CompletableFuture 是非常有用的工具。
7.1 基本概念:什么是 CompletableFuture?
// Future:只能阻塞等待结果Future<String> f = executor.submit(() -> "结果");String result = f.get(); // 阻塞,直到结果出来
// CompletableFuture:可以注册回调,结果出来后自动触发CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "结果");cf.thenAccept(result -> System.out.println("得到结果:" + result)); // 不阻塞,异步回调7.2 核心方法速查
// ① 创建 CompletableFutureCompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "有返回值");CompletableFuture<Void> cf2 = CompletableFuture.runAsync(() -> System.out.println("无返回值"));
// ② 转换结果(类似 Stream 的 map)CompletableFuture<Integer> cf3 = cf1.thenApply(s -> s.length()); // 同步转换CompletableFuture<Integer> cf4 = cf1.thenApplyAsync(s -> s.length()); // 异步转换(在另一个线程)
// ③ 串联依赖任务(上一步的结果作为下一步的输入,且下一步也是异步的)CompletableFuture<User> cf5 = CompletableFuture .supplyAsync(() -> fetchUserId()) // 第一步:获取 userId .thenCompose(id -> fetchUser(id)); // 第二步:用 userId 获取 User
// ④ 消费结果(最后一步,无返回值)cf5.thenAccept(user -> saveToCache(user));
// ⑤ 并行等待多个任务CompletableFuture<Void> allDone = CompletableFuture.allOf(cf1, cf3, cf5);allDone.thenRun(() -> System.out.println("全部完成!"));
// ⑥ 竞速:取最先完成的结果CompletableFuture<Object> fastest = CompletableFuture.anyOf( fetchFromCache(key), fetchFromDB(key));7.3 实战:并行聚合商品详情
@Servicepublic class ProductDetailService {
public ProductDetailVO getDetail(Long productId) { // 1. 同时发起多个异步请求 CompletableFuture<Product> productFuture = CompletableFuture.supplyAsync( () -> productService.findById(productId));
CompletableFuture<Inventory> inventoryFuture = CompletableFuture.supplyAsync( () -> inventoryService.findByProduct(productId));
CompletableFuture<Price> priceFuture = CompletableFuture.supplyAsync( () -> priceService.getPrice(productId));
CompletableFuture<List<Comment>> commentFuture = CompletableFuture.supplyAsync( () -> commentService.getTop(productId, 10));
// 2. 等待所有请求完成 CompletableFuture.allOf(productFuture, inventoryFuture, priceFuture, commentFuture) .join(); // 阻塞等待(如果是虚拟线程环境,这里不浪费资源)
// 3. 组装结果(此处所有 Future 已完成,get() 不会阻塞) return ProductDetailVO.builder() .product(productFuture.join()) .inventory(inventoryFuture.join()) .price(priceFuture.join()) .comments(commentFuture.join()) .build(); }}7.4 超时与降级处理
// 重要!生产环境必须设置超时,否则一个慢服务会拖垮整个链路
CompletableFuture<UserProfile> profileFuture = CompletableFuture .supplyAsync(() -> userProfileService.get(userId)) .orTimeout(300, TimeUnit.MILLISECONDS) // 超时抛 TimeoutException .exceptionally(ex -> { if (ex instanceof TimeoutException) { log.warn("获取用户画像超时,返回默认值"); return UserProfile.defaultProfile(); // 降级:返回默认值 } throw new RuntimeException(ex); // 其他异常继续抛出 });
// 更简洁:超时后自动返回默认值(不抛异常)CompletableFuture<UserProfile> safeProfile = CompletableFuture .supplyAsync(() -> userProfileService.get(userId)) .completeOnTimeout(UserProfile.defaultProfile(), 300, TimeUnit.MILLISECONDS);八、⚠️ CompletableFuture 踩坑大全(8个坑)
坑 1:不指定 Executor,使用 ForkJoinPool 公共池
❌ 问题代码:
// thenApplyAsync 如果不传 executor,默认用 ForkJoinPool.commonPool()CompletableFuture<String> cf = CompletableFuture .supplyAsync(() -> fetchFromDB(id)) // 默认 ForkJoinPool .thenApplyAsync(result -> transform(result)); // 默认 ForkJoinPool为什么危险? ForkJoinPool.commonPool() 是全 JVM 共享的,默认线程数 = CPU 核数 - 1。你的业务代码和其他框架代码(如并行 Stream)都在用它,互相抢占,可能导致业务任务饿死。
✅ 正确做法:
// 为业务任务指定专用的 Executor@Bean("businessExecutor")ExecutorService businessExecutor() { return Executors.newVirtualThreadPerTaskExecutor(); // 或平台线程池}
CompletableFuture<String> cf = CompletableFuture .supplyAsync(() -> fetchFromDB(id), businessExecutor) // 指定 executor .thenApplyAsync(result -> transform(result), businessExecutor); // 指定 executor坑 2:异常被静默吞掉
❌ 问题代码:
CompletableFuture<String> cf = CompletableFuture .supplyAsync(() -> { throw new RuntimeException("出错了!"); return "结果"; });
// 如果不调用 get() 或 join(),异常会静默消失,你根本不知道出错了!// 程序继续执行,但任务实际上失败了✅ 正确做法:
// 方案1:用 exceptionally 处理异常CompletableFuture<String> cf = CompletableFuture .supplyAsync(() -> fetchData()) .exceptionally(ex -> { log.error("获取数据失败", ex); return "默认值"; // 降级处理 });
// 方案2:用 handle 同时处理正常和异常结果CompletableFuture<String> cf = CompletableFuture .supplyAsync(() -> fetchData()) .handle((result, ex) -> { if (ex != null) { log.error("失败", ex); return "默认值"; } return result; });
// 方案3:如果不需要结果,用 whenComplete 做收尾cf.whenComplete((result, ex) -> { if (ex != null) log.error("任务失败", ex);});坑 3:thenApply vs thenApplyAsync 混淆
❌ 常见误区: 以为 thenApply 是同步的,所以比 thenApplyAsync 性能更好
// thenApply(同步):在触发完成的那个线程上执行CompletableFuture<String> cf = fetchDataAsync() .thenApply(data -> heavyTransform(data)); // heavyTransform 可能阻塞, // 且占用了触发完成的那个线程!理解区别:
thenApply(fn): fn 在"上一个任务完成"的线程上执行(可能是业务线程!)thenApplyAsync(fn): fn 在 ForkJoinPool 或指定的 executor 上执行(异步)✅ 规则: 如果 fn 是轻量计算(如字段映射、格式转换),用 thenApply;如果是 IO 或耗时操作,用 thenApplyAsync 并指定 executor。
坑 4:allOf 没有返回值,需要额外 get()
❌ 困惑代码:
CompletableFuture<User> userFuture = fetchUser(userId);CompletableFuture<Order> orderFuture = fetchOrder(orderId);
// allOf 返回 CompletableFuture<Void>,不包含各子任务的结果!CompletableFuture<Void> allFuture = CompletableFuture.allOf(userFuture, orderFuture);
// 等待完成后,需要分别从各自的 Future 取结果allFuture.thenRun(() -> { User user = userFuture.join(); // 此时已完成,join() 不会阻塞 Order order = orderFuture.join(); // 同上 System.out.println(user + ", " + order);});坑 5:链式调用中的线程切换导致 ThreadLocal 丢失
问题描述:
// 线程1 设置了 ThreadLocalMDC.put("traceId", "abc-123"); // MDC 底层是 ThreadLocal
CompletableFuture.supplyAsync(() -> fetchData()) // 在线程池中执行 .thenApply(data -> { // 此处已经切换到 ForkJoinPool 的线程! // MDC.get("traceId") 返回 null!日志 traceId 丢失! log.info("traceId={}", MDC.get("traceId")); // 打印 null return transform(data); });✅ 解决方案: 使用 MDC 提供的 MDCContext 传播,或手动传递上下文:
// 先快照当前线程的 MDC 上下文Map<String, String> mdcSnapshot = MDC.getCopyOfContextMap();
CompletableFuture.supplyAsync(() -> fetchData()) .thenApply(data -> { // 在回调中恢复 MDC 上下文 if (mdcSnapshot != null) MDC.setContextMap(mdcSnapshot); try { log.info("traceId={}", MDC.get("traceId")); // 正常打印 return transform(data); } finally { MDC.clear(); // 清理,防止污染线程池中的其他任务 } });坑 6:join() 和 get() 的异常包装差异
try { // get() 抛 InterruptedException 和 ExecutionException(包装层) String result = cf.get();} catch (ExecutionException e) { Throwable realCause = e.getCause(); // 需要 getCause() 才能拿到真正的异常}
// join() 不抛 checked exception,但会把异常包成 CompletionExceptiontry { String result = cf.join();} catch (CompletionException e) { Throwable realCause = e.getCause();}
// 选择建议:// - 在非异步环境(普通方法):用 get(),IDE 会提示你处理 checked exception// - 在 CompletableFuture 链式回调中:用 join(),代码更简洁坑 7:CompletableFuture 没有超时导致线程泄漏
❌ 危险代码:
// 没有超时,如果 fetchData() 永远不返回(网络超时没配置等)// 这个 CompletableFuture 会永远挂着,持有线程不释放CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> fetchData());String result = cf.get(); // 永远阻塞!✅ 必须设置超时:
// JDK 9+:orTimeout 和 completeOnTimeoutString result = CompletableFuture.supplyAsync(() -> fetchData()) .orTimeout(3000, TimeUnit.MILLISECONDS) // 3秒超时,抛 TimeoutException .get();坑 8:在高并发下大量创建 CompletableFuture 链导致内存压力
问题: 每个 thenApply/thenCompose 都会创建一个新的 CompletableFuture 对象和内部节点。在 QPS 很高的场景下,大量短命的 CompletableFuture 对象会给 GC 带来压力。
建议: 保持链条长度合理(不超过 5~6 个 stage);如果业务逻辑非常复杂,考虑使用 Project Reactor 的 Mono/Flux,其底层有更好的对象复用机制。
九、结构化并发:下一代并发编程
9.1 它解决了什么问题?
传统异步编程有一个根本缺陷:父任务和子任务的生命周期没有强绑定关系。
// CompletableFuture 写法:子任务可能"逃逸"void handleRequest() { CompletableFuture<User> userFuture = fetchUser(userId); CompletableFuture<Order> orderFuture = fetchOrder(orderId);
// 问题1:如果 orderFuture 失败,userFuture 不会自动取消,继续耗费资源 // 问题2:如果 handleRequest() 返回了,但子任务还在后台运行,成了"孤儿任务" // 问题3:异常传播逻辑复杂,容易漏掉}结构化并发(StructuredTaskScope)强制要求:子任务的生命周期不能超出父任务的作用域。
9.2 基本用法
// ShutdownOnFailure:任何一个子任务失败,立即取消其他所有子任务try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 启动子任务(每个子任务运行在独立的虚拟线程上) StructuredTaskScope.Subtask<User> userTask = scope.fork(() -> fetchUser(userId)); StructuredTaskScope.Subtask<Inventory> inventTask = scope.fork(() -> fetchInventory(skuId)); StructuredTaskScope.Subtask<Price> priceTask = scope.fork(() -> fetchPrice(skuId));
scope.join(); // 等待所有子任务完成(或有任务失败) scope.throwIfFailed(); // 如果有子任务失败,重新抛出异常(包含原始异常)
// 到这里,所有子任务都已成功完成 return buildResponse(userTask.get(), inventTask.get(), priceTask.get());
} // 作用域关闭时(无论正常还是异常),自动取消所有未完成的子任务 // 不会有"孤儿任务"留在后台!// ShutdownOnSuccess:竞速模式,取第一个成功的结果,取消其他任务try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
scope.fork(() -> fetchFromPrimaryDB(key)); // 主库查询 scope.fork(() -> fetchFromReplicaDB(key)); // 从库查询(备选) scope.fork(() -> fetchFromCache(key)); // 缓存查询(最快)
scope.join(); // 等待第一个成功完成 return scope.result(); // 返回最先成功的结果,其他任务已自动取消}9.3 带超时的结构化并发
// joinUntil:限制最长等待时间try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { var task1 = scope.fork(() -> fetchUser(userId)); var task2 = scope.fork(() -> fetchOrder(orderId));
// 最多等 500ms,超时后抛 TimeoutException scope.joinUntil(Instant.now().plusMillis(500)); scope.throwIfFailed();
return merge(task1.get(), task2.get());
} catch (TimeoutException e) { return degradedResponse(); // 超时降级}9.4 结构化并发的优势总结
| 特性 | CompletableFuture | StructuredTaskScope |
|---|---|---|
| 子任务生命周期 | 可能逃逸父作用域 | 严格限定在父作用域内 |
| 异常传播 | 需要手动处理每个 Future | 自动收集并传播 |
| 取消传播 | 需要手动 cancel() | 自动级联取消 |
| 代码可读性 | 回调链,较复杂 | try-with-resources,直观 |
| 线程转储(jstack) | 子任务和父任务看不出关系 | 层级清晰,父子关系可见 |
| JDK 要求 | JDK 8+ | JDK 21 预览,JDK 24 正式 |
十、响应式编程(Reactor/WebFlux)概览
响应式编程学习门槛较高,本章只介绍核心概念和适用场景,帮你判断”要不要学”。
10.1 核心概念:Mono 和 Flux
// Mono:0 或 1 个结果(类比单个值的异步计算)Mono<User> userMono = Mono.fromCallable(() -> fetchUser(id));
// Flux:0 到 N 个结果(类比流式数据)Flux<Order> orderFlux = Flux.fromIterable(orderIds) .flatMap(id -> Mono.fromCallable(() -> fetchOrder(id)));10.2 背压(Backpressure):响应式的核心优势
背压是响应式编程的杀手锏,在虚拟线程模型中没有原生支持。
场景:日志流处理系统生产者(消息队列):每秒产生 10万 条日志消费者(你的服务):每秒只能处理 2万 条
没有背压: → 内存中积压 8万 条/秒 → 几十秒后 OOM,系统崩溃
有背压(Reactor): → 消费者告诉生产者:"我每次只要 2万 条" → 生产者自动限速 → 系统稳定运行// Reactor 背压示例Flux.range(1, 1_000_000) .onBackpressureBuffer(1000) // 最多缓冲 1000 个,超出则根据策略处理 .flatMap(id -> processAsync(id), 20) // 最大并发 20 .subscribe( result -> log.info("处理完成: {}", result), error -> log.error("处理失败", error) );10.3 什么时候应该用响应式?
适合响应式的场景:
- 流式数据处理(Kafka 消费、大文件读取、SSE 推送)
- 需要精细背压控制
- 已有大量 WebFlux 代码,团队熟悉响应式
不适合响应式的场景(用虚拟线程更好):
- 普通 CRUD 接口
- 微服务间 HTTP 调用聚合
- 团队没有响应式经验,项目交期紧张
10.4 响应式的三大难点(让你望而却步的原因)
-
代码风格完全不同:所有 IO 调用必须改成返回
Mono/Flux的形式,不能有阻塞调用,全栈改造成本极高 -
调试困难:异常堆栈不可读,需要开启
Hooks.onOperatorDebug()才能看到有意义的堆栈 -
错误处理复杂:
onErrorReturn、onErrorResume、onErrorMap等多种方式,容易混淆
十一、四种模型横向大对比
| 对比维度 | 平台线程+线程池 | 虚拟线程 | CompletableFuture | Reactor/WebFlux |
|---|---|---|---|---|
| JDK 版本要求 | JDK 5+ | JDK 21+ | JDK 8+ | 需要 Spring WebFlux |
| 代码风格 | 同步,直观 | 同步,直观 | 异步回调链 | 声明式流 |
| IO 密集吞吐量 | 低(线程阻塞) | 高(自动卸载) | 高(不阻塞线程) | 最高(极少线程) |
| CPU 密集吞吐量 | 高 | 与线程池相同 | 与线程池相同 | 与线程池相同 |
| 并行聚合多个 IO | 复杂 | 简单(配合SC) | 简单(allOf) | 简单(zip) |
| 背压支持 | 无 | 无 | 无 | 有 |
| 错误处理 | try-catch,直观 | try-catch,直观 | exceptionally,复杂 | onErrorXxx,复杂 |
| 调试难度 | 简单 | 简单 | 中等(堆栈失真) | 困难(需开启debug) |
| 学习曲线 | 平缓 | 平缓 | 中等 | 陡峭 |
| 迁移成本 | - | 低(改几行配置) | 中等 | 高(全栈改造) |
| 适用场景 | CPU 密集 | IO 密集的新项目 | JDK 8 项目/扇出聚合 | 流式数据/极高并发 |
十二、企业级选型决策指南
12.1 决策树
你的 JDK 版本是多少?├── JDK 8/11(短期无法升级)│ ├── 主要是 IO 密集(微服务调用、DB 查询)→ CompletableFuture│ ├── 流式数据处理 → 考虑引入 Reactor│ └── 以 CPU 计算为主 → 平台线程池(ForkJoinPool)│└── JDK 21+(推荐升级路径) ├── 新项目 / 微服务 CRUD → 虚拟线程(首选) ├── 需要同时聚合多个服务 → 虚拟线程 + 结构化并发 ├── 需要精细背压控制 → 保留/引入 Reactor ├── CPU 密集计算 → 专用平台线程池(不要用虚拟线程) └── 已有 WebFlux 代码 → 继续维护,不必强行迁移12.2 推荐的企业级组合(JDK 21+)
@Configurationpublic class ConcurrencyConfig {
/** * IO 密集任务:虚拟线程 * 用于:数据库查询、微服务 HTTP 调用、文件读写 */ @Bean("ioExecutor") public ExecutorService ioExecutor() { return Executors.newVirtualThreadPerTaskExecutor(); }
/** * CPU 密集任务:平台线程池 * 用于:图像处理、PDF 生成、复杂计算 */ @Bean("cpuExecutor") public ExecutorService cpuExecutor() { int cores = Runtime.getRuntime().availableProcessors(); return new ThreadPoolExecutor( cores, cores + 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(500), new CustomThreadFactory("cpu-worker"), new ThreadPoolExecutor.CallerRunsPolicy() ); }
/** * 限制 DB 并发(配合连接池大小) */ @Bean public Semaphore dbConcurrencyGuard(DataSourceProperties props) { int maxPoolSize = props.getHikari().getMaximumPoolSize(); // 从配置读 return new Semaphore(maxPoolSize); }}12.3 完整实战:商品详情页聚合接口
/** * 完整示例:使用虚拟线程 + 结构化并发实现商品详情聚合 * 特性: * - 4 个服务调用并行执行 * - 任意一个失败,其他自动取消 * - 总超时 500ms,超时降级 * - 全链路 traceId 传递 */@RestController@RequestMapping("/api/products")public class ProductController {
@GetMapping("/{id}/detail") public ResponseEntity<ProductDetailVO> getDetail(@PathVariable Long id) { String traceId = MDC.get("traceId");
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var productTask = scope.fork(() -> { MDC.put("traceId", traceId); // 在子线程恢复 traceId return productService.findById(id); });
var inventoryTask = scope.fork(() -> { MDC.put("traceId", traceId); return inventoryService.findByProduct(id); });
var priceTask = scope.fork(() -> { MDC.put("traceId", traceId); return priceService.getPrice(id); });
var commentTask = scope.fork(() -> { MDC.put("traceId", traceId); return commentService.getTop(id, 10); });
// 最多等 500ms scope.joinUntil(Instant.now().plusMillis(500)); scope.throwIfFailed();
ProductDetailVO vo = ProductDetailVO.builder() .product(productTask.get()) .inventory(inventoryTask.get()) .price(priceTask.get()) .comments(commentTask.get()) .build();
return ResponseEntity.ok(vo);
} catch (TimeoutException e) { // 超时降级:返回基础商品信息,不包含库存/价格/评论 log.warn("[{}] 聚合超时,返回降级响应", traceId); return ResponseEntity.ok(ProductDetailVO.minimal(id));
} catch (ExecutionException e) { // 某个子任务失败,其他已自动取消 log.error("[{}] 聚合失败: {}", traceId, e.getCause().getMessage()); throw new ServiceException("商品信息获取失败", e.getCause()); } }}核心总结与学习路径
三句话总结
-
虚拟线程:让同步代码获得异步吞吐,IO 密集场景的首选,JDK 21 起可用,学习成本极低。
-
CompletableFuture:编排多个任务之间关系的工具,JDK 8+ 可用,但回调链复杂时可读性下降,配合虚拟线程使用效果最好。
-
结构化并发:下一代并发编程范式,父子任务生命周期强绑定,是虚拟线程的最佳搭档,JDK 24 正式 GA。
学习路径建议
阶段 1(1~2 周):打好基础 → 理解线程、线程池、Future 的基本概念 → 学会使用 ThreadPoolExecutor(参考线程池调优文章) → 学会基本的 CompletableFuture 用法
阶段 2(1~2 周):掌握虚拟线程 → 搭建 JDK 21 环境 → 将 Spring Boot 项目升级到 3.2+,开启虚拟线程 → 动手实践:改写一个接口,使用虚拟线程并行聚合数据 → 重点:理解并规避本文中的 12 个踩坑点
阶段 3(2~3 周):进阶异步编排 → 深入 CompletableFuture 的各种编排模式 → 学习结构化并发(StructuredTaskScope) → 实践:用结构化并发重写并行聚合接口
阶段 4(可选,按需学习):响应式编程 → 学习 Project Reactor(Mono/Flux) → 学习 Spring WebFlux → 仅在真正需要背压控制或流式处理时深入参考资料:JEP 444(Virtual Threads)· JEP 453(Structured Concurrency)· 《Java 并发编程实战》Brian Goetz · Project Loom 官方文档 · Spring Boot 3.2 Release Notes