利用CompletableFuture高效處理批量接口請求并保證結果順序
大規模數據處理中,并發調用外部接口能顯著提升效率。然而,直接使用多線程可能導致結果順序錯亂,與原始數據失去對應關系。本文介紹如何利用Java的CompletableFuture實現并發執行,同時確保接口請求和響應結果順序一致。
問題:開發者使用CompletableFuture.runAsync進行異步操作,但線程執行順序的不確定性導致結果與原始數據列表無法對應。CompletableFuture.allOf只能保證所有任務完成,無法保證結果順序。
解決方案:將CompletableFuture.runAsync替換為CompletableFuture.supplyAsync,收集每個CompletableFuture的返回值。CompletableFuture.supplyAsync可以返回一個值,這是保持順序的關鍵。
改進后的代碼:
public static void main(String[] args) { List<String> list = new ArrayList<>(); // ... 初始化list ... ExecutorService executorService = new ThreadPoolExecutor( 10, //核心線程數 20, //最大線程數 60L, TimeUnit.SECONDS, //線程存活時間 new LinkedBlockingQueue<>(1024), //任務隊列 new ThreadPoolExecutor.CallerRunsPolicy()); //拒絕策略 List<CompletableFuture<String>> futures = new ArrayList<>(); for (String s : list) { futures.add(CompletableFuture.supplyAsync(() -> { logger.info("開始執行異步線程->>" + s); // 調用接口,根據返回值判斷list中的值是否匹配 // 返回處理后的結果 return s + "處理后的結果"; }, executorService)); // 使用自定義線程池 } // 所有請求完成后處理邏輯 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) .thenRun(() -> { List<String> results = futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); logger.info("線程執行完畢:{}", JSON.toJSONString(results)); // 調用發送短信 }) .thenRun(() -> executorService.shutdown()); }
改進后的代碼使用List
此方法充分利用多線程并發優勢,同時保證接口請求和響應結果的順序性,避免數據錯亂。