Java 超时后中断任务的ExecutorService

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/2758612/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-13 12:31:28  来源:igfitidea点击:

ExecutorService that interrupts tasks after a timeout

javamultithreadingconcurrencyexecutorservice

提问by Edward Dale

I'm looking for an ExecutorServiceimplementation that can be provided with a timeout. Tasks that are submitted to the ExecutorService are interrupted if they take longer than the timeout to run. Implementing such a beast isn't such a difficult task, but I'm wondering if anybody knows of an existing implementation.

我正在寻找可以提供超时的ExecutorService实现。提交给 ExecutorService 的任务如果运行时间超过超时时间,则会被中断。实现这样的野兽并不是一项艰巨的任务,但我想知道是否有人知道现有的实现。

Here's what I came up with based on some of the discussion below. Any comments?

这是我根据下面的一些讨论得出的结论。任何意见?

import java.util.List;
import java.util.concurrent.*;

public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
    private final long timeout;
    private final TimeUnit timeoutUnit;

    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
    private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    @Override
    public void shutdown() {
        timeoutExecutor.shutdown();
        super.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        timeoutExecutor.shutdownNow();
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if(timeout > 0) {
            final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
            runningTasks.put(r, scheduled);
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        ScheduledFuture timeoutTask = runningTasks.remove(r);
        if(timeoutTask != null) {
            timeoutTask.cancel(false);
        }
    }

    class TimeoutTask implements Runnable {
        private final Thread thread;

        public TimeoutTask(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            thread.interrupt();
        }
    }
}

采纳答案by John Vint

You can use a ScheduledExecutorServicefor this. First you would submit it only once to begin immediately and retain the future that is created. After that you can submit a new task that would cancel the retained future after some period of time.

您可以为此使用ScheduledExecutorService。首先,您只需提交一次即可立即开始并保留创建的未来。之后,您可以提交一个新任务,该任务将在一段时间后取消保留的未来。

 ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); 
 final Future handler = executor.submit(new Callable(){ ... });
 executor.schedule(new Runnable(){
     public void run(){
         handler.cancel();
     }      
 }, 10000, TimeUnit.MILLISECONDS);

This will execute your handler (main functionality to be interrupted) for 10 seconds, then will cancel (i.e. interrupt) that specific task.

这将执行您的处理程序(要中断的主要功能)10 秒钟,然后将取消(即中断)该特定任务。

回答by ZZ Coder

Wrap the task in FutureTask and you can specify timeout for the FutureTask. Look at the example in my answer to this question,

将任务包装在 FutureTask 中,您可以为 FutureTask 指定超时。看看我对这个问题的回答中的例子,

java native Process timeout

java本机进程超时

回答by Flavio

Unfortunately the solution is flawed. There is a sort of bug with ScheduledThreadPoolExecutor, also reported in this question: cancelling a submitted task does not fully release the memory resources associated with the task; the resources are released only when the task expires.

不幸的是,该解决方案是有缺陷的。有一种错误ScheduledThreadPoolExecutor,也在这个问题中报告:取消提交的任务并没有完全释放与任务相关的内存资源;只有当任务到期时才会释放资源。

If you therefore create a TimeoutThreadPoolExecutorwith a fairly long expiration time (a typical usage), and submit tasks fast enough, you end up filling the memory - even though the tasks actually completed successfully.

因此,如果您创建一个TimeoutThreadPoolExecutor具有相当长的到期时间(典型用法)并足够快地提交任务,您最终会填满内存 - 即使任务实际上已成功完成。

You can see the problem with the following (very crude) test program:

您可以通过以下(非常粗略的)测试程序看到问题:

public static void main(String[] args) throws InterruptedException {
    ExecutorService service = new TimeoutThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, 
            new LinkedBlockingQueue<Runnable>(), 10, TimeUnit.MINUTES);
    //ExecutorService service = Executors.newFixedThreadPool(1);
    try {
        final AtomicInteger counter = new AtomicInteger();
        for (long i = 0; i < 10000000; i++) {
            service.submit(new Runnable() {
                @Override
                public void run() {
                    counter.incrementAndGet();
                }
            });
            if (i % 10000 == 0) {
                System.out.println(i + "/" + counter.get());
                while (i > counter.get()) {
                    Thread.sleep(10);
                }
            }
        }
    } finally {
        service.shutdown();
    }
}

