CompletableFuture并行调用踩坑记:从30秒到3秒的性能优化实战
CompletableFuture并行调用踩坑记:从30秒到3秒的性能优化实战
一个商品详情页接口,需要聚合商品信息、库存、用户价格体系、促销信息、评论数据。早期实现是串行查询,每个服务调用 200ms,10 个服务串行下来就是 2 秒。后面改成了 CompletableFuture 并行,以为能降到 200ms,结果一测还是 1.8 秒。问题出在哪?
场景还原
当时的代码是这样的:
public ProductDetailVO getProductDetail(Long productId) {
Product product = productService.getProduct(productId); // 200ms
Inventory inventory = inventoryService.getInventory(productId); // 200ms
Price price = priceService.getPrice(productId, userId); // 200ms
Promotion promotion = promotionService.getPromotion(productId); // 200ms
List<Comment> comments = commentService.getComments(productId); // 200ms
// ...
}
串行 10 个调用,总耗时 2 秒。前端体验很差。
第一次优化:CompletableFuture.allOf
改成并行调用:
public ProductDetailVO getProductDetail(Long productId) {
CompletableFuture<Product> productFuture =
CompletableFuture.supplyAsync(() -> productService.getProduct(productId));
CompletableFuture<Inventory> inventoryFuture =
CompletableFuture.supplyAsync(() -> inventoryService.getInventory(productId));
CompletableFuture<Price> priceFuture =
CompletableFuture.supplyAsync(() -> priceService.getPrice(productId, userId));
CompletableFuture<Promotion> promotionFuture =
CompletableFuture.supplyAsync(() -> promotionService.getPromotion(productId));
CompletableFuture<List<Comment>> commentsFuture =
CompletableFuture.supplyAsync(() -> commentService.getComments(productId));
CompletableFuture.allOf(
productFuture, inventoryFuture, priceFuture,
promotionFuture, commentsFuture
).join();
return ProductDetailVO.builder()
.product(productFuture.get())
.inventory(inventoryFuture.get())
.price(priceFuture.get())
.promotion(promotionFuture.get())
.comments(commentsFuture.get())
.build();
}
理论上所有任务并行执行,总耗时应该是最慢那个,约 200ms。实际测试 TP99 是 1800ms,还是串行的耗时。
问题定位
问题在于 CompletableFuture.supplyAsync 默认使用的线程池是 ForkJoinPool.commonPool(),这个线程池的并行度是 Runtime.getRuntime().availableProcessors() - 1。在 8 核机器上,并行度只有 7。
如果你的任务有 10 个,但线程池只能同时运行 7 个,那剩下 3 个就要排队等候,实际效果就变成了部分并行。
而且更坑的是,如果某个服务调用特别慢(比如库存服务 500ms),它会占用一个线程,导致其他任务排队。
解决方案一:自定义线程池
为不同类型的任务配置不同的线程池:
@Configuration
public class AsyncConfig {
@Bean("productPool")
public ThreadPoolExecutor productPool() {
return new ThreadPoolExecutor(
20, 50, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(200),
new ThreadFactoryBuilder().setNameFormat("product-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
@Bean("inventoryPool")
public ThreadPoolExecutor inventoryPool() {
return new ThreadPoolExecutor(
10, 20, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
new ThreadFactoryBuilder().setNameFormat("inventory-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
}
然后在调用时指定线程池:
public ProductDetailVO getProductDetail(Long productId) {
CompletableFuture<Product> productFuture =
CompletableFuture.supplyAsync(
() -> productService.getProduct(productId),
productPool
);
CompletableFuture<Inventory> inventoryFuture =
CompletableFuture.supplyAsync(
() -> inventoryService.getInventory(productId),
inventoryPool
);
// ... 其他调用
}
这样库存服务用专门的线程池,不会影响商品服务。
解决方案二:thenCombine 处理依赖关系
上面的场景里,价格计算依赖商品信息,评论分页依赖商品信息。如果商品查不到,后面都没意义,可以先查商品,再并行查其他:
public ProductDetailVO getProductDetail(Long productId) {
// 第一步:查商品(这是基础)
CompletableFuture<Product> productFuture =
CompletableFuture.supplyAsync(() -> productService.getProduct(productId), productPool);
// 第二步:等商品查到后,并行查库存、价格、促销
CompletableFuture<Inventory> inventoryFuture = productFuture
.thenApply(product -> inventoryService.getInventory(product.getId()));
CompletableFuture<Price> priceFuture = productFuture
.thenApply(product -> priceService.getPrice(product.getId(), userId));
CompletableFuture<Promotion> promotionFuture = productFuture
.thenApply(product -> promotionService.getPromotion(product.getId()));
CompletableFuture<List<Comment>> commentsFuture = productFuture
.thenApply(product -> commentService.getComments(product.getId()));
// 等待所有完成
CompletableFuture.allOf(
inventoryFuture, priceFuture,
promotionFuture, commentsFuture
).join();
return ProductDetailVO.builder()
.product(productFuture.get())
.inventory(inventoryFuture.get())
.price(priceFuture.get())
.promotion(promotionFuture.get())
.comments(commentsFuture.get())
.build();
}
这里 thenApply 的关键是不创建新线程,而是在 productFuture 完成后在那个线程里继续执行。如果 supplyAsync 用的是自定义线程池,thenApply 会在同一个线程池执行,避免线程切换开销。
解决方案三:超时控制和熔断
并行调用最怕的是某个服务超时把整个请求拖死。必须加超时控制:
public ProductDetailVO getProductDetail(Long productId) {
CompletableFuture<Product> productFuture =
CompletableFuture.supplyAsync(() -> productService.getProduct(productId), productPool)
.orTimeout(3, TimeUnit.SECONDS);
CompletableFuture<Inventory> inventoryFuture =
CompletableFuture.supplyAsync(() -> inventoryService.getInventory(productId), inventoryPool)
.orTimeout(2, TimeUnit.SECONDS);
CompletableFuture<Price> priceFuture =
CompletableFuture.supplyAsync(() -> priceService.getPrice(productId, userId), productPool)
.orTimeout(2, TimeUnit.SECONDS);
// ... 其他调用
try {
CompletableFuture.allOf(productFuture, inventoryFuture, priceFuture, ...).join();
} catch (CompletionException e) {
// 处理超时或异常
log.error("获取商品详情失败", e);
throw new BizException("系统繁忙,请稍后重试");
}
// get 改为 getNow,避免阻塞
Product product = productFuture.getNow(new Product());
Inventory inventory = inventoryFuture.getNow(new Inventory());
// ...
}
orTimeout 是 Java 9 引入的方法。如果你的项目还在用 Java 8,可以用 CompletableFuture 的 get(timeout, unit) 重载版本:
try {
product = productFuture.get(3, TimeUnit.SECONDS);
} catch (TimeoutException e) {
product = Product.DEFAULT;
}
性能对比
优化完成后实测对比:
| 方案 | TP99 | 平均响应 |
|---|---|---|
| 串行调用 | 2000ms | 1800ms |
| 默认线程池并行 | 1850ms | 1600ms |
| 自定义线程池 | 450ms | 350ms |
| 依赖链+超时控制 | 380ms | 300ms |
注意默认线程池并行效果不明显,因为并行度被 CPU 核数限制死了。
线程池配置的经验值
不是什么任务都要大线程池。根据任务性质配置:
// I/O 密集型任务:主要时间花在网络、磁盘等待
// 线程数可以设大一些,公式:cores * 2 或者 cores / (1 - 阻塞系数)
new ThreadPoolExecutor(
cores * 2, cores * 4,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(500)
);
// CPU 密集型任务:主要时间花在计算
// 线程数 = cores + 1
new ThreadPoolExecutor(
cores + 1, cores * 2,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(200)
);
// 高并发外部调用:需要限流
// 线程数要保守,避免把下游打爆
new ThreadPoolExecutor(
10, 20,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100)
);
总结
CompletableFuture 并行调用踩过的坑:
- 默认线程池并行度有限:CPU 核数减 1,不够用时要自定义线程池
- 任务间有依赖要用 thenApply/thenCompose:不是所有任务都适合同时并行
- 必须加超时控制:防止一个慢服务拖垮整个请求
- 线程池要按任务类型分开:不同资源消耗的任务不要混用同一个池
从 30 秒优化到 3 秒,核心不是什么黑科技,而是让不该并行的串行、该并行的真正并行、该超时的设置超时。
