Java 8 CompletableFuture 中的默认值超时

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/23575067/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-14 00:04:29  来源:igfitidea点击:

Timeout with default value in Java 8 CompletableFuture

javajava-8

提问by jonderry

Suppose I have some async computation, such as:

假设我有一些异步计算,例如:

CompletableFuture
        .supplyAsync(() -> createFoo())
        .thenAccept(foo -> doStuffWithFoo(foo));

Is there a nice way to provide a default value for foo if the async supplier times out according to some specified timeout? Ideally, such functionality would attempt to cancel the slow-running supplier as well. For example, is there standard library functionality that is similar to the following hypothetical code:

如果异步供应商根据某个指定的超时超时,是否有一种很好的方法可以为 foo 提供默认值?理想情况下,此类功能也会尝试取消运行缓慢的供应商。例如,是否存在类似于以下假设代码的标准库功能:

CompletableFuture
        .supplyAsync(() -> createFoo())
        .acceptEither(
                CompletableFuture.completedAfter(50, TimeUnit.MILLISECONDS, DEFAULT_FOO),
                foo -> doStuffWithFoo(foo));

Or perhaps even better:

或者甚至更好:

CompletableFuture
        .supplyAsync(() -> createFoo())
        .withDefault(DEFAULT_FOO, 50, TimeUnit.MILLISECONDS)
        .thenAccept(foo -> doStuffWithFoo(foo));

I know about get(timeout, unit), but am wondering if there's a nicer standard way of applying a timeout in an asynchronous and reactive fashion as suggested in the code above.

我知道get(timeout, unit),但我想知道是否有更好的标准方法以异步和反应方式应用超时,如上面代码中所建议的那样。

EDIT: Here's a solution that's inspired by Java 8: Mandatory checked exceptions handling in lambda expressions. Why mandatory, not optional?, but unfortunately it blocks a thread. If we rely on createFoo() to asynchronously check for timeout and throw its own timeout exception it would work without blocking a thread, but would place more burden on the creator of the supplier and would still have the cost of creating an exception (which can be expensive without "fast throw")

编辑:这是一个受Java 8启发的解决方案: lambda 表达式中的强制检查异常处理。为什么是强制性的,而不是可选的?,但不幸的是它阻塞了一个线程。如果我们依赖 createFoo() 来异步检查超时并抛出自己的超时异常,它会在不阻塞线程的情况下工作,但会给供应商的创建者带来更多负担,并且仍然会产生创建异常的成本(这可以没有“快速投掷”就很贵)

static <T> Supplier<T> wrapped(Callable<T> callable) {
    return () -> {
        try {
            return callable.call();
        } catch (RuntimeException e1) {
            throw e1;
        } catch (Throwable e2) {
            throw new RuntimeException(e2);
        }
    };
}
CompletableFuture
        .supplyAsync(wrapped(() -> CompletableFuture.supplyAsync(() -> createFoo()).get(50, TimeUnit.MILLISECONDS)))
        .exceptionally(e -> "default")
        .thenAcceptAsync(s -> doStuffWithFoo(foo));

回答by Dane White

I think you'll always need an extra thread monitoring when its time to supply the default value. I'd probably go the route of having two supplyAsync calls, with the default wrapped in a utility API, linked by an acceptEither. If you'd rather wrap your Supplier, then you could use a utility API that makes the 'either' call for you:

我认为在提供默认值时,您总是需要额外的线程监控。我可能会走两个 supplyAsync 调用的路线,默认值包含在一个实用程序 API 中,由一个 acceptEither 链接。如果您更愿意包装您的供应商,那么您可以使用一个实用程序 API 来为您进行“任一”调用:

public class TimeoutDefault {
    public static <T> CompletableFuture<T> with(T t, int ms) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(ms);
            } catch (InterruptedException e) { }
            return t;
        });
    }

    public static <T> Supplier<T> with(Supplier<T> supplier, T t, int ms) {
        return () -> CompletableFuture.supplyAsync(supplier)
            .applyToEither(TimeoutDefault.with(t, ms), i -> i).join();
    }
}

