Java executors:当任务完成时如何在不阻塞的情况下收到通知?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/826212/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-11 19:56:10  来源:igfitidea点击:

Java executors: how to be notified, without blocking, when a task completes?

javacallbacknotifyexecutor

提问by Shahbaz

Say I have a queue full of tasks which I need to submit to an executor service. I want them processed one at a time. The simplest way I can think of is to:

假设我有一个充满任务的队列,我需要将这些任务提交给执行程序服务。我希望他们一次处理一个。我能想到的最简单的方法是:

  1. Take a task from the queue
  2. Submit it to the executor
  3. Call .get on the returned Future and block until a result is available
  4. Take another task from the queue...
  1. 从队列中取出一个任务
  2. 提交给执行者
  3. 在返回的 Future 上调用 .get 并阻塞直到结果可用
  4. 从队列中取出另一个任务...

However, I am trying to avoid blocking completely. If I have 10,000 such queues, which need their tasks processed one at a time, I'll run out of stack space because most of them will be holding on to blocked threads.

但是,我试图避免完全阻塞。如果我有 10,000 个这样的队列,它们需要一次处理一个任务,我将耗尽堆栈空间,因为它们中的大多数将保持阻塞线程。

What I would like is to submit a task and provide a call-back which is called when the task is complete. I'll use that call-back notification as a flag to send the next task. (functionaljava and jetlang apparently use such non-blocking algorithms, but I can't understand their code)

我想要的是提交一个任务并提供一个回调,该回调在任务完成时被调用。我将使用该回调通知作为发送下一个任务的标志。(functionaljava 和 jetlang 显然使用了这种非阻塞算法,但我无法理解他们的代码)

How can I do that using JDK's java.util.concurrent, short of writing my own executor service?

我如何使用 JDK 的 java.util.concurrent 来做到这一点,而不是编写我自己的执行程序服务?

(the queue which feeds me these tasks may itself block, but that is an issue to be tackled later)

(为我提供这些任务的队列本身可能会阻塞,但这是稍后要解决的问题)

采纳答案by erickson

Define a callback interface to receive whatever parameters you want to pass along in the completion notification. Then invoke it at the end of the task.

定义一个回调接口来接收你想在完成通知中传递的任何参数。然后在任务结束时调用它。

You could even write a general wrapper for Runnable tasks, and submit these to ExecutorService. Or, see below for a mechanism built into Java 8.

您甚至可以为 Runnable 任务编写一个通用包装器,并将它们提交到ExecutorService. 或者,请参阅下面的 Java 8 内置机制。

class CallbackTask implements Runnable {

  private final Runnable task;

  private final Callback callback;

  CallbackTask(Runnable task, Callback callback) {
    this.task = task;
    this.callback = callback;
  }

  public void run() {
    task.run();
    callback.complete();
  }

}


With CompletableFuture, Java 8 included a more elaborate means to compose pipelines where processes can be completed asynchronously and conditionally. Here's a contrived but complete example of notification.

有了CompletableFuture,Java 8 包含了一种更复杂的方法来组合管道,其中进程可以异步和有条件地完成。这是一个人为但完整的通知示例。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class GetTaskNotificationWithoutBlocking {

  public static void main(String... argv) throws Exception {
    ExampleService svc = new ExampleService();
    GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
    CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
    f.thenAccept(listener::notify);
    System.out.println("Exiting main()");
  }

  void notify(String msg) {
    System.out.println("Received message: " + msg);
  }

}

class ExampleService {

  String work() {
    sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
    char[] str = new char[5];
    ThreadLocalRandom current = ThreadLocalRandom.current();
    for (int idx = 0; idx < str.length; ++idx)
      str[idx] = (char) ('A' + current.nextInt(26));
    String msg = new String(str);
    System.out.println("Generated message: " + msg);
    return msg;
  }

  public static void sleep(long average, TimeUnit unit) {
    String name = Thread.currentThread().getName();
    long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
    System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
    try {
      unit.sleep(timeout);
      System.out.println(name + " awoke.");
    } catch (InterruptedException abort) {
      Thread.currentThread().interrupt();
      System.out.println(name + " interrupted.");
    }
  }

