如何不压倒 java executorservice 任务队列?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/11568821/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-31 05:37:11  来源:igfitidea点击:

how not to overwhelm java executorservice task queue?

javamultithreadingexecutorservice

提问by user1539050

I have the below code snippet, which runs fine. But the problem is it creates and put over 2000 tasks on the executor queue right off the bat.

我有下面的代码片段,运行良好。但问题是它立即创建并在执行器队列中放置了 2000 多个任务。

I need to have a check if the tasks already in the executor queue are complete, only then give it more tasks. It doesnt have to be exact, ie if the queue has <10 tasks left, add 50 more.

我需要检查执行器队列中已有的任务是否完成,然后再给它更多的任务。它不必是精确的,即如果队列剩余 <10 个任务,则再添加 50 个。

So the executor task queue doesnt have so many pending tasks, which will also allow shutdown() to work in a timely manner, otherwise even if called, the executor will still trying to complete all 2000 tasks in its queue first.

所以 executor 任务队列中没有那么多待处理的任务,这也可以让 shutdown() 及时工作,否则即使被调用,executor 仍然会尝试先完成其队列中的所有 2000 个任务。

What is the best way to accomplish this? thank you

实现这一目标的最佳方法是什么?谢谢

executor = Executors.newFixedThreadPool(numThreads);

while(some_condition==true)
{
    //if(executor < 10 tasks pending)  <---- how do i do this?
    //{                             
        for(int k=0;k<20;k++)
        {  
            Runnable worker = new MyRunnable();
            executor.execute(worker);
        }
    //}
    //else 
    //{
    //      wait(3000);
    //}
} 


Update using semaphore:

使用信号量更新:

private final Semaphore semaphore = new Semaphore(10)
executor = new ThreadPoolExecutorWithSemaphoreFromJohnExample();

while(some_condition==true)
{

        Runnable worker = new MyRunnable();
        //So at this point if semaphore is full, then while loop would PAUSE(??) until
        //semaphore frees up again.
          executor.execute(worker);   
} 

回答by Gray

I have the below code snippet, which runs fine. But the problem is it creates and put over 2000 tasks on the executor queue right off the bat.

我有下面的代码片段,运行良好。但问题是它立即创建并在执行器队列中放置了 2000 多个任务。

One way to do this is to create your own ThreadPoolExecutorwith a limited job queue and set a custom RejectedExecutionHandleron it. This allows you to have fine grained control over how many jobs to queue.

一种方法是ThreadPoolExecutor使用有限的作业队列创建您自己的队列并RejectedExecutionHandler在其上设置自定义。这使您可以对要排队的作业数量进行细粒度控制。

You need the custom handler because by default, if the queue is full the ThreadPoolExecutor.submit(...)will throw a RejectedExecutionException. With the custom handler below, when it gets rejected by the queue, the rejection handler just puts it back in, blocking until the queue has space. So no jobs will be rejected/dropped.

您需要自定义处理程序,因为默认情况下,如果队列已满,ThreadPoolExecutor.submit(...)则会抛出RejectedExecutionException. 使用下面的自定义处理程序,当它被队列拒绝时,拒绝处理程序会将其放回原处,阻塞直到队列有空间。所以不会拒绝/放弃任何工作。

Here's approximately how you start your own thread-pool and set your own reject handler.

这是您启动自己的线程池并设置自己的拒绝处理程序的大致方法。

// you can tune the blocking queue size which is the number of jobs to queue
// when the NUM_THREADS are all working
final BlockingQueue<MyRunnable> queue =
    new ArrayBlockingQueue<MyRunnable>(NUM_JOBS_TO_QUEUE);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS,
       0L, TimeUnit.MILLISECONDS, queue);
// by default (unfortunately) the ThreadPoolExecutor will throw an exception
// when you submit the job that fills the queue, to have it block you do:
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
   public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      // this will block if the queue is full as opposed to throwing
      executor.getQueue().put(r);
   }
});
...
// now submit all of your jobs and it will block if the queue is full
for(int k = 0; k < 20000000; k++) {  
   Runnable worker = new MyRunnable();
   threadPool.execute(worker);
}

See my answer here for more details about blocking thread-pools:

