Java List<Future> 到 Future<List> 序列

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/30025428/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-11 09:00:29  来源:igfitidea点击:

List<Future> to Future<List> sequence

javaconcurrencyjava-8completable-future

提问by Jatin

I am trying to convert List<CompletableFuture<X>>to CompletableFuture<List<T>>. This is quite useful as when you have many asynchronous tasks and you need to get results of all of them.

我正在尝试转换List<CompletableFuture<X>>CompletableFuture<List<T>>. 当您有许多异步任务并且需要获取所有任务的结果时,这非常有用。

If any of them fails then the final future fails. This is how I have implemented:

如果其中任何一个失败,那么最终的未来就会失败。这就是我实施的方式:

  public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
        if(com.isEmpty()){
            throw new IllegalArgumentException();
        }
        Stream<? extends CompletableFuture<T>> stream = com.stream();
        CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
        return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
            x.add(y);
            return x;
        },exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> {
            ls1.addAll(ls2);
            return ls1;
        },exec));
    }

To run it:

运行它:

ExecutorService executorService = Executors.newCachedThreadPool();
        Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep((long) (Math.random() * 10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return x;
        }, executorService));
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);

If any of them fails then it fails. It gives output as expected even if there are a million futures. The problem I have is: Say if there are more than 5000 futures and if any of them fails, I get a StackOverflowError:

如果其中任何一个失败,那么它就会失败。即使有 100 万个期货,它也能按预期提供输出。我遇到的问题是:假设有超过 5000 个期货,如果其中任何一个失败,我会得到StackOverflowError

Exception in thread "pool-1-thread-2611" java.lang.StackOverflowError at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210) at java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java:1487) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193) at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210) at java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java:1487)

java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)处的线程“pool-1-thread-2611”java.lang.StackOverflowError中的异常java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java :1487) 在 java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193) 在 java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210) 在 java.util.concurrent.CompletableFuture$ThenCompose.run( CompletableFuture.java:1487)

What am I doing it wrong?

我做错了什么?

Note: The above returned future fails right when any of the future fails. The accepted answer should also take this point.

注意:当任何未来失败时,上述返回的未来就会失败。接受的答案也应该考虑这一点。

采纳答案by Misha

Use CompletableFuture.allOf(...):

使用CompletableFuture.allOf(...)

static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
    return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
            .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
            );
}

A few comments on your implementation:

关于您的实施的一些评论:

Your use of .thenComposeAsync, .thenApplyAsyncand .thenCombineAsyncis likely not doing what you expect. These ...Asyncmethods run the function supplied to them in a separate thread. So, in your case, you are causing the addition of the new item to the list to run in the supplied executor. There is no need to stuff light-weight operations into a cached thread executor. Do not use thenXXXXAsyncmethods without a good reason.

您对.thenComposeAsync,.thenApplyAsync和 的使用.thenCombineAsync可能没有达到您的预期。这些...Async方法在单独的线程中运行提供给它们的函数。因此,在您的情况下,您导致将新项目添加到列表中以在提供的执行程序中运行。无需将轻量级操作填充到缓存的线程执行器中。不要在没有thenXXXXAsync充分理由的情况下使用方法。

Additionally, reduceshould not be used to accumulate into mutable containers. Even though it might work correctly when the stream is sequential, it will fail if the stream were to be made parallel. To perform mutable reduction, use .collectinstead.

此外,reduce不应用于累积到可变容器中。即使流是顺序流时它可能会正常工作,但如果流是并行的,它也会失败。要执行可变归约,请.collect改用。

If you want to complete the entire computation exceptionally immediately after the first failure, do the following in your sequencemethod:

如果您想在第一次失败后立即异常完成整个计算,请在您的sequence方法中执行以下操作:

CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
        .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
        );

com.forEach(f -> f.whenComplete((t, ex) -> {
    if (ex != null) {
        result.completeExceptionally(ex);
    }
}));

return result;

If, additionally, you want to cancel the remaining operations on first failure, add exec.shutdownNow();right after result.completeExceptionally(ex);. This, of course, assumes that execonly exist for this one computation. If it doesn't, you'll have to loop over and cancel each remaining Futureindividually.

