如何取消 Java 8 可完成的未来?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/23320407/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-13 21:52:07  来源:igfitidea点击:

How to cancel Java 8 completable future?

javamultithreadingjava-8completable-future

提问by Lukas

I am playing with Java 8 completable futures. I have the following code:

我正在玩 Java 8 可完成的期货。我有以下代码:

CountDownLatch waitLatch = new CountDownLatch(1);

CompletableFuture<?> future = CompletableFuture.runAsync(() -> {
    try {
        System.out.println("Wait");
        waitLatch.await(); //cancel should interrupt
        System.out.println("Done");
    } catch (InterruptedException e) {
        System.out.println("Interrupted");
        throw new RuntimeException(e);
    }
});

sleep(10); //give it some time to start (ugly, but works)
future.cancel(true);
System.out.println("Cancel called");

assertTrue(future.isCancelled());

assertTrue(future.isDone());
sleep(100); //give it some time to finish

Using runAsync I schedule execution of a code that waits on a latch. Next I cancel the future, expecting an interrupted exception to be thrown inside. But it seems that the thread remains blocked on the await call and the InterruptedException is never thrown even though the future is canceled (assertions pass). An equivalent code using ExecutorService works as expected. Is it a bug in the CompletableFuture or in my example?

使用 runAsync 我安排了一个等待闩锁的代码的执行。接下来我取消未来,期望在里面抛出一个被中断的异常。但似乎线程在 await 调用上仍然被阻塞,并且即使未来被取消(断言通过)也永远不会抛出 InterruptedException 。使用 ExecutorService 的等效代码按预期工作。它是 CompletableFuture 中的错误还是我的示例中的错误?

采纳答案by nosid

Apparently, it's intentional. The Javadoc for the method CompletableFuture::cancelstates:

显然,这是故意的。CompletableFuture::cancel方法的 Javadoc指出:

[Parameters:]mayInterruptIfRunning - this value has noeffect in this implementation because interrupts are not used to control processing.

[参数:]mayInterruptIfRunning -这个值没有在该实施方式的效果,因为中断将不被用于控制处理。

Interestingly, the method ForkJoinTask::canceluses almost the same wording for the parameter mayInterruptIfRunning.

有趣的是,方法ForkJoinTask::cancel对参数mayInterruptIfRunning使用几乎相同的措辞。

I have a guess on this issue:

我对这个问题有一个猜测:

  • interruptionis intended to be used with blocking operations, like sleep, waitor I/O operations,
  • but neither CompletableFuturenor ForkJoinTaskare intended to be used with blocking operations.
  • 中断旨在用于阻塞操作,如睡眠等待或 I/O 操作,
  • 但是CompletableFutureForkJoinTask都不打算与阻塞操作一起使用。

Instead of blocking, a CompletableFutureshould create a new CompletionStage, and cpu-bound tasks are a prerequisite for the fork-join model. So, using interruptionwith either of them would defeat their purpose. And on the other hand, it might increase complexity, that's not required if used as intended.

CompletableFuture应该创建一个新的CompletionStage而不是阻塞,并且 cpu 绑定任务是 fork-join 模型的先决条件。因此,对他们中的任何一个使用中断都会破坏他们的目的。另一方面,它可能会增加复杂性,如果按预期使用则不需要。

回答by edharned

The CancellationException is part of the internal ForkJoin cancel routine. The exception will come out when you retrieve the result of future:

CancellationException 是内部 ForkJoin 取消例程的一部分。检索future的结果时会出现异常:

try { future.get(); }
      catch (Exception e){
          System.out.println(e.toString());            
      }

Took a while to see this in a debugger. The JavaDoc is not that clear on what is happening or what you should expect.

花了一段时间才在调试器中看到这一点。JavaDoc 并不清楚正在发生的事情或您应该期待什么。

回答by Kirill Gamazkov

When you call CompletableFuture#cancel, you only stop the downstream part of the chain. Upstream part, i. e. something that will eventually call complete(...)or completeExceptionally(...), doesn't get any signal that the result is no more needed.

