Java 等待未来的名单

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/19348248/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-12 16:20:43  来源:igfitidea点击:

Waiting on a list of Future

javamultithreadingfuture

提问by user93796

I have a method which returns a Listof futures

我有一个返回List期货的方法

List<Future<O>> futures = getFutures();

Now I want to wait until either all futures are done processing successfully or any of the tasks whose output is returned by a future throws an exception. Even if one task throws an exception, there is no point in waiting for the other futures.

现在我想等到所有期货都成功完成处理,或者其输出由期货返回的任何任务抛出异常。即使一个任务抛出异常,等待其他未来也没有意义。

Simple approach would be to

简单的方法是

wait() {

   For(Future f : futures) {
     try {
       f.get();
     } catch(Exception e) {
       //TODO catch specific exception
       // this future threw exception , means somone could not do its task
       return;
     }
   }
}

But the problem here is if, for example, the 4th future throws an exception, then I will wait unnecessarily for the first 3 futures to be available.

但这里的问题是,例如,如果第 4 个期货抛出异常,那么我将不必要地等待前 3 个期货可用。

How to solve this? Will count down latch help in any way? I'm unable to use Future isDonebecause the java doc says

如何解决这个问题?会以任何方式倒计时闩锁帮助吗?我无法使用 FutureisDone因为 java 文档说

boolean isDone()
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.

采纳答案by dcernahoschi

You can use a CompletionServiceto receive the futures as soon as they are ready and if one of them throws an exception cancel the processing. Something like this:

您可以使用CompletionService在期货准备好后立即接收期货,如果其中之一抛出异常,则取消处理。像这样的东西:

Executor executor = Executors.newFixedThreadPool(4);
CompletionService<SomeResult> completionService = 
       new ExecutorCompletionService<SomeResult>(executor);

//4 tasks
for(int i = 0; i < 4; i++) {
   completionService.submit(new Callable<SomeResult>() {
       public SomeResult call() {
           ...
           return result;
       }
   });
}

int received = 0;
boolean errors = false;

while(received < 4 && !errors) {
      Future<SomeResult> resultFuture = completionService.take(); //blocks if none available
      try {
         SomeResult result = resultFuture.get();
         received ++;
         ... // do something with the result
      }
      catch(Exception e) {
             //log
         errors = true;
      }
}

I think you can further improve to cancel any still executing tasks if one of them throws an error.

我认为您可以进一步改进以取消任何仍在执行的任务,如果其中一个引发错误。

回答by jmiserez

You can use an ExecutorCompletionService. The documentation even has an example for your exact use-case:

您可以使用ExecutorCompletionService。该文档甚至为您的确切用例提供了一个示例:

Suppose instead that you would like to use the first non-null result of the set of tasks, ignoring any that encounter exceptions, and cancelling all other tasks when the first one is ready:

假设您想使用任务集的第一个非空结果,忽略任何遇到异常的结果,并在第一个任务准备好时取消所有其他任务:

void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {
    CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
    int n = solvers.size();
    List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
    Result result = null;
    try {
        for (Callable<Result> s : solvers)
            futures.add(ecs.submit(s));
        for (int i = 0; i < n; ++i) {
            try {
                Result r = ecs.take().get();
                if (r != null) {
                    result = r;
                    break;
                }
            } catch (ExecutionException ignore) {
            }
        }
    } finally {
        for (Future<Result> f : futures)
            f.cancel(true);
    }

    if (result != null)
        use(result);
}

The important thing to notice here is that ecs.take() will get the first completedtask, not just the first submitted one. Thus you should get them in the order of finishing the execution (or throwing an exception).

这里要注意的重要一点是 ecs.take() 将获得第一个完成的任务,而不仅仅是第一个提交的任务。因此,您应该按照完成执行(或抛出异常)的顺序获取它们。

回答by jmiserez

maybe this would help (nothing would replaced with raw thread, yeah!) I suggest run each Futureguy with a separated thread (they goes parallel), then when ever one of the got error, it just signal the manager(Handlerclass).

也许这会有所帮助(没有什么可以用原始线程代替,是的!)我建议Future用单独的线程运行每个人(它们并行运行),然后当任何一个出现错误时,它只会向经理(Handler类)发出信号。