此外,如果您想在第一次失败时取消剩余的操作,请exec.shutdownNow();result.completeExceptionally(ex);. 当然,这假设exec仅存在于这一计算中。如果没有,您将不得不循环并Future单独取消每个剩余部分。

回答by Holger

As Misha has pointed out, you are overusing …Asyncoperations. Further, you are composing a complex chain of operations modelling a dependency which doesn't reflect your program logic:

正如Misha 指出的那样,您正在过度使用…Async操作。此外,您正在编写一个复杂的操作链,对不反映您的程序逻辑的依赖项进行建模:

  • you create a job x which depends on the first and second job of your list
  • you create a job x+1 which depends on job x and the third job of your list
  • you create a job x+2 which depends on job x+1 and the 4th job of your list
  • you create a job x+5000 which depends on job x+4999 and the last job of your list
  • 您创建了一个工作 x,这取决于您列表中的第一个和第二个工作
  • 您创建了一个工作 x+1,它取决于工作 x 和列表中的第三个工作
  • 您创建了一个工作 x+2,这取决于工作 x+1 和列表中的第 4 个工作
  • 您创建了一个工作 x+5000,这取决于工作 x+4999 和列表中的最后一个工作

Then, canceling (explicitly or due to an exception) this recursively composed job might be performed recursively and might fail with a StackOverflowError. That's implementation-dependent.

然后,取消(明确地或由于异常)这个递归组合的作业可能会递归执行,并且可能会失败并显示StackOverflowError. 那是依赖于实现的。

As already shown by Misha, there is a method, allOfwhich allows you to model your original intention, to define one job which depends on all jobs of your list.

正如Misha 已经展示的那样,有一种方法allOf可以让您对最初的意图进行建模,以定义一个依赖于列表中所有作业的作业。

However, it's worth noting that even that isn't necessary. Since you are using an unbounded thread pool executor, you can simply post an asynchronous job collecting the results into a list and you are done. Waiting for the completion is impliedby asking for the result of each job anyway.

但是,值得注意的是,即使这样也没有必要。由于您使用的是无界线程池执行程序,因此您可以简单地发布一个异步作业,将结果收集到一个列表中,然后就完成了。无论如何,通过询问每个作业的结果来暗示等待完成。

ExecutorService executorService = Executors.newCachedThreadPool();
List<CompletableFuture<Integer>> que = IntStream.range(0, 100000)
  .mapToObj(x -> CompletableFuture.supplyAsync(() -> {
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos((long)(Math.random()*10)));
    return x;
}, executorService)).collect(Collectors.toList());
CompletableFuture<List<Integer>> sequence = CompletableFuture.supplyAsync(
    () -> que.stream().map(CompletableFuture::join).collect(Collectors.toList()),
    executorService);

Using methods for composing dependent operations are important, when the number of threads is limited and the jobs may spawn additional asynchronous jobs, to avoid having waiting jobs stealing threads from jobs which have to complete first, but neither is the case here.

使用组合依赖操作的方法很重要,当线程数量有限并且作业可能会产生额外的异步作业时,以避免等待作业从必须首先完成的作业中窃取线程,但这里的情况并非如此。

In this specific case one job simply iterating over this large number of prerequisite jobs and waiting if necessary may be more efficient than modelling this large number of dependencies and having each job to notify the dependent job about the completion.

在这种特定情况下,一个作业简单地遍历大量先决作业并在必要时等待可能比对大量依赖项建模并让每个作业通知依赖作业完成更有效。

回答by John McClean

An example sequence operation using thenCombine on CompletableFuture

在 CompletableFuture 上使用 thenCombine 的示例序列操作

public<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com){

    CompletableFuture<List<T>> identity = CompletableFuture.completedFuture(new ArrayList<T>());

    BiFunction<CompletableFuture<List<T>>,CompletableFuture<T>,CompletableFuture<List<T>>> combineToList = 
            (acc,next) -> acc.thenCombine(next,(a,b) -> { a.add(b); return a;});

    BinaryOperator<CompletableFuture<List<T>>> combineLists = (a,b)-> a.thenCombine(b,(l1,l2)-> { l1.addAll(l2); return l1;}) ;  

    return com.stream()
              .reduce(identity,
                      combineToList,
                      combineLists);  

   }
} 