  public static long exponential(long avg) {
    return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
  }

}

回答by Yuval Adam

Use a CountDownLatch.

使用一个CountDownLatch.

It's from java.util.concurrentand it's exactly the way to wait for several threads to complete execution before continuing.

它来自java.util.concurrent并且正是在继续之前等待多个线程完成执行的方式。

In order to achieve the callback effect you're looking after, that does require a little additional extra work. Namely, handling this by yourself in a separate thread which uses the CountDownLatchand does wait on it, then goes on about notifying whatever it is you need to notify. There is no native support for callback, or anything similar to that effect.

为了实现您正在关注的回调效果,这确实需要一些额外的额外工作。也就是说,在一个单独的线程中自己处理这个,该线程使用CountDownLatch并等待它,然后继续通知你需要通知的任何内容。没有对回调或任何类似效果的原生支持。



EDIT:now that I further understand your question, I think you are reaching too far, unnecessarily. If you take a regular SingleThreadExecutor, give it all the tasks, and it will do the queueing natively.

编辑:现在我进一步理解你的问题,我认为你不必要地太过分了。如果你使用一个普通的SingleThreadExecutor,给它所有的任务,它会在本地进行排队。

回答by basszero

If you want to make sure that no tasks will run at the same time then use a SingleThreadedExecutor. The tasks will be processed in the order the are submitted. You don't even need to hold the tasks, just submit them to the exec.

如果您想确保不会同时运行任何任务,请使用SingleThreadedExecutor。任务将按照提交的顺序进行处理。您甚至不需要持有任务,只需将它们提交给 exec。

回答by Cem Catikkas

ThreadPoolExecutoralso has beforeExecuteand afterExecutehook methods that you can override and make use of. Here is the description from ThreadPoolExecutor's Javadocs.

ThreadPoolExecutor也有beforeExecuteafterExecute钩子方法,您可以覆盖和利用。这里是从描述ThreadPoolExecutorJavadoc中

Hook methods

This class provides protected overridable beforeExecute(java.lang.Thread, java.lang.Runnable)and afterExecute(java.lang.Runnable, java.lang.Throwable)methods that are called before and after execution of each task. These can be used to manipulate the execution environment; for example, reinitializing ThreadLocals, gathering statistics, or adding log entries. Additionally, method terminated()can be overridden to perform any special processing that needs to be done once the Executorhas fully terminated. If hook or callback methods throw exceptions, internal worker threads may in turn fail and abruptly terminate.

钩子方法

这个类提供保护的重写beforeExecute(java.lang.Thread, java.lang.Runnable)afterExecute(java.lang.Runnable, java.lang.Throwable)之前和每个任务的执行之后被调用的方法。这些可用于操作执行环境;例如,重新初始化ThreadLocals、收集统计信息或添加日志条目。此外,terminated()可以重写方法以执行在Executor完全终止后需要完成的任何特殊处理。如果钩子或回调方法抛出异常,内部工作线程可能会失败并突然终止。

回答by Auguste

You could extend FutureTaskclass, and override the done()method, then add the FutureTaskobject to the ExecutorService, so the done()method will invoke when the FutureTaskcompleted immediately.

您可以扩展FutureTask类,并覆盖该done()方法,然后将FutureTask对象添加到ExecutorService,因此该done()方法将在FutureTask完成时立即调用。

回答by Pierre-Henri

Use Guava's listenable future APIand add a callback. Cf. from the website :

使用Guava 的 listenable future API并添加回调。参见 从网站:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
  public Explosion call() {
    return pushBigRedButton();
  }
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
  // we want this handler to run immediately after we push the big red button!
  public void onSuccess(Explosion explosion) {
    walkAwayFrom(explosion);
  }
  public void onFailure(Throwable thrown) {
    battleArchNemesis(); // escaped the explosion!
  }
});

回答by Matt

In Java 8 you can use CompletableFuture. Here's an example I had in my code where I'm using it to fetch users from my user service, map them to my view objects and then update my view or show an error dialog (this is a GUI application):