class Handler{
//...
private Thread thisThread;
private boolean failed=false;
private Thread[] trds;
public void waitFor(){
  thisThread=Thread.currentThread();
  List<Future<Object>> futures = getFutures();
  trds=new Thread[futures.size()];
  for (int i = 0; i < trds.length; i++) {
    RunTask rt=new RunTask(futures.get(i), this);
    trds[i]=new Thread(rt);
  }
  synchronized (this) {
    for(Thread tx:trds){
      tx.start();
    }  
  }
  for(Thread tx:trds){
    try {tx.join();
    } catch (InterruptedException e) {
      System.out.println("Job failed!");break;
    }
  }if(!failed){System.out.println("Job Done");}
}

private List<Future<Object>> getFutures() {
  return null;
}

public synchronized void cancelOther(){if(failed){return;}
  failed=true;
  for(Thread tx:trds){
    tx.stop();//Deprecated but works here like a boss
  }thisThread.interrupt();
}
//...
}
class RunTask implements Runnable{
private Future f;private Handler h;
public RunTask(Future f,Handler h){this.f=f;this.h=h;}
public void run(){
try{
f.get();//beware about state of working, the stop() method throws ThreadDeath Error at any thread state (unless it blocked by some operation)
}catch(Exception e){System.out.println("Error, stopping other guys...");h.cancelOther();}
catch(Throwable t){System.out.println("Oops, some other guy has stopped working...");}
}
}

