java 如何中断 CompletableFuture 的底层执行
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/29013831/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
How to interrupt underlying execution of CompletableFuture
提问by vach
I know that CompletableFuture
design does not control its execution with interruptions, but I suppose some of you might have this problem. CompletableFuture
s are very good way to compose async execution, but given the case when you want the underlying execution to be interrupted or stopped when future is canceled, how do we do that? Or we must just accept that any canceled or manually completed CompletableFuture
will not impact the thread working out there to complete it?
我知道CompletableFuture
设计不会通过中断来控制其执行,但我想你们中的一些人可能会遇到这个问题。CompletableFuture
s 是组合异步执行的非常好的方法,但是考虑到您希望在取消 future 时中断或停止底层执行的情况,我们该怎么做?或者我们必须接受任何取消或手动完成CompletableFuture
都不会影响在那里工作以完成它的线程?
That is, in my opinion, obviously a useless work that takes time of executor worker. I wonder what approach or design might help in this case?
也就是说,在我看来,这显然是一项耗费 executor worker 时间的无用工作。我想知道在这种情况下什么方法或设计可能会有所帮助?
UPDATE
更新
Here is a simple test for this
这是一个简单的测试
public class SimpleTest {
@Test
public void testCompletableFuture() throws Exception {
CompletableFuture<Void> cf = CompletableFuture.runAsync(()->longOperation());
bearSleep(1);
//cf.cancel(true);
cf.complete(null);
System.out.println("it should die now already");
bearSleep(7);
}
public static void longOperation(){
System.out.println("started");
bearSleep(5);
System.out.println("completed");
}
private static void bearSleep(long seconds){
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
System.out.println("OMG!!! Interrupt!!!");
}
}
}
采纳答案by Sotirios Delimanolis
A CompletableFuture
is not related to the asynchronous action that may eventually complete it.
ACompletableFuture
与可能最终完成它的异步操作无关。
Since (unlike
FutureTask
) this class has no direct control over the computation that causes it to be completed, cancellation is treated as just another form of exceptional completion. Methodcancel
has the same effect ascompleteExceptionally(new CancellationException())
.
由于(与
FutureTask
)此类无法直接控制导致其完成的计算,因此取消仅被视为另一种形式的异常完成。方法cancel
与completeExceptionally(new CancellationException())
.
There may not even bea separate thread working on completing it (there may even be manythreads working on it). Even if there is, there's no link from a CompletableFuture
to any thread that has a reference to it.
有可能甚至是一个单独的线程上完成它的工作(甚至有可能是许多线程在它的工作)。即使有,也没有从 aCompletableFuture
到任何引用它的线程的链接。
As such, there's nothing you can do through CompletableFuture
to interrupt any thread that may be running some task that will complete it. You'll have to write your own logic which tracks any Thread
instances which acquire a reference to the CompletableFuture
with the intention to complete it.
因此,您无法CompletableFuture
中断任何可能正在运行的任务来完成它的线程。您必须编写自己的逻辑来跟踪任何Thread
获取对 的引用CompletableFuture
并打算完成它的实例。
Here's an example of the type of execution I think you could get away with.
这是我认为您可以逃脱的执行类型的示例。
public static void main(String[] args) throws Exception {
ExecutorService service = Executors.newFixedThreadPool(1);
CompletableFuture<String> completable = new CompletableFuture<>();
Future<?> future = service.submit(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
if (Thread.interrupted()) {
return; // remains uncompleted
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
return; // remains uncompleted
}
}
completable.complete("done");
}
});
Thread.sleep(2000);
// not atomic across the two
boolean cancelled = future.cancel(true);
if (cancelled)
completable.cancel(true); // may not have been cancelled if execution has already completed
if (completable.isCancelled()) {
System.out.println("cancelled");
} else if (completable.isCompletedExceptionally()) {
System.out.println("exception");
} else {
System.out.println("success");
}
service.shutdown();
}
This assumes that the task being executed is setup to handle interruptions correctly.
这假设正在执行的任务设置为正确处理中断。
回答by Ruben
What about this?
那这个呢?
public static <T> CompletableFuture<T> supplyAsync(final Supplier<T> supplier) {
final ExecutorService executorService = Executors.newFixedThreadPool(1);
final CompletableFuture<T> cf = new CompletableFuture<T>() {
@Override
public boolean complete(T value) {
if (isDone()) {
return false;
}
executorService.shutdownNow();
return super.complete(value);
}
@Override
public boolean completeExceptionally(Throwable ex) {
if (isDone()) {
return false;
}
executorService.shutdownNow();
return super.completeExceptionally(ex);
}
};
// submit task
executorService.submit(() -> {
try {
cf.complete(supplier.get());
} catch (Throwable ex) {
cf.completeExceptionally(ex);
}
});
return cf;
}
Simple Test:
简单测试:
CompletableFuture<String> cf = supplyAsync(() -> {
try {
Thread.sleep(1000L);
} catch (Exception e) {
System.out.println("got interrupted");
return "got interrupted";
}
System.out.println("normal complete");
return "normal complete";
});
cf.complete("manual complete");
System.out.println(cf.get());
I don't like the idea of having to create an Executor service every time, but maybe you can find a way to reuse the ForkJoinPool.
我不喜欢每次都必须创建一个 Executor 服务的想法,但也许您可以找到一种方法来重用 ForkJoinPool。
回答by Valery Silaev
Please see my answer to related question: Transform Java Future into a CompletableFuture
请参阅我对相关问题的回答: Transform Java Future into a CompletableFuture
In the code mentioned there, the CompletionStage behavior is added to RunnableFuture subclass (used by ExecutorService implementations), so you may interrupt it in the right way.
在那里提到的代码中,CompletionStage 行为被添加到 RunnableFuture 子类(由 ExecutorService 实现使用),因此您可以以正确的方式中断它。
回答by user6519354
What about?
关于什么?
/** @return {@link CompletableFuture} which when cancelled will interrupt the supplier
*/
public static <T> CompletableFuture<T> supplyAsyncInterruptibly(Supplier<T> supplier, Executor executor) {
return produceInterruptibleCompletableFuture((s) -> CompletableFuture.supplyAsync(s, executor), supplier);
}
// in case we want to do the same for similar methods later
private static <T> CompletableFuture<T> produceInterruptibleCompletableFuture(
Function<Supplier<T>,CompletableFuture<T>> completableFutureAsyncSupplier, Supplier<T> action) {
FutureTask<T> task = new FutureTask<>(action::get);
return addCancellationAction(completableFutureAsyncSupplier.apply(asSupplier(task)), () ->
task.cancel(true));
}
/** Ensures the specified action is executed if the given {@link CompletableFuture} is cancelled.
*/
public static <T> CompletableFuture<T> addCancellationAction(CompletableFuture<T> completableFuture,
@NonNull Runnable onCancellationAction) {
completableFuture.whenComplete((result, throwable) -> {
if (completableFuture.isCancelled()) {
onCancellationAction.run();
}
});
return completableFuture; // return original CompletableFuture
}
/** @return {@link Supplier} wrapper for the given {@link RunnableFuture} which calls {@link RunnableFuture#run()}
* followed by {@link RunnableFuture#get()}.
*/
public static <T> Supplier<T> asSupplier(RunnableFuture<T> futureTask) throws CompletionException {
return () -> {
try {
futureTask.run();
try {
return futureTask.get();
} catch (ExecutionException e) { // unwrap ExecutionExceptions
final Throwable cause = e.getCause();
throw (cause != null) ? cause : e;
}
} catch (CompletionException e) {
throw e;
} catch (Throwable t) {
throw new CompletionException(t);
}
};
}
回答by jontejj
If you use
如果你使用
cf.get();
instead of
代替
cf.join();
The thread waiting on the completion can be interrupted. This bit me in the a**, so I'm just putting it out there. You'd then need to propagate this interruption further / use cf.cancel(...) to really finish the execution.
等待完成的线程可以被中断。这让我很不舒服,所以我只是把它放在那里。然后你需要进一步传播这个中断/使用 cf.cancel(...) 来真正完成执行。