Java CompletableFuture:高效并發處理批量接口請求并保持結果順序
大數據量處理中,并發執行任務能顯著提升效率。但若需保持任務執行順序與數據順序一致,簡單的多線程并行處理便捉襟見肘。本文將演示如何利用Java的CompletableFuture在多線程環境下,高效處理1000多條數據的第三方接口請求,并確保結果順序與原始數據順序一致。
原始代碼使用CompletableFuture.runAsync執行異步任務,雖然實現了并發,但由于線程執行順序的不確定性,導致最終結果順序與原始數據列表不符。這是因為runAsync返回CompletableFuture
為解決此問題,需改用CompletableFuture.supplyAsync。supplyAsync能返回結果,從而在CompletableFuture完成之后,按原始順序收集處理結果。改進后的代碼如下:
public static void main(String[] args) { List<String> list = new ArrayList<>(); for (int i = 0; i < 1000; i++) { list.add("數據" + i); } ExecutorService executorService = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() * 2, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy()); List<CompletableFuture<String>> futures = new ArrayList<>(); for (String s : list) { futures.add(CompletableFuture.supplyAsync(() -> { logger.info("開始執行異步線程->>" + s); // 調用接口 // 根據接口返回值判斷list中的值是否匹配 // 返回處理后的結果 return s + "處理后的結果"; }, executorService)); // 使用自定義線程池 } // 所有請求完成后處理邏輯 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).thenRun(() -> { List<String> results = futures.stream().map(CompletableFuture::join).collect(Collectors.toList()); logger.info("線程執行完畢:{}", JSON.toJSONString(results)); // 調用發送短信 }).thenRun(() -> executorService.shutdown()); }
改進后的代碼用CompletableFuture.supplyAsync替換了CompletableFuture.runAsync,并使用futures.stream().map(CompletableFuture::join).collect(Collectors.toList())按原始順序收集每個CompletableFuture的返回結果。CompletableFuture.allOf仍然用于等待所有異步任務完成,確保所有結果收集完畢后再進行后續處理。
通過以上修改,我們成功利用CompletableFuture在保證并發執行的同時,維護了結果的順序性,從而高效地處理了大量接口請求。