如何使用CompletableFuture高效處理批量接口請求并保證結果順序?

如何使用CompletableFuture高效處理批量接口請求并保證結果順序?

Java CompletableFuture:高效并發處理批量接口請求并保持結果順序

大數據量處理中,并發執行任務能顯著提升效率。但若需保持任務執行順序與數據順序一致,簡單的線程并行處理便捉襟見肘。本文將演示如何利用Java的CompletableFuture在多線程環境下,高效處理1000多條數據的第三方接口請求,并確保結果順序與原始數據順序一致。

原始代碼使用CompletableFuture.runAsync執行異步任務,雖然實現了并發,但由于線程執行順序的不確定性,導致最終結果順序與原始數據列表不符。這是因為runAsync返回CompletableFuture,不攜帶任何結果。

為解決此問題,需改用CompletableFuture.supplyAsync。supplyAsync能返回結果,從而在CompletableFuture完成之后,按原始順序收集處理結果。改進后的代碼如下:

public static void main(String[] args) {     List<String> list = new ArrayList<>();     for (int i = 0; i < 1000; i++) {         list.add("數據" + i);     }      ExecutorService executorService = new ThreadPoolExecutor(             Runtime.getRuntime().availableProcessors(),             Runtime.getRuntime().availableProcessors() * 2,             60L, TimeUnit.SECONDS,             new LinkedBlockingQueue<>(),             new ThreadPoolExecutor.CallerRunsPolicy());      List<CompletableFuture<String>> futures = new ArrayList<>();     for (String s : list) {         futures.add(CompletableFuture.supplyAsync(() -> {             logger.info("開始執行異步線程->>" + s);             // 調用接口             // 根據接口返回值判斷list中的值是否匹配             // 返回處理后的結果             return s + "處理后的結果";         }, executorService)); // 使用自定義線程池     }      // 所有請求完成后處理邏輯     CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).thenRun(() -> {         List<String> results = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());         logger.info("線程執行完畢:{}", JSON.toJSONString(results));         // 調用發送短信     }).thenRun(() -> executorService.shutdown()); }

改進后的代碼用CompletableFuture.supplyAsync替換了CompletableFuture.runAsync,并使用futures.stream().map(CompletableFuture::join).collect(Collectors.toList())按原始順序收集每個CompletableFuture的返回結果。CompletableFuture.allOf仍然用于等待所有異步任務完成,確保所有結果收集完畢后再進行后續處理。

通過以上修改,我們成功利用CompletableFuture在保證并發執行的同時,維護了結果的順序性,從而高效地處理了大量接口請求。

? 版權聲明
THE END
喜歡就支持一下吧
點贊7 分享