在 Java 8 中,您可以使用CompletableFuture。这是我在代码中的一个示例,我使用它从我的用户服务中获取用户,将它们映射到我的视图对象,然后更新我的视图或显示错误对话框(这是一个 GUI 应用程序):

    CompletableFuture.supplyAsync(
            userService::listUsers
    ).thenApply(
            this::mapUsersToUserViews
    ).thenAccept(
            this::updateView
    ).exceptionally(
            throwable -> { showErrorDialogFor(throwable); return null; }
    );

It executes asynchronously. I'm using two private methods: mapUsersToUserViewsand updateView.

它异步执行。我正在使用两种私有方法:mapUsersToUserViewsupdateView.

回答by Old Hyman

Just to add to Matt's answer, which helped, here is a more fleshed-out example to show the use of a callback.

只是为了补充 Matt 的回答,这有帮助,这里有一个更充实的例子来展示回调的使用。

private static Primes primes = new Primes();

public static void main(String[] args) throws InterruptedException {
    getPrimeAsync((p) ->
        System.out.println("onPrimeListener; p=" + p));

    System.out.println("Adios mi amigito");
}
public interface OnPrimeListener {
    void onPrime(int prime);
}
public static void getPrimeAsync(OnPrimeListener listener) {
    CompletableFuture.supplyAsync(primes::getNextPrime)
        .thenApply((prime) -> {
            System.out.println("getPrimeAsync(); prime=" + prime);
            if (listener != null) {
                listener.onPrime(prime);
            }
            return prime;
        });
}

The output is:

输出是:

    getPrimeAsync(); prime=241
    onPrimeListener; p=241
    Adios mi amigito

回答by Deepika Anand

You may use a implementation of Callable such that

您可以使用 Callable 的实现,这样

public class MyAsyncCallable<V> implements Callable<V> {

    CallbackInterface ci;

    public MyAsyncCallable(CallbackInterface ci) {
        this.ci = ci;
    }

    public V call() throws Exception {

        System.out.println("Call of MyCallable invoked");
        System.out.println("Result = " + this.ci.doSomething(10, 20));
        return (V) "Good job";
    }
}

where CallbackInterface is something very basic like

其中 CallbackInterface 是非常基本的东西

public interface CallbackInterface {
    public int doSomething(int a, int b);
}

and now the main class will look like this

现在主类看起来像这样

ExecutorService ex = Executors.newFixedThreadPool(2);

MyAsyncCallable<String> mac = new MyAsyncCallable<String>((a, b) -> a + b);
ex.submit(mac);

回答by bcorso

This is an extension to Pache's answer using Guava's ListenableFuture.

这是使用番石榴的ListenableFuture.

In particular, Futures.transform()returns ListenableFutureso can be used to chain async calls. Futures.addCallback()returns void, so cannot be used for chaining, but is good for handling success/failure on an async completion.

特别是,Futures.transform()返回ListenableFuture因此可用于链接异步调用。Futures.addCallback()Returns void,因此不能用于链接,但有利于处理异步完成时的成功/失败。

// ListenableFuture1: Open Database
ListenableFuture<Database> database = service.submit(() -> openDatabase());

// ListenableFuture2: Query Database for Cursor rows
ListenableFuture<Cursor> cursor =
    Futures.transform(database, database -> database.query(table, ...));

// ListenableFuture3: Convert Cursor rows to List<Foo>
ListenableFuture<List<Foo>> fooList =
    Futures.transform(cursor, cursor -> cursorToFooList(cursor));

// Final Callback: Handle the success/errors when final future completes
Futures.addCallback(fooList, new FutureCallback<List<Foo>>() {
  public void onSuccess(List<Foo> foos) {
    doSomethingWith(foos);
  }
  public void onFailure(Throwable thrown) {
    log.error(thrown);
  }
});

NOTE:In addition to chaining async tasks, Futures.transform()also allows you to schedule each task on a separate executor (Not shown in this example).

注意:除了链接异步任务之外,Futures.transform()还允许您在单独的执行程序上安排每个任务(本示例中未显示)。