Source: http://stackoverflow.com/questions/40485398/
This page reproduces a popular Stack Overflow question and its answers under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors on Stack Overflow.
Retry logic with CompletableFuture
Asked by xmas79
I need to submit a task in an async framework I'm working on, but I need to catch exceptions and retry the same task multiple times before "aborting".
The code I'm working with is:
int retries = 0;
public CompletableFuture<Result> executeActionAsync() {
    // Execute the action async and get the future
    CompletableFuture<Result> f = executeMycustomActionHere();
    // If the future completes with exception:
    f.exceptionally(ex -> {
        retries++; // Increment the retry count
        if (retries < MAX_RETRIES)
            return executeActionAsync(); // <--- Submit one more time
        // Abort with a null value
        return null;
    });
    // Return the future
    return f;
}
This currently doesn't compile because the return type of the lambda is wrong: it expects a Result, but executeActionAsync returns a CompletableFuture<Result>.
How can I implement this fully async retry logic?
Accepted answer by xmas79
I think I succeeded. Here's an example class I created and the test code:
RetriableTask.java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class RetriableTask
{
    protected static final int MAX_RETRIES = 10;
    protected int retries = 0;
    protected int n = 0;
    // The single future handed to the caller; completed once an attempt succeeds or retries run out
    protected CompletableFuture<Integer> future = new CompletableFuture<Integer>();
    public RetriableTask(int number) {
        n = number;
    }
    public CompletableFuture<Integer> executeAsync() {
        // Create a failure within variable timeout (doubles on every retry)
        Duration timeoutInMilliseconds = Duration.ofMillis(1*(int)Math.pow(2, retries));
        // Utils.failAfter is the glue helper credited at the end of this answer (not shown here)
        CompletableFuture<Integer> timeoutFuture = Utils.failAfter(timeoutInMilliseconds);
        // Create a dummy future and complete only if (n > 5 && retries > 5) so we can test for both completion and timeouts.
        // In real application this should be a real future
        final CompletableFuture<Integer> taskFuture = new CompletableFuture<>();
        if (n > 5 && retries > 5)
            taskFuture.complete(retries * n);
        // Attach the failure future to the task future, and perform a check on completion
        taskFuture.applyToEither(timeoutFuture, Function.identity())
            .whenCompleteAsync((result, exception) -> {
                if (exception == null) {
                    future.complete(result);
                } else {
                    retries++;
                    if (retries >= MAX_RETRIES) {
                        future.completeExceptionally(exception);
                    } else {
                        executeAsync();
                    }
                }
            });
        // Return the future
        return future;
    }
}
Usage
int size = 10;
System.out.println("generating...");
List<RetriableTask> tasks = new ArrayList<>();
for (int i = 0; i < size; i++) {
    tasks.add(new RetriableTask(i));
}
System.out.println("issuing...");
List<CompletableFuture<Integer>> futures = new ArrayList<>();
for (int i = 0; i < size; i++) {
    futures.add(tasks.get(i).executeAsync());
}
System.out.println("Waiting...");
for (int i = 0; i < size; i++) {
    try {
        CompletableFuture<Integer> future = futures.get(i);
        int result = future.get();
        System.out.println(i + " result is " + result);
    } catch (Exception ex) {
        System.out.println(i + " I got exception!");
    }
}
System.out.println("Done waiting...");
Output
generating...
issuing...
Waiting...
0 I got exception!
1 I got exception!
2 I got exception!
3 I got exception!
4 I got exception!
5 I got exception!
6 result is 36
7 result is 42
8 result is 48
9 result is 54
Done waiting...
Main idea and some glue code (failAfter function) come from here.
Any other suggestions or improvements are welcome.
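The failAfter helper itself is not reproduced in this answer. A minimal sketch of what such a helper could look like is given below, assuming it simply schedules an exceptional completion after the delay. The class body, the scheduler field, and the thread name are assumptions made for illustration, not the original glue code:

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the Utils class used by RetriableTask; not the original code.
final class Utils {
    // One daemon thread is enough: it only fires timeouts and does no real work.
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "failAfter-scheduler");
                t.setDaemon(true);
                return t;
            });

    private Utils() {}

    // Returns a future that never succeeds; it completes exceptionally
    // with a TimeoutException once the given delay has elapsed.
    static <T> CompletableFuture<T> failAfter(Duration delay) {
        CompletableFuture<T> promise = new CompletableFuture<>();
        SCHEDULER.schedule(() -> {
            TimeoutException ex = new TimeoutException("Timed out after " + delay);
            return promise.completeExceptionally(ex);
        }, delay.toMillis(), TimeUnit.MILLISECONDS);
        return promise;
    }
}
```

Combined with applyToEither, as in executeAsync above, whichever of the task future and the timeout future completes first decides the outcome of the attempt.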
Answer by Holger
Chaining subsequent retries can be straightforward:
public CompletableFuture<Result> executeActionAsync() {
    CompletableFuture<Result> f=executeMycustomActionHere();
    for(int i=0; i<MAX_RETRIES; i++) {
        f=f.exceptionally(t -> executeMycustomActionHere().join());
    }
    return f;
}
Read about the drawbacks below.
This simply chains as many retries as intended, as these subsequent stages won't do anything in the non-exceptional case.
One drawback is that if the first attempt fails immediately, so that f is already completed exceptionally when the first exceptionally handler is chained, the action will be invoked by the calling thread, removing the asynchronous nature of the request entirely. And generally, join() may block a thread (the default executor will start a new compensation thread then, but still, it's discouraged). Unfortunately, there is neither an exceptionallyAsync nor an exceptionallyCompose method.
A solution not invoking join() would be
public CompletableFuture<Result> executeActionAsync() {
    CompletableFuture<Result> f=executeMycustomActionHere();
    for(int i=0; i<MAX_RETRIES; i++) {
        f=f.thenApply(CompletableFuture::completedFuture)
           .exceptionally(t -> executeMycustomActionHere())
           .thenCompose(Function.identity());
    }
    return f;
}
demonstrating how involved combining “compose” and an “exceptionally” handler is.
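For readers on Java 12 or later, CompletableFuture does provide exceptionallyCompose (along with exceptionallyAsync and exceptionallyComposeAsync), so the thenApply / exceptionally / thenCompose combination can be collapsed into a single call. The following is a minimal sketch under that assumption; the class name ComposeRetry and the method withRetries are illustrative names, not part of the answer:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Illustrative helper (hypothetical names); requires Java 12+ for exceptionallyCompose.
final class ComposeRetry {
    private ComposeRetry() {}

    // Runs the action once, then chains up to maxRetries further attempts.
    // Each chained stage calls the action again only if the previous stage failed;
    // on success the value simply passes through the remaining stages.
    static <T> CompletableFuture<T> withRetries(Supplier<CompletableFuture<T>> action,
                                                int maxRetries) {
        CompletableFuture<T> f = action.get();
        for (int i = 0; i < maxRetries; i++) {
            f = f.exceptionallyCompose(t -> action.get());
        }
        return f;
    }
}
```

A call such as ComposeRetry.withRetries(this::executeMycustomActionHere, MAX_RETRIES) would then replace the loop above. As with the Java 8 version, a retry may still run on the calling thread when the previous stage has already failed; exceptionallyComposeAsync can be used if that matters.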
Further, only the last exception will be reported if all retries fail. A better solution should report the first exception, with the exceptions of the subsequent retries added as suppressed exceptions. Such a solution can be built by chaining a recursive call, as hinted by Gili's answer. However, in order to use this idea for exception handling, we have to use the steps for combining “compose” and “exceptionally” shown above:
public CompletableFuture<Result> executeActionAsync() {
    return executeMycustomActionHere()
        .thenApply(CompletableFuture::completedFuture)
        .exceptionally(t -> retry(t, 0))
        .thenCompose(Function.identity());
}
private CompletableFuture<Result> retry(Throwable first, int retry) {
    if(retry >= MAX_RETRIES) return CompletableFuture.failedFuture(first);
    return executeMycustomActionHere()
        .thenApply(CompletableFuture::completedFuture)
        .exceptionally(t -> { first.addSuppressed(t); return retry(first, retry+1); })
        .thenCompose(Function.identity());
}
CompletableFuture.failedFuture is a Java 9 method, but it would be trivial to add a Java 8 compatible backport to your code if needed:
public static <T> CompletableFuture<T> failedFuture(Throwable t) {
    final CompletableFuture<T> cf = new CompletableFuture<>();
    cf.completeExceptionally(t);
    return cf;
}
Answer by Alex Fargus
I recently solved a similar problem using the guava-retrying library.
// Assumes executeMycustomActionHere() is a blocking call that returns a Result here
Callable<Result> callable = new Callable<Result>() {
    public Result call() throws Exception {
        return executeMycustomActionHere();
    }
};
Retryer<Result> retryer = RetryerBuilder.<Result>newBuilder()
    .retryIfResult(Predicates.<Result>isNull())
    .retryIfExceptionOfType(IOException.class)
    .retryIfRuntimeException()
    .withStopStrategy(StopStrategies.stopAfterAttempt(MAX_RETRIES))
    .build();
CompletableFuture<Result> future = CompletableFuture.supplyAsync( () -> {
    try {
        // The retries run synchronously inside this async task
        return retryer.call(callable);
    } catch (RetryException | ExecutionException e) {
        // Surface the failure through the future instead of swallowing it
        throw new CompletionException(e);
    }
});
Answer by theazureshadow
Instead of implementing your own retry logic, I recommend using a proven library like failsafe, which has built-in support for futures (and seems more popular than guava-retrying). For your example, it would look something like:
private static RetryPolicy retryPolicy = new RetryPolicy()
    .withMaxRetries(MAX_RETRIES);
public CompletableFuture<Result> executeActionAsync() {
    return Failsafe.with(retryPolicy)
        .with(executor)
        .withFallback(null)
        .future(this::executeMycustomActionHere);
}
You should probably avoid .withFallback(null) and just let the returned future's .get() method throw the resulting exception, so the caller of your method can handle it specifically, but that's a design decision you'll have to make.
Other things to think about include whether you should retry immediately or wait some period of time between attempts, any sort of recursive backoff (useful when you're calling a web service that might be down), and whether there are specific exceptions that aren't worth retrying (e.g. if the parameters to the method are invalid).
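To make these considerations concrete without depending on a particular library version, here is a hand-rolled sketch (not Failsafe's API) that waits between attempts, doubles the delay each time, and skips retries for exceptions a caller-supplied predicate rejects. It assumes Java 9+ for CompletableFuture.delayedExecutor and failedFuture; the class name BackoffRetry and all parameter names are made up for this illustration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical helper; a sketch of delayed retries, not a library API.
final class BackoffRetry {
    private BackoffRetry() {}

    static <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> action,
                                          int maxRetries,
                                          long initialDelayMillis,
                                          Predicate<Throwable> retryable) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(action, maxRetries, initialDelayMillis, retryable, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> action,
                                    int retriesLeft,
                                    long delayMillis,
                                    Predicate<Throwable> retryable,
                                    CompletableFuture<T> result) {
        CompletableFuture<T> current;
        try {
            current = action.get();
        } catch (Throwable t) {
            // The supplier itself threw before returning a future
            current = CompletableFuture.failedFuture(t);
        }
        current.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
                return;
            }
            // Async stages wrap failures in CompletionException; look at the real cause
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause() : error;
            if (retriesLeft <= 0 || !retryable.test(cause)) {
                result.completeExceptionally(cause);
                return;
            }
            // Schedule the next attempt after the current delay, then double the delay
            Executor later = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
            later.execute(() -> attempt(action, retriesLeft - 1, delayMillis * 2, retryable, result));
        });
    }
}
```

For example, BackoffRetry.retry(this::executeMycustomActionHere, MAX_RETRIES, 100, t -> !(t instanceof IllegalArgumentException)) would wait 100 ms, 200 ms, 400 ms and so on between attempts, and give up at once on invalid arguments. A real implementation would usually also cap the delay and add jitter.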
Answer by u10645061
Utility class:
public class RetryUtil {
    public static <R> CompletableFuture<R> retry(Supplier<CompletableFuture<R>> supplier, int maxRetries) {
        CompletableFuture<R> f = supplier.get();
        for(int i=0; i<maxRetries; i++) {
            f=f.thenApply(CompletableFuture::completedFuture)
               .exceptionally(t -> {
                   System.out.println("retry for: "+t.getMessage());
                   return supplier.get();
               })
               .thenCompose(Function.identity());
        }
        return f;
    }
}
Usage:
public CompletableFuture<String> lucky(){
    return CompletableFuture.supplyAsync(()->{
        double luckNum = Math.random();
        double luckEnough = 0.6;
        if(luckNum < luckEnough){
            throw new RuntimeException("not luck enough: " + luckNum);
        }
        return "I'm lucky: "+luckNum;
    });
}
@Test
public void testRetry(){
    CompletableFuture<String> retry = RetryUtil.retry(this::lucky, 10);
    System.out.println("async check");
    String join = retry.join();
    System.out.println("lucky? "+join);
}
Output
async check
retry for: java.lang.RuntimeException: not luck enough: 0.412296354211683
retry for: java.lang.RuntimeException: not luck enough: 0.4099777199676573
lucky? I'm lucky: 0.8059089479049389
Answer by Gili
Here is an approach that will work for any CompletionStage subclass and does not return a dummy CompletableFuture that does nothing more than wait to get updated by other futures.
/**
 * Sends a request that may run as many times as necessary.
 *
 * @param request a supplier that initiates an HTTP request
 * @param executor the Executor used to run the request
 * @return the server response
 */
public CompletionStage<Response> asyncRequest(Supplier<CompletionStage<Response>> request, Executor executor)
{
    return retry(request, executor, 0);
}
/**
 * Sends a request that may run as many times as necessary.
 *
 * @param request a supplier that initiates an HTTP request
 * @param executor the Executor used to run the request
 * @param tries the number of times the operation has been retried
 * @return the server response
 */
private CompletionStage<Response> retry(Supplier<CompletionStage<Response>> request, Executor executor, int tries)
{
    if (tries >= MAX_RETRIES)
        throw new CompletionException(new IOException("Request failed after " + MAX_RETRIES + " tries"));
    return request.get().thenComposeAsync(response ->
    {
        // Retry on any non-2xx response; otherwise pass the response through
        if (response.getStatusInfo().getFamily() != Response.Status.Family.SUCCESSFUL)
            return retry(request, executor, tries + 1);
        return CompletableFuture.completedFuture(response);
    }, executor);
}