The program exhausts the available memory, although it waits for the spawned Runnables to complete.

该程序耗尽了可用内存,尽管它等待生成的Runnables 完成。

I though about this for a while, but unfortunately I could not come up with a good solution.

我想了一会儿,但不幸的是我无法想出一个好的解决方案。

EDIT: I found out this issue was reported as JDK bug 6602600, and appears to have been fixed very recently.

编辑:我发现这个问题被报告为JDK bug 6602600,并且最近似乎已经修复。

回答by Giovanni Botta

How about using the ExecutorService.shutDownNow()method as described in http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html? It seems to be the simplest solution.

如何使用http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html 中ExecutorService.shutDownNow()描述的方法?这似乎是最简单的解决方案。

回答by Sergey

It seems problem is not in JDK bug 6602600 ( it was solved at 2010-05-22), but in incorrect call of sleep(10) in circle. Addition note, that the main Thread must give directly CHANCE to other threads to realize thier tasks by invoke SLEEP(0) in EVERY branch of outer circle. It is better, I think, to use Thread.yield() instead of Thread.sleep(0)

似乎问题不在 JDK 错误 6602600(已在 2010-05-22 解决)中,而是在循环中错误地调用 sleep(10) 中。另外请注意,主线程必须通过在外圈的每个分支中调用 SLEEP(0) 来直接给其他线程机会以实现它们的任务。我认为最好使用 Thread.yield() 而不是 Thread.sleep(0)

The result corrected part of previous problem code is such like this:

结果修正了之前问题代码的部分是这样的:

.......................
........................
Thread.yield();         

if (i % 1000== 0) {
System.out.println(i + "/" + counter.get()+ "/"+service.toString());
}

//                
//                while (i > counter.get()) {
//                    Thread.sleep(10);
//                } 

It works correctly with amount of outer counter up to 150 000 000 tested circles.

它可以在外部计数器数量高达 150 000 000 个测试圈的情况下正常工作。

回答by Ionut Mesaros

What about this alternative idea :

这个替代想法怎么样:

  • two have two executors :
    • one for :
      • submitting the task, without caring about the timeout of the task
      • adding the Future resulted and the time when it should end to an internal structure
    • one for executing an internal job which is checking the internal structure if some tasks are timeout and if they have to be cancelled.
  • 两个有两个执行者:
    • 一个用于:
      • 提交任务,不关心任务超时
      • 添加 Future 结果以及它应该在内部结构中结束的时间
    • 一种用于执行内部作业,该作业正在检查内部结构是否有些任务超时以及是否必须取消。

Small sample is here :

小样本在这里:

public class AlternativeExecutorService 
{

private final CopyOnWriteArrayList<ListenableFutureTask> futureQueue       = new CopyOnWriteArrayList();
private final ScheduledThreadPoolExecutor                scheduledExecutor = new ScheduledThreadPoolExecutor(1); // used for internal cleaning job
private final ListeningExecutorService                   threadExecutor    = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); // used for
private ScheduledFuture scheduledFuture;
private static final long INTERNAL_JOB_CLEANUP_FREQUENCY = 1000L;

public AlternativeExecutorService()
{
    scheduledFuture = scheduledExecutor.scheduleAtFixedRate(new TimeoutManagerJob(), 0, INTERNAL_JOB_CLEANUP_FREQUENCY, TimeUnit.MILLISECONDS);
}

public void pushTask(OwnTask task)
{
    ListenableFuture<Void> future = threadExecutor.submit(task);  // -> create your Callable
    futureQueue.add(new ListenableFutureTask(future, task, getCurrentMillisecondsTime())); // -> store the time when the task should end
}

