CompletableFuture并行调用踩坑记:从30秒到3秒的性能优化实战

CompletableFuture并行调用踩坑记:从30秒到3秒的性能优化实战

一个商品详情页接口,需要聚合商品信息、库存、用户价格体系、促销信息、评论数据。早期实现是串行查询,每个服务调用 200ms,10 个服务串行下来就是 2 秒。后面改成了 CompletableFuture 并行,以为能降到 200ms,结果一测还是 1.8 秒。问题出在哪?

场景还原

当时的代码是这样的:

public ProductDetailVO getProductDetail(Long productId) {
    Product product = productService.getProduct(productId);           // 200ms
    Inventory inventory = inventoryService.getInventory(productId);   // 200ms
    Price price = priceService.getPrice(productId, userId);          // 200ms
    Promotion promotion = promotionService.getPromotion(productId);   // 200ms
    List<Comment> comments = commentService.getComments(productId);   // 200ms
    // ...
}

串行 10 个调用,总耗时 2 秒。前端体验很差。

第一次优化:CompletableFuture.allOf

改成并行调用:

public ProductDetailVO getProductDetail(Long productId) {
    CompletableFuture<Product> productFuture =
        CompletableFuture.supplyAsync(() -> productService.getProduct(productId));

    CompletableFuture<Inventory> inventoryFuture =
        CompletableFuture.supplyAsync(() -> inventoryService.getInventory(productId));

    CompletableFuture<Price> priceFuture =
        CompletableFuture.supplyAsync(() -> priceService.getPrice(productId, userId));

    CompletableFuture<Promotion> promotionFuture =
        CompletableFuture.supplyAsync(() -> promotionService.getPromotion(productId));

    CompletableFuture<List<Comment>> commentsFuture =
        CompletableFuture.supplyAsync(() -> commentService.getComments(productId));

    CompletableFuture.allOf(
        productFuture, inventoryFuture, priceFuture,
        promotionFuture, commentsFuture
    ).join();

    return ProductDetailVO.builder()
        .product(productFuture.get())
        .inventory(inventoryFuture.get())
        .price(priceFuture.get())
        .promotion(promotionFuture.get())
        .comments(commentsFuture.get())
        .build();
}

理论上所有任务并行执行,总耗时应该是最慢那个,约 200ms。实际测试 TP99 是 1800ms,还是串行的耗时。

问题定位

问题在于 CompletableFuture.supplyAsync 默认使用的线程池是 ForkJoinPool.commonPool(),这个线程池的并行度是 Runtime.getRuntime().availableProcessors() - 1。在 8 核机器上,并行度只有 7。

如果你的任务有 10 个,但线程池只能同时运行 7 个,那剩下 3 个就要排队等候,实际效果就变成了部分并行。

而且更坑的是,如果某个服务调用特别慢(比如库存服务 500ms),它会占用一个线程,导致其他任务排队。

解决方案一:自定义线程池

为不同类型的任务配置不同的线程池:

@Configuration
public class AsyncConfig {