If you don't mind using 3rd party libraries cyclops-react(I am the author) has a set of utility methods for CompletableFutures (and Optionals, Streams etc)

如果您不介意使用第 3 方库cyclops-react(我是作者)有一组用于 CompletableFutures(以及 Optionals、Streams 等)的实用方法

  List<CompletableFuture<String>> listOfFutures;

  CompletableFuture<ListX<String>> sequence =CompletableFutures.sequence(listOfFutures);

回答by oskansavli

You can get Spotify's CompletableFutureslibrary and use allAsListmethod. I think it's inspired from Guava's Futures.allAsListmethod.

您可以获取 Spotify 的CompletableFutures库和使用allAsList方法。我认为它的灵感来自番石榴的Futures.allAsList方法。

public static <T> CompletableFuture<List<T>> allAsList(
    List<? extends CompletionStage<? extends T>> stages) {


And here is a simple implementation if you don't want to use a library:

如果您不想使用库,这里有一个简单的实现:

public <T> CompletableFuture<List<T>> allAsList(final List<CompletableFuture<T>> futures) {
    return CompletableFuture.allOf(
        futures.toArray(new CompletableFuture[futures.size()])
    ).thenApply(ignored ->
        futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
    );
}

回答by Valery Silaev

In addition to Spotify Futures library you might try my code locate here: https://github.com/vsilaev/java-async-await/blob/master/net.tascalate.async.examples/src/main/java/net/tascalate/concurrent/CompletionStages.java(has a dependencies to other classes in same package)

除了 Spotify Futures 库之外,您还可以尝试我的代码位于此处:https: //github.com/vsilaev/java-async-await/blob/master/net.tascalate.async.examples/src/main/java/net/ tascalate/concurrent/CompletionStages.java(对同一包中的其他类有依赖关系)

It implements a logic to return "at least N out of M" CompletionStage-s with a policy how much errors it's allowed to tolerate. There are convinient methods for all/any cases, plus cancellation policy for the remaining futures, plus the code deals with CompletionStage-s (interface) rather than CompletableFuture (concrete class).

它实现了一个逻辑来返回“M 中至少有 N 个”CompletionStage-s,并带有允许容忍多少错误的策略。所有/任何情况都有方便的方法,加上剩余期货的取消政策,加上代码处理 CompletionStage-s(接口)而不是 CompletableFuture(具体类)。

回答by Mathias Dpunkt

Javaslanghas a very convenient FutureAPI. It also allows to make a future of collection out of a collection of futures.

Javaslang有一个非常方便的FutureAPI。它还允许从期货集合中创建集合期货。

List<Future<String>> listOfFutures = ... 
Future<Seq<String>> futureOfList = Future.sequence(listOfFutures);

See http://static.javadoc.io/io.javaslang/javaslang/2.0.5/javaslang/concurrent/Future.html#sequence-java.lang.Iterable-

http://static.javadoc.io/io.javaslang/javaslang/2.0.5/javaslang/concurrent/Future.html#sequence-java.lang.Iterable-

回答by Jatin

To add upto the accepted answer by @Misha, it can be further expanded as a collector:

为了加上@Misha 接受的答案,它可以进一步扩展为收集器:

 public static <T> Collector<CompletableFuture<T>, ?, CompletableFuture<List<T>>> sequenceCollector() {
    return Collectors.collectingAndThen(Collectors.toList(), com -> sequence(com));
}

Now you can:

现在你可以:

Stream<CompletableFuture<Integer>> stream = Stream.of(
    CompletableFuture.completedFuture(1),
    CompletableFuture.completedFuture(2),
    CompletableFuture.completedFuture(3)
);
CompletableFuture<List<Integer>> ans = stream.collect(sequenceCollector());

回答by Kai Stapel

Disclaimer:This will not completely answer the initial question. It will lack the "fail all if one fails" part. However, I can't answer the actual, more generic question, because it was closed as a duplicate of this one: Java 8 CompletableFuture.allOf(...) with Collection or List. So I will answer here:

免责声明:这不会完全回答最初的问题。它将缺少“如果失败则全部失败”的部分。但是,我无法回答实际的、更通用的问题,因为它已作为以下问题的副本关闭:Java 8 CompletableFuture.allOf(...) with Collection 或 List。所以我会在这里回答:

How to convert List<CompletableFuture<V>>to CompletableFuture<List<V>>using Java 8's stream API?

如何转换List<CompletableFuture<V>>CompletableFuture<List<V>>使用 Java 8 的流 API?

Summary:Use the following:

总结:使用以下内容:

private <V> CompletableFuture<List<V>> sequence(List<CompletableFuture<V>> listOfFutures) {
    CompletableFuture<List<V>> identity = CompletableFuture.completedFuture(new ArrayList<>());

    BiFunction<CompletableFuture<List<V>>, CompletableFuture<V>, CompletableFuture<List<V>>> accumulator = (futureList, futureValue) ->
        futureValue.thenCombine(futureList, (value, list) -> {
                List<V> newList = new ArrayList<>(list.size() + 1);
                newList.addAll(list);
                newList.add(value);
                return newList;
            });

    BinaryOperator<CompletableFuture<List<V>>> combiner = (futureList1, futureList2) -> futureList1.thenCombine(futureList2, (list1, list2) -> {
        List<V> newList = new ArrayList<>(list1.size() + list2.size());
        newList.addAll(list1);
        newList.addAll(list2);
        return newList;
    });

    return listOfFutures.stream().reduce(identity, accumulator, combiner);
}

Example usage:

用法示例:

List<CompletableFuture<String>> listOfFutures = IntStream.range(0, numThreads)
    .mapToObj(i -> loadData(i, executor)).collect(toList());

CompletableFuture<List<String>> futureList = sequence(listOfFutures);

Complete Example:

完整示例:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.stream.IntStream;

import static java.util.stream.Collectors.toList;

public class ListOfFuturesToFutureOfList {

    public static void main(String[] args) {
        ListOfFuturesToFutureOfList test = new ListOfFuturesToFutureOfList();
        test.load(10);
    }

    public void load(int numThreads) {
        final ExecutorService executor = Executors.newFixedThreadPool(numThreads);

        List<CompletableFuture<String>> listOfFutures = IntStream.range(0, numThreads)
            .mapToObj(i -> loadData(i, executor)).collect(toList());

        CompletableFuture<List<String>> futureList = sequence(listOfFutures);

        System.out.println("Future complete before blocking? " + futureList.isDone());

        // this will block until all futures are completed
        List<String> data = futureList.join();
        System.out.println("Loaded data: " + data);

        System.out.println("Future complete after blocking? " + futureList.isDone());

        executor.shutdown();
    }

    public CompletableFuture<String> loadData(int dataPoint, Executor executor) {
        return CompletableFuture.supplyAsync(() -> {
            ThreadLocalRandom rnd = ThreadLocalRandom.current();

            System.out.println("Starting to load test data " + dataPoint);

            try {
                Thread.sleep(500 + rnd.nextInt(1500));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("Successfully loaded test data " + dataPoint);

            return "data " + dataPoint;
        }, executor);
    }

    private <V> CompletableFuture<List<V>> sequence(List<CompletableFuture<V>> listOfFutures) {
        CompletableFuture<List<V>> identity = CompletableFuture.completedFuture(new ArrayList<>());

        BiFunction<CompletableFuture<List<V>>, CompletableFuture<V>, CompletableFuture<List<V>>> accumulator = (futureList, futureValue) ->
            futureValue.thenCombine(futureList, (value, list) -> {
                    List<V> newList = new ArrayList<>(list.size() + 1);
                    newList.addAll(list);
                    newList.add(value);
                    return newList;
                });

        BinaryOperator<CompletableFuture<List<V>>> combiner = (futureList1, futureList2) -> futureList1.thenCombine(futureList2, (list1, list2) -> {
            List<V> newList = new ArrayList<>(list1.size() + list2.size());
            newList.addAll(list1);
            newList.addAll(list2);
            return newList;
        });

        return listOfFutures.stream().reduce(identity, accumulator, combiner);
    }

}

回答by JMadushan

Your task could be done easily like following,

您的任务可以轻松完成,如下所示,

final List<CompletableFuture<Module> futures =...
CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new)).join();