CompletableFuture<Void> future = CompletableFuture
        .supplyAsync(Example::createFoo)
        .acceptEither(
            TimeoutDefault.with("default", 1000),
            Example::doStuffWithFoo);

CompletableFuture<Void> future = CompletableFuture
        .supplyAsync(TimeoutDefault.with(Example::createFoo, "default", 1000))
        .thenAccept(Example::doStuffWithFoo);

回答by Ruben

CompletableFuture.supplyAsync is just a helper method that creates a CompletableFuture for you, and submits the task to the ForkJoin Pool.

CompletableFuture.supplyAsync 只是一个辅助方法,它为你创建一个 CompletableFuture,并将任务提交到 ForkJoin Pool。

You can create your own supplyAsync with your requirements like this:

您可以根据您的要求创建自己的 supplyAsync,如下所示:

private static final ScheduledExecutorService schedulerExecutor = 
                                 Executors.newScheduledThreadPool(10);
private static final ExecutorService executorService = 
                                 Executors.newCachedThreadPool();


public static <T> CompletableFuture<T> supplyAsync(
        final Supplier<T> supplier, long timeoutValue, TimeUnit timeUnit,
        T defaultValue) {

    final CompletableFuture<T> cf = new CompletableFuture<T>();

    // as pointed out by Peti, the ForkJoinPool.commonPool() delivers a 
    // ForkJoinTask implementation of Future, that doesn't interrupt when cancelling
    // Using Executors.newCachedThreadPool instead in the example
    // submit task
    Future<?> future = executorService.submit(() -> {
        try {
            cf.complete(supplier.get());
        } catch (Throwable ex) {
            cf.completeExceptionally(ex);
        }
    });

    //schedule watcher
    schedulerExecutor.schedule(() -> {
        if (!cf.isDone()) {
            cf.complete(defaultValue);
            future.cancel(true);
        }

    }, timeoutValue, timeUnit);

    return cf;
}

Creating the CompletableFuture with that helper is as easy as using the static method in CompletableFuture:

使用该助手创建 CompletableFuture 就像使用 CompletableFuture 中的静态方法一样简单:

    CompletableFuture<String> a = supplyAsync(() -> "hi", 1,
            TimeUnit.SECONDS, "default");

To test it:

要测试它:

    a = supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e1) {
            // ignore
        }
        return "hi";
    }, 1, TimeUnit.SECONDS, "default");

回答by Aaron Digulla

DZone has a good article how to solve this: https://dzone.com/articles/asynchronous-timeouts

DZone 有一篇很好的文章如何解决这个问题:https: //dzone.com/articles/asynchronous-timeouts

I'm not sure about the copyright of the code, hence I can't copy it here. The solution is very much like the one from Dane White but it uses a thread pool with a single thread plus schedule()to avoid wasting a thread just to wait for the timeout.

我不确定代码的版权,因此我不能在这里复制它。该解决方案与 Dane White 的解决方案非常相似,但它使用带有单个线程的线程池加上schedule()以避免浪费线程只是为了等待超时。

It also throws a TimeoutExceptioninstead of returning a default.

它还抛出一个TimeoutException而不是返回默认值。

回答by tclamb

There's no standard library method for constructing a CompletableFuture supplied with a value after a timeout. That said, it's really simple to roll your own with minimal resource overhead:

没有标准的库方法来构造在超时后提供一个值的 CompletableFuture。也就是说,以最少的资源开销推出自己的产品非常简单:

private static final ScheduledExecutorService EXECUTOR
        = Executors.newSingleThreadScheduledExecutor();

public static <T> CompletableFuture<T> delayedValue(final T value,
                                                    final Duration delay) {
    final CompletableFuture<T> result = new CompletableFuture<>();
    EXECUTOR.schedule(() -> result.complete(value),
                      delay.toMillis(), TimeUnit.MILLISECONDS);
    return result;
}

