如何利用CompletableFuture高效處理批量接口請求并保證結果順序?

如何利用CompletableFuture高效處理批量接口請求并保證結果順序?

利用CompletableFuture高效處理批量接口請求并保證結果順序

大規模數據處理中,并發調用外部接口能顯著提升效率。然而,直接使用線程可能導致結果順序錯亂,與原始數據失去對應關系。本文介紹如何利用Java的CompletableFuture實現并發執行,同時確保接口請求和響應結果順序一致。

問題:開發者使用CompletableFuture.runAsync進行異步操作,但線程執行順序的不確定性導致結果與原始數據列表無法對應。CompletableFuture.allOf只能保證所有任務完成,無法保證結果順序。

解決方案:將CompletableFuture.runAsync替換為CompletableFuture.supplyAsync,收集每個CompletableFuture的返回值。CompletableFuture.supplyAsync可以返回一個值,這是保持順序的關鍵。

改進后的代碼:

public static void main(String[] args) {     List<String> list = new ArrayList<>();     // ... 初始化list ...      ExecutorService executorService = new ThreadPoolExecutor(             10, //核心線程數             20, //最大線程數             60L, TimeUnit.SECONDS, //線程存活時間             new LinkedBlockingQueue<>(1024), //任務隊列             new ThreadPoolExecutor.CallerRunsPolicy()); //拒絕策略      List<CompletableFuture<String>> futures = new ArrayList<>();     for (String s : list) {         futures.add(CompletableFuture.supplyAsync(() -> {             logger.info("開始執行異步線程->>" + s);             // 調用接口,根據返回值判斷list中的值是否匹配             // 返回處理后的結果             return s + "處理后的結果";         }, executorService)); // 使用自定義線程池     }      // 所有請求完成后處理邏輯     CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))             .thenRun(() -> {                 List<String> results = futures.stream()                         .map(CompletableFuture::join)                         .collect(Collectors.toList());                 logger.info("線程執行完畢:{}", JSON.toJSONString(results));                 // 調用發送短信             })             .thenRun(() -> executorService.shutdown()); }

改進后的代碼使用List> futures存儲每個異步任務的CompletableFuture對象,每個CompletableFuture返回一個String結果。CompletableFuture.allOf仍然用于等待所有任務完成,但隨后使用futures.stream().map(CompletableFuture::join).collect(Collectors.toList())收集結果,保證結果順序與原始列表一致。CompletableFuture::join方法會阻塞直到獲取異步任務結果。 這樣確保results列表元素順序與原始list完全相同。

此方法充分利用多線程并發優勢,同時保證接口請求和響應結果的順序性,避免數據錯亂。

? 版權聲明
THE END
喜歡就支持一下吧
點贊9 分享