有关阻塞线程池的更多详细信息,请参阅我的回答:

How can I make ThreadPoolExecutor command wait if there's too much data it needs to work on?

如果需要处理的数据太多,如何让 ThreadPoolExecutor 命令等待?

You can also use the ThreadPoolExecutor.CallerRunsPolicywhich would cause the caller that is submitting the job into the thread-pool to execute the job. I don't like this solution however because it blocks the caller until the job finishes which might starve the other worker threads. Also, if there are multiple submitters, it might still cause too many threads to run the jobs.

您还可以使用ThreadPoolExecutor.CallerRunsPolicy这会导致将作业提交到线程池的调用者执行作业。但是我不喜欢这个解决方案,因为它会阻塞调用者,直到工作完成,这可能会使其他工作线程饿死。此外,如果有多个提交者,它仍然可能导致过多的线程来运行作业。

Lastly, notice that I set the core and max thread count in the ThreadPoolExecutorto the same number. Unfortunately, by default, the executor starts the core threads, then fills the queue, and only then does it allocate additional threads up to the max. This is completely counter-intuitive.

最后,请注意我将 中的核心和最大线程数设置ThreadPoolExecutor为相同的数字。不幸的是,默认情况下,执行程序启动核心线程,然后填充队列,然后才分配额外的线程到最大值。这完全违反直觉。

回答by John Vint

You can use a simple Semaphore. Upon submitting acquire a new permit and after completion release the permit to allow anyone else awaiting to submit.

您可以使用简单的信号量。提交后获取新许可证,完成后释放许可证以允许其他任何等待提交的人。

private final Semaphore semaphore = new Semaphore(10);//or however you want max queued at any given moment
ThreadPoolExecutor tp= new ThreadPoolExecutor(...){
      public void execute(Runnable r){
          semaphore.acquire();
          super.execute(r);
      }    
      public void afterExecute(Runnable r, Thread t){
         semaphore.release();  
         super.afterExecute(r,t);
      }
};

So here the submitting threads will be suspended if there are no more permits available.

因此,如果没有更多可用的许可,这里的提交线程将被暂停。

回答by Martin James

I usually throttle such systems by using a object 'pool queue' for the task objects - a BlockingQueue that is filled up with X tasks at startup. Anything that wants to submit a task to the threads has to get one from the pool queue, load it up with data and then submit it.

我通常通过为任务对象使用对象“池队列”来限制此类系统 - 一个在启动时充满 X 任务的 BlockingQueue。任何想要向线程提交任务的东西都必须从池队列中获取一个,用数据加载它,然后提交它。

When the task is completed and results in it processed, it is pushed back onto the pool queue for re-use.

当任务完成并导致它被处理时,它会被推回到池队列中以供重用。

If the pool empties, submitting threads block on the pool queue until some tasks are returned.

如果池为空,则提交线程在池队列中阻塞,直到返回一些任务。

This is essentially a form of semaphore control as suggested by @John Vint, but has some further advantages - no continual create/GC of the runnables, for example. I like to dump PooolQueue.size to a GUI status bar on a timer, so I can see how 'busy' the system is, (and also to quickly detect any object leaks:)

这本质上是@John Vint 建议的一种信号量控制形式,但还有一些进一步的优势——例如,没有连续创建/GC 的可运行对象。我喜欢将 PooolQueue.size 转储到计时器上的 GUI 状态栏,这样我就可以看到系统有多“忙”(以及快速检测任何对象泄漏:)

回答by Guido Medina

You will be better of setting a Rejection policy, since you don't want to overwhelm the ThreadPool, I have found that the best way to accomplish this without complicating yourself much is by doing something like this:

您最好设置拒绝策略,因为您不想压倒 ThreadPool,我发现完成此操作的最佳方法是执行以下操作,而不会使您自己复杂化:

final ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors.newFixedThreadPool(THREADS_COUNT);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

What will happen is that once all Threads are busy, the caller's thread will execute the task. Here is a reference to such policy CallerRunsPolicy JavaDoc

将会发生的情况是,一旦所有线程都忙,调用者的线程就会执行任务。这是对此类政策CallerRunsPolicy JavaDoc的参考