I have to say the above code would error(didn't check), but I hope I could explain the solution. please have a try.

我不得不说上面的代码会出错(没有检查),但我希望我能解释解决方案。请试一试。

回答by fl0w

The CompletionService will take your Callables with the .submit() method and you can retrieve the computed futures with the .take() method.

CompletionService 将使用 .submit() 方法获取您的 Callables,您可以使用 .take() 方法检索计算的期货。

One thing you must not forget is to terminate the ExecutorService by calling the .shutdown() method. Also you can only call this method when you have saved a reference to the executor service so make sure to keep one.

您不能忘记的一件事是通过调用 .shutdown() 方法来终止 ExecutorService。此外,您只能在保存对执行程序服务的引用时调用此方法,因此请确保保留一个。

Example code - For a fixed number of work items to be worked on in parallel:

示例代码 - 对于要并行处理的固定数量的工作项:

ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

CompletionService<YourCallableImplementor> completionService = 
new ExecutorCompletionService<YourCallableImplementor>(service);

ArrayList<Future<YourCallableImplementor>> futures = new ArrayList<Future<YourCallableImplementor>>();

for (String computeMe : elementsToCompute) {
    futures.add(completionService.submit(new YourCallableImplementor(computeMe)));
}
//now retrieve the futures after computation (auto wait for it)
int received = 0;

while(received < elementsToCompute.size()) {
 Future<YourCallableImplementor> resultFuture = completionService.take(); 
 YourCallableImplementor result = resultFuture.get();
 received ++;
}
//important: shutdown your ExecutorService
service.shutdown();

Example code - For a dynamic number of work items to be worked on in parallel:

示例代码 - 对于要并行处理的动态数量的工作项:

public void runIt(){
    ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    CompletionService<CallableImplementor> completionService = new ExecutorCompletionService<CallableImplementor>(service);
    ArrayList<Future<CallableImplementor>> futures = new ArrayList<Future<CallableImplementor>>();

    //Initial workload is 8 threads
    for (int i = 0; i < 9; i++) {
        futures.add(completionService.submit(write.new CallableImplementor()));             
    }
    boolean finished = false;
    while (!finished) {
        try {
            Future<CallableImplementor> resultFuture;
            resultFuture = completionService.take();
            CallableImplementor result = resultFuture.get();
            finished = doSomethingWith(result.getResult());
            result.setResult(null);
            result = null;
            resultFuture = null;
            //After work package has been finished create new work package and add it to futures
            futures.add(completionService.submit(write.new CallableImplementor()));
        } catch (InterruptedException | ExecutionException e) {
            //handle interrupted and assert correct thread / work packet count              
        } 
    }

    //important: shutdown your ExecutorService
    service.shutdown();
}

public class CallableImplementor implements Callable{
    boolean result;

    @Override
    public CallableImplementor call() throws Exception {
        //business logic goes here
        return this;
    }

    public boolean getResult() {
        return result;
    }

    public void setResult(boolean result) {
        this.result = result;
    }
}

回答by Andrejs

If you are using Java 8then you can do this easier with CompletableFuture and CompletableFuture.allOf, which applies the callback only after all supplied CompletableFutures are done.

如果您使用的是Java 8,那么您可以使用 CompletableFuture 和CompletableFuture.allOf更轻松地做到这一点,它仅在所有提供的 CompletableFuture 完成后应用回调。

// Waits for *all* futures to complete and returns a list of results.
// If *any* future completes exceptionally then the resulting future will also complete exceptionally.

public static <T> CompletableFuture<List<T>> all(List<CompletableFuture<T>> futures) {
    CompletableFuture[] cfs = futures.toArray(new CompletableFuture[futures.size()]);

    return CompletableFuture.allOf(cfs)
            .thenApply(ignored -> futures.stream()
                                    .map(CompletableFuture::join)
                                    .collect(Collectors.toList())
            );
}

回答by usr-local-ΕΨΗΕΛΩΝ

If you are using Java 8 and don't want to manipulate CompletableFutures, I have written a tool to retrieve results for a List<Future<T>>using streaming. The key is that you are forbidden to map(Future::get)as it throws.

如果您使用的是 Java 8 并且不想操作CompletableFutures,我已经编写了一个工具来检索List<Future<T>>using 流的结果。关键是你被禁止,map(Future::get)因为它抛出。

public final class Futures
{

    private Futures()
    {}

    public static <E> Collector<Future<E>, Collection<E>, List<E>> present()
    {
        return new FutureCollector<>();
    }

    private static class FutureCollector<T> implements Collector<Future<T>, Collection<T>, List<T>>
    {
        private final List<Throwable> exceptions = new LinkedList<>();

        @Override
        public Supplier<Collection<T>> supplier()
        {
            return LinkedList::new;
        }

        @Override
        public BiConsumer<Collection<T>, Future<T>> accumulator()
        {
            return (r, f) -> {
                try
                {
                    r.add(f.get());
                }
                catch (InterruptedException e)
                {}
                catch (ExecutionException e)
                {
                    exceptions.add(e.getCause());
                }
            };
        }

        @Override
        public BinaryOperator<Collection<T>> combiner()
        {
            return (l1, l2) -> {
                l1.addAll(l2);
                return l1;
            };
        }

        @Override
        public Function<Collection<T>, List<T>> finisher()
        {
            return l -> {

                List<T> ret = new ArrayList<>(l);
                if (!exceptions.isEmpty())
                    throw new AggregateException(exceptions, ret);

                return ret;
            };

        }

        @Override
        public Set<java.util.stream.Collector.Characteristics> characteristics()
        {
            return java.util.Collections.emptySet();
        }
    }

This needs an AggregateExceptionthat works like C#'s

这需要一个AggregateException像 C# 一样工作的

public class AggregateException extends RuntimeException
{
    /**
     *
     */
    private static final long serialVersionUID = -4477649337710077094L;

    private final List<Throwable> causes;
    private List<?> successfulElements;

    public AggregateException(List<Throwable> causes, List<?> l)
    {
        this.causes = causes;
        successfulElements = l;
    }

    public AggregateException(List<Throwable> causes)
    {
        this.causes = causes;
    }

    @Override
    public synchronized Throwable getCause()
    {
        return this;
    }

    public List<Throwable> getCauses()
    {
        return causes;
    }

    public List<?> getSuccessfulElements()
    {
        return successfulElements;
    }

    public void setSuccessfulElements(List<?> successfulElements)
    {
        this.successfulElements = successfulElements;
    }

}

This component acts exactly as C#'s Task.WaitAll. I am working on a variant that does the same as CompletableFuture.allOf(equivalento to Task.WhenAll)

该组件的作用与 C# 的Task.WaitAll 完全相同。我正在研究一种与CompletableFuture.allOf(相当于Task.WhenAll)相同的变体

The reason why I did this is that I am using Spring's ListenableFutureand don't want to port to CompletableFuturedespite it is a more standard way

我这样做的原因是我使用的是 SpringListenableFuture并且不想移植到CompletableFuture尽管它是一种更标准的方式

回答by sendon1982

Use a CompletableFuturein Java 8

CompletableFuture在 Java 8 中使用 a

    // Kick of multiple, asynchronous lookups
    CompletableFuture<User> page1 = gitHubLookupService.findUser("Test1");
    CompletableFuture<User> page2 = gitHubLookupService.findUser("Test2");
    CompletableFuture<User> page3 = gitHubLookupService.findUser("Test3");

    // Wait until they are all done
    CompletableFuture.allOf(page1,page2,page3).join();

    logger.info("--> " + page1.get());

回答by Mohamed.Abdo

 /**
     * execute suppliers as future tasks then wait / join for getting results
     * @param functors a supplier(s) to execute
     * @return a list of results
     */
    private List getResultsInFuture(Supplier<?>... functors) {
        CompletableFuture[] futures = stream(functors)
                .map(CompletableFuture::supplyAsync)
                .collect(Collectors.toList())
                .toArray(new CompletableFuture[functors.length]);
        CompletableFuture.allOf(futures).join();
        return stream(futures).map(a-> {
            try {
                return a.get();
            } catch (InterruptedException | ExecutionException e) {
                //logger.error("an error occurred during runtime execution a function",e);
                return null;
            }
        }).collect(Collectors.toList());
    };

回答by Brixomatic

I've got a utility class that contains these:

我有一个包含这些的实用程序类:

@FunctionalInterface
public interface CheckedSupplier<X> {
  X get() throws Throwable;
}

public static <X> Supplier<X> uncheckedSupplier(final CheckedSupplier<X> supplier) {
    return () -> {
        try {
            return supplier.get();
        } catch (final Throwable checkedException) {
            throw new IllegalStateException(checkedException);
        }
    };
}

Once you have that, using a static import, you can simple wait for all futures like this:

一旦你有了它,使用静态导入,你可以简单地等待所有的期货,如下所示:

futures.stream().forEach(future -> uncheckedSupplier(future::get).get());

you can also collect all their results like this:

您还可以像这样收集他们的所有结果:

List<MyResultType> results = futures.stream()
    .map(future -> uncheckedSupplier(future::get).get())
    .collect(Collectors.toList());

Just revisiting my old post and noticing that you had another grief:

只是重温我的旧帖子并注意到您还有另一个悲伤:

But the problem here is if, for example, the 4th future throws an exception, then I will wait unnecessarily for the first 3 futures to be available.

但这里的问题是,例如,如果第 4 个期货抛出异常,那么我将不必要地等待前 3 个期货可用。

In this case, the simple solution is to do this in parallel:

在这种情况下,简单的解决方案是并行执行此操作:

futures.stream().parallel()
 .forEach(future -> uncheckedSupplier(future::get).get());

This way the first exception, although it will not stop the future, will break the forEach-statement, like in the serial example, but since all wait in parallel, you won't have to wait for the first 3 to complete.

这样第一个异常,虽然它不会停止未来,会破坏 forEach 语句,就像在串行示例中一样,但由于所有的等待都是并行的,你不必等待前 3 个完成。

回答by Bohao LI

In case that you want combine a List of CompletableFutures, you can do this :

如果您想组合 CompletableFutures 列表,您可以这样做:

List<CompletableFuture<Void>> futures = new ArrayList<>();
// ... Add futures to this ArrayList of CompletableFutures

// CompletableFuture.allOf() method demand a variadic arguments
// You can use this syntax to pass a List instead
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
            futures.toArray(new CompletableFuture[futures.size()]));

// Wait for all individual CompletableFuture to complete
// All individual CompletableFutures are executed in parallel
allFutures.get();

For more details on Future & CompletableFuture, useful links:
1. Future: https://www.baeldung.com/java-future
2. CompletableFuture: https://www.baeldung.com/java-completablefuture
3. CompletableFuture: https://www.callicoder.com/java-8-completablefuture-tutorial/

有关 Future 和 CompletableFuture 的更多详细信息,有用的链接:
1. Future:https
: //www.baeldung.com/java-future 2. CompletableFuture:https
: //www.baeldung.com/java-completablefuture 3. CompletableFuture:https ://www.callicoder.com/java-8-completablefuture-tutorial/