public void shutdownInternalScheduledExecutor()
{
    scheduledFuture.cancel(true);
    scheduledExecutor.shutdownNow();
}

long getCurrentMillisecondsTime()
{
    return Calendar.getInstance().get(Calendar.MILLISECOND);
}

class ListenableFutureTask
{
    private final ListenableFuture<Void> future;
    private final OwnTask                task;
    private final long                   milliSecEndTime;

    private ListenableFutureTask(ListenableFuture<Void> future, OwnTask task, long milliSecStartTime)
    {
        this.future = future;
        this.task = task;
        this.milliSecEndTime = milliSecStartTime + task.getTimeUnit().convert(task.getTimeoutDuration(), TimeUnit.MILLISECONDS);
    }

    ListenableFuture<Void> getFuture()
    {
        return future;
    }

    OwnTask getTask()
    {
        return task;
    }

    long getMilliSecEndTime()
    {
        return milliSecEndTime;
    }
}

class TimeoutManagerJob implements Runnable
{
    CopyOnWriteArrayList<ListenableFutureTask> getCopyOnWriteArrayList()
    {
        return futureQueue;
    }

    @Override
    public void run()
    {
        long currentMileSecValue = getCurrentMillisecondsTime();
        for (ListenableFutureTask futureTask : futureQueue)
        {
            consumeFuture(futureTask, currentMileSecValue);
        }
    }

    private void consumeFuture(ListenableFutureTask futureTask, long currentMileSecValue)
    {
        ListenableFuture<Void> future = futureTask.getFuture();
        boolean isTimeout = futureTask.getMilliSecEndTime() >= currentMileSecValue;
        if (isTimeout)
        {
            if (!future.isDone())
            {
                future.cancel(true);
            }
            futureQueue.remove(futureTask);
        }
    }
}

class OwnTask implements Callable<Void>
{
    private long     timeoutDuration;
    private TimeUnit timeUnit;

    OwnTask(long timeoutDuration, TimeUnit timeUnit)
    {
        this.timeoutDuration = timeoutDuration;
        this.timeUnit = timeUnit;
    }

    @Override
    public Void call() throws Exception
    {
        // do logic
        return null;
    }

    public long getTimeoutDuration()
    {
        return timeoutDuration;
    }

    public TimeUnit getTimeUnit()
    {
        return timeUnit;
    }
}
}

回答by Johnny

After ton of time to survey,
Finally, I use invokeAllmethod of ExecutorServiceto solve this problem.
That will strictly interrupt the task while task running.
Here is example

经过大量时间调查,
最后,我使用 的invokeAll方法ExecutorService来解决这个问题。
这将在任务运行时严格中断任务。
这是例子

ExecutorService executorService = Executors.newCachedThreadPool();

try {
    List<Callable<Object>> callables = new ArrayList<>();
    // Add your long time task (callable)
    callables.add(new VaryLongTimeTask());
    // Assign tasks for specific execution timeout (e.g. 2 sec)
    List<Future<Object>> futures = executorService.invokeAll(callables, 2000, TimeUnit.MILLISECONDS);
    for (Future<Object> future : futures) {
        // Getting result
    }
} catch (InterruptedException e) {
    e.printStackTrace();
}

executorService.shutdown();


The pro is you can also submit ListenableFutureat the same ExecutorService.
Just slightly change the first line of code.

亲是也可以提交ListenableFuture同一ExecutorService
只需稍微更改第一行代码。

ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

ListeningExecutorServiceis the Listening feature of ExecutorServiceat google guava project (com.google.guava) )

ListeningExecutorServiceExecutorService谷歌番石榴项目(com.google.guava)的听力功能)

回答by amanteaux

Using John W answer I created an implementation that correctly begin the timeout when the task starts its execution. I even write a unit test for it :)

使用 John W answer 我创建了一个实现,该实现在任务开始执行时正确开始超时。我什至为它写了一个单元测试:)