    @Bean("productPool")
    public ThreadPoolExecutor productPool() {
        return new ThreadPoolExecutor(
            20, 50, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(200),
            new ThreadFactoryBuilder().setNameFormat("product-%d").build(),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }

    @Bean("inventoryPool")
    public ThreadPoolExecutor inventoryPool() {
        return new ThreadPoolExecutor(
            10, 20, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(100),
            new ThreadFactoryBuilder().setNameFormat("inventory-%d").build(),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }
}

然后在调用时指定线程池:

public ProductDetailVO getProductDetail(Long productId) {
    CompletableFuture<Product> productFuture =
        CompletableFuture.supplyAsync(
            () -> productService.getProduct(productId),
            productPool
        );

    CompletableFuture<Inventory> inventoryFuture =
        CompletableFuture.supplyAsync(
            () -> inventoryService.getInventory(productId),
            inventoryPool
        );

    // ... 其他调用
}

这样库存服务用专门的线程池,不会影响商品服务。

解决方案二:thenCombine 处理依赖关系

上面的场景里,价格计算依赖商品信息,评论分页依赖商品信息。如果商品查不到,后面都没意义,可以先查商品,再并行查其他:

public ProductDetailVO getProductDetail(Long productId) {
    // 第一步:查商品(这是基础)
    CompletableFuture<Product> productFuture =
        CompletableFuture.supplyAsync(() -> productService.getProduct(productId), productPool);

    // 第二步:等商品查到后,并行查库存、价格、促销
    CompletableFuture<Inventory> inventoryFuture = productFuture
        .thenApply(product -> inventoryService.getInventory(product.getId()));

    CompletableFuture<Price> priceFuture = productFuture
        .thenApply(product -> priceService.getPrice(product.getId(), userId));

    CompletableFuture<Promotion> promotionFuture = productFuture
        .thenApply(product -> promotionService.getPromotion(product.getId()));

    CompletableFuture<List<Comment>> commentsFuture = productFuture
        .thenApply(product -> commentService.getComments(product.getId()));

    // 等待所有完成
    CompletableFuture.allOf(
        inventoryFuture, priceFuture,
        promotionFuture, commentsFuture
    ).join();

    return ProductDetailVO.builder()
        .product(productFuture.get())
        .inventory(inventoryFuture.get())
        .price(priceFuture.get())
        .promotion(promotionFuture.get())
        .comments(commentsFuture.get())
        .build();
}

这里 thenApply 的关键是不创建新线程,而是在 productFuture 完成后在那个线程里继续执行。如果 supplyAsync 用的是自定义线程池,thenApply 会在同一个线程池执行,避免线程切换开销。

解决方案三:超时控制和熔断

并行调用最怕的是某个服务超时把整个请求拖死。必须加超时控制:

public ProductDetailVO getProductDetail(Long productId) {
    CompletableFuture<Product> productFuture =
        CompletableFuture.supplyAsync(() -> productService.getProduct(productId), productPool)
            .orTimeout(3, TimeUnit.SECONDS);

    CompletableFuture<Inventory> inventoryFuture =
        CompletableFuture.supplyAsync(() -> inventoryService.getInventory(productId), inventoryPool)
            .orTimeout(2, TimeUnit.SECONDS);

    CompletableFuture<Price> priceFuture =
        CompletableFuture.supplyAsync(() -> priceService.getPrice(productId, userId), productPool)
            .orTimeout(2, TimeUnit.SECONDS);

    // ... 其他调用

    try {
        CompletableFuture.allOf(productFuture, inventoryFuture, priceFuture, ...).join();
    } catch (CompletionException e) {
        // 处理超时或异常
        log.error("获取商品详情失败", e);
        throw new BizException("系统繁忙,请稍后重试");
    }

    // get 改为 getNow,避免阻塞
    Product product = productFuture.getNow(new Product());
    Inventory inventory = inventoryFuture.getNow(new Inventory());
    // ...
}

orTimeout 是 Java 9 引入的方法。如果你的项目还在用 Java 8,可以用 CompletableFutureget(timeout, unit) 重载版本:

try {
    product = productFuture.get(3, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    product = Product.DEFAULT;
}

性能对比

优化完成后实测对比:

方案TP99平均响应
串行调用2000ms1800ms
默认线程池并行1850ms1600ms
自定义线程池450ms350ms
依赖链+超时控制380ms300ms

注意默认线程池并行效果不明显,因为并行度被 CPU 核数限制死了。

线程池配置的经验值

不是什么任务都要大线程池。根据任务性质配置:

// I/O 密集型任务:主要时间花在网络、磁盘等待
// 线程数可以设大一些,公式:cores * 2 或者 cores / (1 - 阻塞系数)
new ThreadPoolExecutor(
    cores * 2, cores * 4,
    60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(500)
);

// CPU 密集型任务:主要时间花在计算
// 线程数 = cores + 1
new ThreadPoolExecutor(
    cores + 1, cores * 2,
    60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(200)
);

// 高并发外部调用:需要限流
// 线程数要保守,避免把下游打爆
new ThreadPoolExecutor(
    10, 20,
    60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(100)
);

总结

CompletableFuture 并行调用踩过的坑:

  1. 默认线程池并行度有限:CPU 核数减 1,不够用时要自定义线程池
  2. 任务间有依赖要用 thenApply/thenCompose:不是所有任务都适合同时并行
  3. 必须加超时控制:防止一个慢服务拖垮整个请求
  4. 线程池要按任务类型分开:不同资源消耗的任务不要混用同一个池

从 30 秒优化到 3 秒,核心不是什么黑科技,而是让不该并行的串行、该并行的真正并行、该超时的设置超时。

最后更新 4/20/2026, 4:48:48 AM