Java CompletableFuture in loop: How to collect all responses and handle errors
Original question: http://stackoverflow.com/questions/51139489/
Note: this content comes from Stack Overflow and is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same license and attribute it to the original authors on Stack Overflow, including the original URL and author information.
Asked by Rudrani Angira
I am trying to call a REST API with a PUT request in a loop. Each call is a CompletableFuture, and each API call returns an object of type RoomTypes.RoomType.
I want to collect the responses (both successful and error responses) in different lists. How do I achieve that? I am sure I cannot use allOf, because it would not get all the results if any one call fails to update. How do I log errors/exceptions for each call?
public void sendRequestsAsync(Map<Integer, List> map1) {
    List<CompletableFuture<Void>> completableFutures = new ArrayList<>(); // list to hold all the completable futures
    List<RoomTypes.RoomType> responses = new ArrayList<>(); // list for responses
    ExecutorService yourOwnExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    for (Map.Entry<Integer, List> entry : map1.entrySet()) {
        CompletableFuture requestCompletableFuture = CompletableFuture
                .supplyAsync(
                        () ->
                                // API call which returns an object of type RoomTypes.RoomType
                                updateService.updateRoom(51, 33, 759, entry.getKey(),
                                        new RoomTypes.RoomType(entry.getKey(), map2.get(entry.getKey()),
                                                entry.getValue())),
                        yourOwnExecutor
                ) // supply the task you want to run, in your case the HTTP request
                .thenApply(responses::add);
        completableFutures.add(requestCompletableFuture);
    }
}
Accepted answer by Didier L
You can simply use allOf() to get a future that is completed when all your initial futures are completed (exceptionally or not), and then split them between succeeded and failed using Collectors.partitioningBy():
List<CompletableFuture<RoomTypes.RoomType>> completableFutures = new ArrayList<>(); // list to hold all the completable futures
ExecutorService yourOwnExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
for (Map.Entry<Integer, List> entry : map1.entrySet()) {
    CompletableFuture<RoomTypes.RoomType> requestCompletableFuture = CompletableFuture
            .supplyAsync(
                    () ->
                            // API call which returns an object of type RoomTypes.RoomType
                            updateService.updateRoom(51, 33, 759, entry.getKey(),
                                    new RoomTypes.RoomType(entry.getKey(), map2.get(entry.getKey()),
                                            entry.getValue())),
                    yourOwnExecutor
            );
    completableFutures.add(requestCompletableFuture);
}
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]))
        // avoid throwing an exception in the join() call
        .exceptionally(ex -> null)
        .join();
Map<Boolean, List<CompletableFuture<RoomTypes.RoomType>>> result =
        completableFutures.stream()
                .collect(Collectors.partitioningBy(CompletableFuture::isCompletedExceptionally));
The resulting map will contain one entry with the key true for the failed futures, and another entry with the key false for the succeeded ones. You can then inspect the two entries and act accordingly.
Note that there are two slight changes compared to your original code:
- requestCompletableFuture is now a CompletableFuture<RoomTypes.RoomType>
- thenApply(responses::add) and the responses list were removed
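As an illustration of inspecting the two entries of the partitioned map, here is a minimal self-contained sketch. The updateRoom method below is a hypothetical stand-in for the real API call (it simply fails for room 3), and the class and method names are assumptions made for this example only. It relies on the fact that join() on a failed future throws a CompletionException whose cause is the original exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;

class PartitionInspectionSketch {

    // Hypothetical stand-in for updateService.updateRoom(...): fails for id 3.
    static String updateRoom(int id) {
        if (id == 3) {
            throw new IllegalStateException("update failed for room " + id);
        }
        return "room-" + id;
    }

    // Fills 'successes' with returned values and 'failures' with error messages.
    static void collect(List<String> successes, List<String> failures) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int id = 1; id <= 4; id++) {
            final int roomId = id;
            futures.add(CompletableFuture.supplyAsync(() -> updateRoom(roomId)));
        }

        // Wait for every future; exceptionally() keeps join() from throwing.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .exceptionally(ex -> null)
                .join();

        Map<Boolean, List<CompletableFuture<String>>> result = futures.stream()
                .collect(Collectors.partitioningBy(CompletableFuture::isCompletedExceptionally));

        // Succeeded futures are already complete, so join() returns at once.
        for (CompletableFuture<String> f : result.get(false)) {
            successes.add(f.join());
        }
        // Failed futures: join() throws a CompletionException wrapping the cause.
        for (CompletableFuture<String> f : result.get(true)) {
            try {
                f.join();
            } catch (CompletionException e) {
                failures.add(e.getCause().getMessage());
            }
        }
    }

    public static void main(String[] args) {
        List<String> successes = new ArrayList<>();
        List<String> failures = new ArrayList<>();
        collect(successes, failures);
        System.out.println(successes);
        System.out.println(failures);
    }
}
```

Because the stream preserves the order of the futures list, the values in each list appear in submission order rather than completion order.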
Concerning logging/exception handling, just add the relevant requestCompletableFuture.handle() to log each result individually, but keep requestCompletableFuture in the list, not the future returned by handle().
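A rough sketch of that logging step might look like the following. The LOG list and the updateRoom method are hypothetical placeholders (a real application would call its own logger and service), and waiting on the handle() stages is a choice made for this example so that all log entries exist before the method returns. The list that is returned still holds the original futures, so isCompletedExceptionally() keeps working on them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;

class HandleLoggingSketch {

    // Hypothetical in-memory log; a real application would use its logger.
    static final List<String> LOG = new CopyOnWriteArrayList<>();

    // Hypothetical stand-in for the remote call: fails for id 2.
    static String updateRoom(int id) {
        if (id == 2) {
            throw new IllegalStateException("update failed for room " + id);
        }
        return "room-" + id;
    }

    // Runs three requests, logs each outcome, returns the ORIGINAL futures.
    static List<CompletableFuture<String>> run() {
        LOG.clear();
        List<CompletableFuture<String>> requests = new ArrayList<>();
        List<CompletableFuture<Void>> logging = new ArrayList<>();
        for (int id = 1; id <= 3; id++) {
            final int roomId = id;
            CompletableFuture<String> request =
                    CompletableFuture.supplyAsync(() -> updateRoom(roomId));
            // handle() yields a NEW future that completes normally either way,
            // so it is kept separate from the one used for partitioning.
            CompletableFuture<Void> logged = request.handle((value, ex) -> {
                if (ex == null) {
                    LOG.add("OK " + value);
                } else {
                    // Unwrap the CompletionException if the cause is available.
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause() : ex;
                    LOG.add("FAILED " + cause.getMessage());
                }
                return null;
            });
            requests.add(request);
            logging.add(logged);
        }
        // The logging stages never fail, so this join() does not throw.
        CompletableFuture.allOf(logging.toArray(new CompletableFuture[0])).join();
        return requests;
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> requests = run();
        System.out.println(LOG);
        System.out.println(requests.get(1).isCompletedExceptionally());
    }
}
```

The order of the log entries depends on thread scheduling, so it may differ between runs.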
Answer by Edwin Dalorzo
Alternatively, you can approach the problem from a different perspective: instead of forcing the use of CompletableFuture, you can use a CompletionService.
The whole idea of the CompletionService is that as soon as the result of a given future is ready, it gets placed in a queue from which you can consume results.
Alternative 1: Without CompletableFuture
// executor is assumed to be an existing ExecutorService
CompletionService<String> cs = new ExecutorCompletionService<>(executor);
List<Future<String>> futures = new ArrayList<>();
futures.add(cs.submit(() -> "One"));
futures.add(cs.submit(() -> "Two"));
futures.add(cs.submit(() -> "Three"));
futures.add(cs.submit(() -> { throw new RuntimeException("Sucks to be four"); }));
futures.add(cs.submit(() -> "Five"));
List<String> successes = new ArrayList<>();
List<String> failures = new ArrayList<>();
while (futures.size() > 0) {
    Future<String> f = cs.poll();
    if (f != null) {
        futures.remove(f);
        try {
            // at this point the future is guaranteed to be complete,
            // so there won't be any blocking here
            String value = f.get();
            successes.add(value);
        } catch (Exception e) {
            failures.add(e.getMessage());
        }
    }
}
System.out.println(successes);
System.out.println(failures);
Which yields:
[One, Two, Three, Five]
[java.lang.RuntimeException: Sucks to be four]
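The loop above calls poll() repeatedly, which keeps a thread spinning while tasks are still running. As a possible variation (an assumption of this example, not part of the original answer), take() blocks until the next completed future is available, and counting the submitted tasks replaces the futures list. The class name, the pool size of 2, and the choice to record the cause's message are all illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CompletionServiceTakeSketch {

    // Fills 'successes' with task results and 'failures' with error messages.
    static void run(List<String> successes, List<String> failures) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            CompletionService<String> cs = new ExecutorCompletionService<>(executor);
            List<Callable<String>> tasks = new ArrayList<>();
            tasks.add(() -> "One");
            tasks.add(() -> "Two");
            tasks.add(() -> "Three");
            tasks.add(() -> { throw new RuntimeException("Sucks to be four"); });
            tasks.add(() -> "Five");
            for (Callable<String> task : tasks) {
                cs.submit(task);
            }
            // One take() per submitted task; each call blocks until a task is done.
            for (int i = 0; i < tasks.size(); i++) {
                Future<String> f = cs.take();
                try {
                    successes.add(f.get()); // already complete, returns at once
                } catch (ExecutionException e) {
                    // getCause() is the exception thrown inside the task
                    failures.add(e.getCause().getMessage());
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for results", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> successes = new ArrayList<>();
        List<String> failures = new ArrayList<>();
        run(successes, failures);
        System.out.println(successes);
        System.out.println(failures);
    }
}
```

With more than one worker thread, results arrive in completion order, so the order of the successes list can vary between runs.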
Alternative 2: With CompletableFuture
However, if you really need to deal with CompletableFuture, you can hand those to the completion service as well, just by placing them directly into its queue. For example, the following variation has the same result:
// executor is assumed to be an existing ExecutorService
BlockingQueue<Future<String>> tasks = new ArrayBlockingQueue<>(5);
CompletionService<String> cs = new ExecutorCompletionService<>(executor, tasks);
List<Future<String>> futures = new ArrayList<>();
futures.add(CompletableFuture.supplyAsync(() -> "One"));
futures.add(CompletableFuture.supplyAsync(() -> "Two"));
futures.add(CompletableFuture.supplyAsync(() -> "Three"));
futures.add(CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Sucks to be four"); }));
// the fifth task is a CompletableFuture too: a task submitted through cs.submit()
// is queued by the service itself on completion, so adding it below would enqueue it twice
futures.add(CompletableFuture.supplyAsync(() -> "Five"));
// places all futures in the completion service queue
tasks.addAll(futures);
List<String> successes = new ArrayList<>();
List<String> failures = new ArrayList<>();
while (futures.size() > 0) {
    Future<String> f = cs.poll();
    if (f != null) {
        futures.remove(f);
        try {
            // unlike Alternative 1, a future placed in the queue by hand
            // may not be complete yet, so get() can block here
            String value = f.get();
            successes.add(value);
        } catch (Exception e) {
            failures.add(e.getMessage());
        }
    }
}