However, it does not suit my needs since some IO operations do not interrupt when Future.cancel()is called (ie when Thread.interrupt()is called). Some examples of IO operation that may not be interrupted when Thread.interrupt()is called are Socket.connectand Socket.read(and I suspect most of IO operation implemented in java.io). All IO operations in java.nioshould be interruptible when Thread.interrupt()is called. For example, that is the case for SocketChannel.openand SocketChannel.read.

但是,它不适合我的需要,因为某些 IO 操作在Future.cancel()被调用时(即何时Thread.interrupt()被调用)不会中断。一些在Thread.interrupt()被调用时可能不会被中断的 IO 操作的例子是Socket.connectand Socket.read(我怀疑大部分 IO 操作是在 中实现的java.io)。所有 IO 操作在被调用java.nioThread.interrupt()都应该是可中断的。例如,对于壳体SocketChannel.openSocketChannel.read

Anyway if anyone is interested, I created a gist for a thread pool executor that allows tasks to timeout (if they are using interruptible operations...): https://gist.github.com/amanteaux/64c54a913c1ae34ad7b86db109cbc0bf

无论如何,如果有人感兴趣,我为允许任务超时的线程池执行程序创建了一个要点(如果他们使用可中断的操作......):https: //gist.github.com/amanteaux/64c54a913c1ae34ad7b86db109cbc0bf

回答by Nitish Kumar

check if this works for you,

检查这是否适合你,

    public <T,S,K,V> ResponseObject<Collection<ResponseObject<T>>> runOnScheduler(ThreadPoolExecutor threadPoolExecutor,
      int parallelismLevel, TimeUnit timeUnit, int timeToCompleteEachTask, Collection<S> collection,
      Map<K,V> context, Task<T,S,K,V> someTask){
    if(threadPoolExecutor==null){
      return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("threadPoolExecutor can not be null").build();
    }
    if(someTask==null){
      return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("Task can not be null").build();
    }
    if(CollectionUtils.isEmpty(collection)){
      return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("input collection can not be empty").build();
    }

    LinkedBlockingQueue<Callable<T>> callableLinkedBlockingQueue = new LinkedBlockingQueue<>(collection.size());
    collection.forEach(value -> {
      callableLinkedBlockingQueue.offer(()->someTask.perform(value,context)); //pass some values in callable. which can be anything.
    });
    LinkedBlockingQueue<Future<T>> futures = new LinkedBlockingQueue<>();

    int count = 0;

    while(count<parallelismLevel && count < callableLinkedBlockingQueue.size()){
      Future<T> f = threadPoolExecutor.submit(callableLinkedBlockingQueue.poll());
      futures.offer(f);
      count++;
    }

    Collection<ResponseObject<T>> responseCollection = new ArrayList<>();

    while(futures.size()>0){
      Future<T> future = futures.poll();
      ResponseObject<T> responseObject = null;
        try {
          T response = future.get(timeToCompleteEachTask, timeUnit);
          responseObject = ResponseObject.<T>builder().data(response).build();
        } catch (InterruptedException e) {
          future.cancel(true);
        } catch (ExecutionException e) {
          future.cancel(true);
        } catch (TimeoutException e) {
          future.cancel(true);
        } finally {
          if (Objects.nonNull(responseObject)) {
            responseCollection.add(responseObject);
          }
          futures.remove(future);//remove this
          Callable<T> callable = getRemainingCallables(callableLinkedBlockingQueue);
          if(null!=callable){
            Future<T> f = threadPoolExecutor.submit(callable);
            futures.add(f);
          }
        }

    }
    return ResponseObject.<Collection<ResponseObject<T>>>builder().data(responseCollection).build();
  }

  private <T> Callable<T> getRemainingCallables(LinkedBlockingQueue<Callable<T>> callableLinkedBlockingQueue){
    if(callableLinkedBlockingQueue.size()>0){
      return callableLinkedBlockingQueue.poll();
    }
    return null;
  }

you can restrict the no of thread uses from scheduler as well as put timeout on the task.

您可以限制调度程序中的线程使用次数,并在任务上设置超时。