Java: retry logic with CompletableFuture

Notice: this page is a Chinese-English parallel translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same license, cite the original URL and author information, and attribute it to the original authors (not me). StackOverflow original: http://stackoverflow.com/questions/40485398/

Date: 2020-11-03 05:15:41  Source: igfitidea

Retry logic with CompletableFuture

Tags: java, exception, asynchronous, concurrency, java-8

Asked by xmas79

I need to submit a task in an async framework I'm working on, but I need to catch for exceptions, and retry the same task multiple times before "aborting".

The code I'm working with is:

int retries = 0;
public CompletableFuture<Result> executeActionAsync() {

    // Execute the action async and get the future
    CompletableFuture<Result> f = executeMycustomActionHere();

    // If the future completes with exception:
    f.exceptionally(ex -> {
        retries++; // Increment the retry count
        if (retries < MAX_RETRIES)
            return executeActionAsync();  // <--- Submit one more time

        // Abort with a null value
        return null;
    });

    // Return the future    
    return f;
}

This currently doesn't compile because the return type of the lambda is wrong: exceptionally expects a Result, but executeActionAsync returns a CompletableFuture<Result>.

How can I implement this fully async retry logic?

Accepted answer by xmas79

I think I succeeded. Here's an example class I created, along with the test code:

RetriableTask.java

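import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class RetriableTask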
{
    protected static final int MAX_RETRIES = 10;
    protected int retries = 0;
    protected int n = 0;
    protected CompletableFuture<Integer> future = new CompletableFuture<Integer>();

    public RetriableTask(int number) {
        n = number;
    }

    public CompletableFuture<Integer> executeAsync() {
        // Create a failure within variable timeout
        Duration timeoutInMilliseconds = Duration.ofMillis(1*(int)Math.pow(2, retries));
        CompletableFuture<Integer> timeoutFuture = Utils.failAfter(timeoutInMilliseconds);

        // Create a dummy future and complete only if (n > 5 && retries > 5) so we can test for both completion and timeouts. 
        // In real application this should be a real future
        final CompletableFuture<Integer> taskFuture = new CompletableFuture<>();
        if (n > 5 && retries > 5)
            taskFuture.complete(retries * n);

        // Attach the failure future to the task future, and perform a check on completion
        taskFuture.applyToEither(timeoutFuture, Function.identity())
            .whenCompleteAsync((result, exception) -> {
                if (exception == null) {
                    future.complete(result);
                } else {
                    retries++;
                    if (retries >= MAX_RETRIES) {
                        future.completeExceptionally(exception);
                    } else {
                        executeAsync();
                    }
                }
            });

        // Return the future    
        return future;
    }
}


Usage

int size = 10;
System.out.println("generating...");
List<RetriableTask> tasks = new ArrayList<>();
for (int i = 0; i < size; i++) {
    tasks.add(new RetriableTask(i));
}

System.out.println("issuing...");
List<CompletableFuture<Integer>> futures = new ArrayList<>();
for (int i = 0; i < size; i++) {
    futures.add(tasks.get(i).executeAsync());
}

System.out.println("Waiting...");
for (int i = 0; i < size; i++) {
    try {
        CompletableFuture<Integer> future = futures.get(i);
        int result = future.get();
        System.out.println(i + " result is " + result);
    } catch (Exception ex) {
        System.out.println(i + " I got exception!");
    }
}
System.out.println("Done waiting...");


Output

generating...
issuing...
Waiting...
0 I got exception!
1 I got exception!
2 I got exception!
3 I got exception!
4 I got exception!
5 I got exception!
6 result is 36
7 result is 42
8 result is 48
9 result is 54
Done waiting...


The main idea and some glue code (the failAfter function) come from here.
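
The Utils.failAfter helper used in RetriableTask is not shown in this answer. A minimal sketch of what such a helper could look like, assuming a single shared scheduler thread and a TimeoutException as the failure (both are assumptions, not necessarily what the linked source does):

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Utils {
    // Single daemon thread used only to fire timeouts; it never runs the task itself.
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "failAfter-scheduler");
                t.setDaemon(true);
                return t;
            });

    /** Returns a future that completes exceptionally with a TimeoutException after the given delay. */
    public static <T> CompletableFuture<T> failAfter(Duration duration) {
        CompletableFuture<T> promise = new CompletableFuture<>();
        SCHEDULER.schedule(() -> {
            promise.completeExceptionally(new TimeoutException("Timeout after " + duration));
        }, duration.toMillis(), TimeUnit.MILLISECONDS);
        return promise;
    }
}
```

Combined with applyToEither, as in executeAsync above, whichever of the task future and the timeout future completes first decides the outcome of the attempt.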

Any other suggestions or improvements are welcome.

Answer by Holger

Chaining subsequent retries can be straightforward:

public CompletableFuture<Result> executeActionAsync() {
    CompletableFuture<Result> f=executeMycustomActionHere();
    for(int i=0; i<MAX_RETRIES; i++) {
        f=f.exceptionally(t -> executeMycustomActionHere().join());
    }
    return f;
}

This simply chains as many retries as intended, since these subsequent stages do nothing in the non-exceptional case. It has drawbacks, though, which are described below.

One drawback is that if the first attempt fails immediately, so that f is already completed exceptionally when the first exceptionally handler is chained, the action will be invoked by the calling thread, removing the asynchronous nature of the request entirely. In general, join() may also block a thread (the default executor will then start a new compensation thread, but it's still discouraged). Unfortunately, Java 8 has neither an exceptionallyAsync nor an exceptionallyCompose method (both were only added in Java 12).
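
The calling-thread effect can be observed with a small experiment. This is only an illustrative sketch (the class and method names are made up): it attaches an exceptionally handler to a future that has already failed and reports which thread ran the handler.

```java
import java.util.concurrent.CompletableFuture;

public class CallerThreadDemo {
    /** Returns the name of the thread that ran a handler attached to an already-failed future. */
    public static String handlerThreadName() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("immediate failure"));
        // The future is already complete, so the handler runs right here, in the attaching thread.
        return failed.exceptionally(t -> Thread.currentThread().getName()).join();
    }

    public static void main(String[] args) {
        System.out.println("caller:  " + Thread.currentThread().getName());
        System.out.println("handler: " + handlerThreadName());
    }
}
```

Both lines print the same thread name, so a retry started inside such a handler (and any join() on it) would occupy the caller.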

A solution that does not invoke join() would be:

public CompletableFuture<Result> executeActionAsync() {
    CompletableFuture<Result> f=executeMycustomActionHere();
    for(int i=0; i<MAX_RETRIES; i++) {
        f=f.thenApply(CompletableFuture::completedFuture)
           .exceptionally(t -> executeMycustomActionHere())
           .thenCompose(Function.identity());
    }
    return f;
}

This demonstrates how involved it is to combine “compose” with an “exceptionally” handler.
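
If you can target Java 12 or later, exceptionallyCompose does this combination in one step. The following is a sketch under that assumption; the retry helper and the flaky action are hypothetical names used only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class ComposeRetry {
    /** Chains up to maxRetries retries; a retry is started only if the previous attempt failed. */
    public static <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> action, int maxRetries) {
        CompletableFuture<T> f = action.get();
        for (int i = 0; i < maxRetries; i++) {
            // Requires Java 12+: on failure, continue with the future of a fresh attempt.
            f = f.exceptionallyCompose(t -> action.get());
        }
        return f;
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        // Hypothetical action: fails twice, then succeeds.
        Supplier<CompletableFuture<String>> flaky = () -> CompletableFuture.supplyAsync(() -> {
            int n = attempts.incrementAndGet();
            if (n < 3) throw new IllegalStateException("attempt " + n + " failed");
            return "succeeded on attempt " + n;
        });
        System.out.println(retry(flaky, 5).join());
    }
}
```

As with the Java 8 version, the handler still runs in the calling thread when the future has already failed; exceptionallyComposeAsync avoids that.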

Further, if all retries fail, only the last exception is reported. A better solution should report the first exception, with the exceptions from subsequent retries added as suppressed exceptions. Such a solution can be built by chaining a recursive call, as hinted at by Gili's answer. However, to use this idea for exception handling, we have to combine “compose” and “exceptionally” using the steps shown above:

public CompletableFuture<Result> executeActionAsync() {
    return executeMycustomActionHere()
        .thenApply(CompletableFuture::completedFuture)
        .exceptionally(t -> retry(t, 0))
        .thenCompose(Function.identity());
}
private CompletableFuture<Result> retry(Throwable first, int retry) {
    if(retry >= MAX_RETRIES) return CompletableFuture.failedFuture(first);
    return executeMycustomActionHere()
        .thenApply(CompletableFuture::completedFuture)
        .exceptionally(t -> { first.addSuppressed(t); return retry(first, retry+1); })
        .thenCompose(Function.identity());
}
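
To see what a caller gets back, here is a self-contained sketch of the same pattern with a hypothetical action that always fails (alwaysFailingAction and the local failed helper are assumptions made for the demo, and MAX_RETRIES is set to 3 arbitrarily).

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

public class SuppressedRetryDemo {
    static final int MAX_RETRIES = 3;

    // Hypothetical stand-in for executeMycustomActionHere(): every attempt fails.
    static CompletableFuture<String> alwaysFailingAction() {
        return failed(new IOException("service unavailable"));
    }

    public static CompletableFuture<String> executeActionAsync() {
        return alwaysFailingAction()
            .thenApply(CompletableFuture::completedFuture)
            .exceptionally(t -> retry(t, 0))
            .thenCompose(Function.identity());
    }

    private static CompletableFuture<String> retry(Throwable first, int retry) {
        if (retry >= MAX_RETRIES) return failed(first);
        return alwaysFailingAction()
            .thenApply(CompletableFuture::completedFuture)
            .exceptionally(t -> { first.addSuppressed(t); return retry(first, retry + 1); })
            .thenCompose(Function.identity());
    }

    // Java 8 compatible replacement for CompletableFuture.failedFuture
    private static <T> CompletableFuture<T> failed(Throwable t) {
        CompletableFuture<T> cf = new CompletableFuture<>();
        cf.completeExceptionally(t);
        return cf;
    }

    public static void main(String[] args) {
        try {
            executeActionAsync().join();
        } catch (CompletionException e) {
            System.out.println("reported: " + e.getCause());
            for (Throwable s : e.getSuppressed()) {
                System.out.println("suppressed: " + s.getCause());
            }
        }
    }
}
```

Note that the handlers receive a CompletionException wrapping the original failure, so the underlying exceptions are found via getCause().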

CompletableFuture.failedFuture is a Java 9 method, but it would be trivial to add a Java 8 compatible backport to your code if needed:

public static <T> CompletableFuture<T> failedFuture(Throwable t) {
    final CompletableFuture<T> cf = new CompletableFuture<>();
    cf.completeExceptionally(t);
    return cf;
}

Answer by Alex Fargus

I recently solved a similar problem using the guava-retrying library.

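import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.google.common.base.Predicates;

Callable<Result> callable = new Callable<Result>() {
    public Result call() throws Exception {
        // Assumes a blocking variant of the action that returns a Result directly
        return executeMycustomActionHere();
    }
};

// The Retryer's type parameter must match the Callable's result type
Retryer<Result> retryer = RetryerBuilder.<Result>newBuilder()
        .retryIfResult(Predicates.<Result>isNull())
        .retryIfExceptionOfType(IOException.class)
        .retryIfRuntimeException()
        .withStopStrategy(StopStrategies.stopAfterAttempt(MAX_RETRIES))
        .build();

CompletableFuture<Result> future = CompletableFuture.supplyAsync(() -> {
    try {
        // The retries run synchronously on the executor thread
        return retryer.call(callable);
    } catch (RetryException | ExecutionException e) {
        // Fail the future instead of swallowing the error
        throw new CompletionException(e);
    }
});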

Answer by theazureshadow

Instead of implementing your own retry logic, I recommend using a proven library like failsafe, which has built-in support for futures (and seems more popular than guava-retrying). For your example, it would look something like:

private static RetryPolicy retryPolicy = new RetryPolicy()
    .withMaxRetries(MAX_RETRIES);

public CompletableFuture<Result> executeActionAsync() {
    return Failsafe.with(retryPolicy)
        .with(executor)
        .withFallback(null)
        .future(this::executeMycustomActionHere);
}

You should probably avoid .withFallback(null) and just let the returned future's .get() method throw the resulting exception, so the caller of your method can handle it specifically, but that's a design decision you'll have to make.
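
On the caller's side, handling the exception specifically could look roughly like this. It is a sketch using only the JDK, with a hypothetical describe method and IOException as the example failure; get() reports a failed future as an ExecutionException whose cause is the original exception.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CallerHandlingDemo {
    /** Waits for the result and turns specific failures into specific messages. */
    public static String describe(CompletableFuture<String> future) {
        try {
            return "result: " + future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof IOException) {
                return "I/O problem: " + cause.getMessage();
            }
            return "unexpected failure: " + cause;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IOException("connection reset"));
        System.out.println(describe(failed));
        System.out.println(describe(CompletableFuture.completedFuture("done")));
    }
}
```

With a null fallback, the caller would instead have to tell a failed action apart from a legitimately empty result.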

Other things to think about include whether you should retry immediately or wait some period of time between attempts, whether to use any sort of recursive backoff (useful when you're calling a web service that might be down), and whether there are specific exceptions that aren't worth retrying (e.g. if the parameters to the method are invalid).
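
If you do end up writing this yourself, waiting between attempts and skipping non-retryable exceptions can be sketched with plain CompletableFuture. This assumes Java 9 or later for CompletableFuture.delayedExecutor; the retryWithBackoff name, the delay doubling, and the retryable predicate are illustrative choices, not part of Failsafe.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class BackoffRetry {

    /**
     * Runs the action and retries it up to maxRetries times, doubling the delay before each retry.
     * Failures rejected by the retryable predicate are reported immediately.
     */
    public static <T> CompletableFuture<T> retryWithBackoff(
            Supplier<CompletableFuture<T>> action, int maxRetries,
            long delayMillis, Predicate<Throwable> retryable) {
        return action.get()
            .thenApply(CompletableFuture::completedFuture)
            .exceptionally(t -> {
                // Dependent stages wrap the original failure in a CompletionException
                Throwable cause = (t instanceof CompletionException && t.getCause() != null)
                        ? t.getCause() : t;
                if (maxRetries <= 0 || !retryable.test(cause)) {
                    CompletableFuture<T> failed = new CompletableFuture<>();
                    failed.completeExceptionally(cause);
                    return failed;
                }
                // Java 9+: an executor that starts submitted tasks only after the delay
                Executor delayed = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
                Supplier<CompletableFuture<T>> nextAttempt =
                        () -> retryWithBackoff(action, maxRetries - 1, delayMillis * 2, retryable);
                return CompletableFuture.supplyAsync(nextAttempt, delayed)
                    .thenCompose(Function.identity());
            })
            .thenCompose(Function.identity());
    }
}
```

A call such as retryWithBackoff(this::executeMycustomActionHere, MAX_RETRIES, 100, t -> !(t instanceof IllegalArgumentException)) would wait 100 ms, 200 ms, 400 ms and so on between attempts, and give up at once on invalid arguments.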

Answer by u10645061

Utility class:

public class RetryUtil {

    public static <R> CompletableFuture<R> retry(Supplier<CompletableFuture<R>> supplier, int maxRetries) {
        CompletableFuture<R> f = supplier.get();
        for(int i=0; i<maxRetries; i++) {
            f=f.thenApply(CompletableFuture::completedFuture)
                .exceptionally(t -> {
                    System.out.println("retry for: "+t.getMessage());
                    return supplier.get();
                })
                .thenCompose(Function.identity());
        }
        return f;
    }
}

Usage:

public CompletableFuture<String> lucky(){
    return CompletableFuture.supplyAsync(()->{
        double luckNum = Math.random();
        double luckEnough = 0.6;
        if(luckNum < luckEnough){
            throw new RuntimeException("not luck enough: " + luckNum);
        }
        return "I'm lucky: "+luckNum;
    });
}
@Test
public void testRetry(){
    CompletableFuture<String> retry = RetryUtil.retry(this::lucky, 10);
    System.out.println("async check");
    String join = retry.join();
    System.out.println("lucky? "+join);
}

Output:

async check
retry for: java.lang.RuntimeException: not luck enough: 0.412296354211683
retry for: java.lang.RuntimeException: not luck enough: 0.4099777199676573
lucky? I'm lucky: 0.8059089479049389

Answer by Gili

Here is an approach that works for any CompletionStage subclass and does not return a dummy CompletableFuture that does nothing more than wait to be updated by other futures.

/**
 * Sends a request that may run as many times as necessary.
 *
 * @param request  a supplier that initiates an HTTP request
 * @param executor the Executor used to run the request
 * @return the server response
 */
public CompletionStage<Response> asyncRequest(Supplier<CompletionStage<Response>> request, Executor executor)
{
    return retry(request, executor, 0);
}

/**
 * Sends a request that may run as many times as necessary.
 *
 * @param request  a supplier that initiates an HTTP request
 * @param executor the Executor used to run the request
 * @param tries    the number of times the operation has been retried
 * @return the server response
 */
private CompletionStage<Response> retry(Supplier<CompletionStage<Response>> request, Executor executor, int tries)
{
    if (tries >= MAX_RETRIES)
        throw new CompletionException(new IOException("Request failed after " + MAX_RETRIES + " tries"));
    return request.get().thenComposeAsync(response ->
    {
        if (response.getStatusInfo().getFamily() != Response.Status.Family.SUCCESSFUL)
            return retry(request, executor, tries + 1);
        return CompletableFuture.completedFuture(response);
    }, executor);
}