It can be used with the "either" methods of CompleteableFuture:

它可以与以下的“ either”方法一起使用CompleteableFuture

  • accceptEither, acceptEitherAsync
  • applyToEither, applyToEitherAsync
  • runAfterEither, runAfterEitherAsync
  • accceptEither, acceptEitherAsync
  • applyToEither, applyToEitherAsync
  • runAfterEither, runAfterEitherAsync

One application is using a cached value if a remote service call exceeds some latency threshold:

如果远程服务调用超过某个延迟阈值,则一个应用程序正在使用缓存值:

interface RemoteServiceClient {
    CompletableFuture<Foo> getFoo();
}

final RemoteServiceClient client = /* ... */;
final Foo cachedFoo = /* ... */;
final Duration timeout = /* ... */;

client.getFoos()
    .exceptionally(ignoredException -> cachedFoo)
    .acceptEither(delayedValue(cachedFoo, timeout),
        foo -> /* do something with foo */)
    .join();

In case the remote client call completes exceptionally (e.g. SocketTimeoutException), we can fail fast and use the cached value immediately.

如果远程客户端调用异常完成(例如SocketTimeoutException),我们可以快速失败并立即使用缓存值。

CompletableFuture.anyOf(CompletableFuture<?>...)can be combined with this delayedValueprimitive to wrap a CompletableFuturewith the above semantics:

CompletableFuture.anyOf(CompletableFuture<?>...)可以与此delayedValue原语结合使用CompletableFuture上述语义包装 a :

@SuppressWarnings("unchecked")
public static <T> CompletableFuture<T> withDefault(final CompletableFuture<T> cf,
                                                   final T defaultValue,
                                                   final Duration timeout) {
    return (CompletableFuture<T>) CompletableFuture.anyOf(
        cf.exceptionally(ignoredException -> defaultValue),
        delayedValue(defaultValue, timeout));
}

This nicely simplifies the remote service call example above:

这很好地简化了上面的远程服务调用示例:

withDefault(client.getFoos(), cachedFoo, timeout)
    .thenAccept(foo -> /* do something with foo */)
    .join();

CompletableFutures are more accurately termed promises, as they decouple creation of the Futurefrom its completion. Be sure to use dedicated thread pools for heavy CPU work. To create a CompletableFuturefor an expensive computation, you should use the CompletableFuture#supplyAsync(Supplier, Executor)overload, as the #supplyAsync(Supplier)overload defaults to the common ForkJoinPool. The returned CompletableFuturecould not cancel its task, as this functionality isn't exposed by the Executorinterface. More generally, dependent CompletableFutures don't cancel their parents, e.g. cf.thenApply(f).cancel(true)does not cancel cf. I'd recommend sticking to the Futures returned by ExecutorServices if you need that functionality.

CompletableFutures 更准确地称为承诺,因为它们将 的创建Future与其完成分离。请务必使用专用线程池进行繁重的 CPU 工作。要CompletableFuture为昂贵的计算创建,您应该使用CompletableFuture#supplyAsync(Supplier, Executor)重载,因为#supplyAsync(Supplier)重载默认为 common ForkJoinPool。返回的CompletableFuture无法取消其任务,因为此功能不是由Executor接口公开的。更一般地,家属CompletableFuture不会取消他们的父母,例如cf.thenApply(f).cancel(true)不会取消cf。如果您需要该功能,我建议您坚持使用Futures 返回的ExecutorServices 。

回答by user140547

In Java 9, there will be completeOnTimeout(T value, long timeout, TimeUnit unit), which does what you want, although it does not cancel the slow supplier.

在 Java 9 中,将有completeOnTimeout(T value, long timeout, TimeUnit unit),它可以满足您的需求,尽管它不会取消慢速供应商。

There is also a orTimeout(long timeout, TimeUnit unit), which completes exceptionally in case on a timeout.

还有一个orTimeout(long timeout, TimeUnit unit),它在超时的情况下异常完成。