当您调用 时CompletableFuture#cancel,您只会停止链的下游部分。上游部分,即最终会调用complete(...)or 的东西,completeExceptionally(...)不会得到任何不再需要结果的信号。

What are those 'upstream' and 'downstream' things?

那些“上游”和“下游”的东西是什么?

Let's consider the following code:

让我们考虑以下代码:

CompletableFuture
        .supplyAsync(() -> "hello")               //1
        .thenApply(s -> s + " world!")            //2
        .thenAccept(s -> System.out.println(s));  //3

Here, the data flows from top to bottom - from being created by supplier, through being modified by function, to being consumed by println. The part above particular step is called upstream, and the part below is downstream. E. g. steps 1 and 2 are upstream for step 3.

在这里,数据从上到下流动——从由供应商创建,到被功能修改,再到被 消费println。特定步骤上面的部分称为上游,下面的部分称为下游。例如 步骤 1 和 2 是步骤 3 的上游。

Here's what happens behind the scenes. This is not precise, rather it's a convenient mind model of what's going on.

这是幕后发生的事情。这并不精确,而是一种方便的思维模型。

  1. Supplier (step 1) is being executed (inside the JVM's common ForkJoinPool).
  2. The result of the supplier is then being passed by complete(...)to the next CompletableFuturedownstream.
  3. Upon receiving the result, that CompletableFutureinvokes next step - a function (step 2) which takes in previous step result and returns something that will be passed further, to the downstream CompletableFuture's complete(...).
  4. Upon receiving the step 2 result, step 3 CompletableFutureinvokes the consumer, System.out.println(s). After consumer is finished, the downstream CompletableFuturewill receive it's value, (Void) null
  1. 供应商(步骤 1)正在执行(在 JVM 的 common 内ForkJoinPool)。
  2. 然后将供应商的结果传递complete(...)给下一个CompletableFuture下游。
  3. 收到结果后,它会CompletableFuture调用下一步 - 一个函数(步骤 2),它接受上一步的结果并返回一些将进一步传递给下游CompletableFuturecomplete(...).
  4. 收到第 2 步的结果后,第 3 步调CompletableFuture用消费者System.out.println(s)。消费者完成后,下游CompletableFuture将收到它的价值,(Void) null

As we can see, each CompletableFuturein this chain has to know who are there downstream waiting for the value to be passed to their's complete(...)(or completeExceptionally(...)). But the CompletableFuturedon't have to know anything about it's upstream (or upstreams - there might be several).

正如我们所看到的,CompletableFuture这个链中的每个人都必须知道谁在下游等待将值传递给他们的complete(...)(或completeExceptionally(...))。但是CompletableFuture不必知道它的上游(或上游 - 可能有几个)。

Thus, calling cancel()upon step 3 doesn't abort steps 1 and 2, because there's no link from step 3 to step 2.

因此,调用cancel()第 3 步不会中止第 1 步和第 2 步,因为从第 3 步到第 2 步没有链接。

It is supposed that if you're using CompletableFuturethen your steps are small enough so that there's no harm if a couple of extra steps will get executed.

假设您正在使用,CompletableFuture那么您的步骤足够小,因此即使执行几个额外的步骤也不会造成任何伤害。

If you want cancellation to be propagated upstream, you have two options:

如果您希望取消在上游传播,您有两种选择:

  • Implement this yourself - create a dedicated CompletableFuture(name it like cancelled) which is checked after every step (something like step.applyToEither(cancelled, Function.identity()))
  • Use reactive stack like RxJava 2, ProjectReactor/Flux or Akka Streams
  • 自己实现这个 - 创建一个专用的CompletableFuture(命名为cancelled),在每一步后检查(类似于step.applyToEither(cancelled, Function.identity())
  • 使用响应式堆栈,如 RxJava 2、ProjectReactor/Flux 或 Akka Streams

回答by Valery Silaev

You need an alternative implementation of CompletionStage to accomplish true thread interruption. I've just released a small library that serves exactly this purpose - https://github.com/vsilaev/tascalate-concurrent

您需要 CompletionStage 的替代实现来完成真正的线程中断。我刚刚发布了一个专门用于此目的的小型库 - https://github.com/vsilaev/tascalate-concurrent