Java ExecutorService,避免任务队列太满的标准方法
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/2247734/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
ExecutorService, standard way to avoid to task queue getting too full
提问by manuel aldana
I am using ExecutorService
for ease of concurrent multithreaded program. Take following code:
我正在使用ExecutorService
并发多线程程序。取以下代码:
while(xxx) {
ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS);
...
Future<..> ... = exService.submit(..);
...
}
In my case the problem is that submit()
is not blocking if all NUMBER_THREADS
are occupied. The consequence is that the Task queue is getting flooded by many tasks. The consequence of this is, that shutting down the execution service with ExecutorService.shutdown()
takes ages (ExecutorService.isTerminated()
will be false for long time). Reason is that the task queue is still quite full.
在我的情况下,问题是submit()
如果所有人NUMBER_THREADS
都被占用了就不会阻塞。结果是任务队列被许多任务淹没。这样做的结果是,关闭执行服务ExecutorService.shutdown()
需要ExecutorService.isTerminated()
很长时间(很长一段时间都是假的)。原因是任务队列还是满的。
For now my workaround is to work with semaphores to disallow to have to many entries inside the task queue of ExecutorService
:
现在,我的解决方法是使用信号量来禁止在以下任务队列中有多个条目ExecutorService
:
...
Semaphore semaphore=new Semaphore(NUMBER_THREADS);
while(xxx) {
ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS);
...
semaphore.aquire();
// internally the task calls a finish callback, which invokes semaphore.release()
// -> now another task is added to queue
Future<..> ... = exService.submit(..);
...
}
I am sure there is a better more encapsulated solution?
我确定有更好的封装解决方案吗?
回答by Kevin
You're better off creating the ThreadPoolExecutoryourself (which is what Executors.newXXX() does anyway).
您最好自己创建ThreadPoolExecutor(无论如何,这就是 Executors.newXXX() 所做的)。
In the constructor, you can pass in a BlockingQueue for the Executor to use as its task queue. If you pass in a size constrained BlockingQueue (like LinkedBlockingQueue), it should achieve the effect you want.
在构造函数中,您可以传入一个 BlockingQueue 供 Executor 用作其任务队列。如果你传入一个大小受限的 BlockingQueue(比如LinkedBlockingQueue),它应该可以达到你想要的效果。
ExecutorService exService = new ThreadPoolExecutor(NUMBER_THREADS, NUMBER_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(workQueueSize));
回答by Peter Lawrey
You can call ThreadPoolExecutor.getQueue().size()
to find out the size of the waiting queue. You can take an action if the queue is too long. I suggest running the task in the current thread if the queue is too long to slow down the producer (if that is appropriate).
您可以调用ThreadPoolExecutor.getQueue().size()
以了解等待队列的大小。如果队列太长,您可以采取措施。如果队列太长而无法减慢生产者的速度(如果合适),我建议在当前线程中运行任务。
回答by mdma
A true blocking ThreadPoolExecutor has been on the wishlist of many, there's even a JDC bug opened on it. I'm facing the same problem, and came across this: http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html
一个真正的阻塞 ThreadPoolExecutor 已经出现在许多人的愿望清单上,甚至还有一个 JDC 漏洞。我面临同样的问题,并遇到了这个:http: //today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html
It's an implementation of a BlockingThreadPoolExecutor, implemented using a RejectionPolicy that uses offer to add the task to the queue, waiting for the queue to have room. It looks good.
它是 BlockingThreadPoolExecutor 的一个实现,使用 RejectionPolicy 实现,该 RejectionPolicy 使用 offer 将任务添加到队列中,等待队列有空间。这看起来不错的样子。
回答by bilal
you can add another bloquing queue that's has limited size to controle the size of internal queue in executorService, some thinks like semaphore but very easy. before executor you put() and whene the task achive take(). take() must be inside the task code
您可以添加另一个大小有限的 bloquing 队列来控制 executorService 中内部队列的大小,有些人认为像信号量但很容易。在执行者之前你 put() 和 whene 任务实现 take()。take() 必须在任务代码中
回答by Adam Gent
The trick is to use a fixed queue size and:
诀窍是使用固定的队列大小,并且:
new ThreadPoolExecutor.CallerRunsPolicy()
I also recommend using Guava's ListeningExecutorService. Here is an example consumer/producer queues.
我还建议使用 Guava 的ListeningExecutorService。这是一个示例消费者/生产者队列。
private ListeningExecutorService producerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
private ListeningExecutorService consumerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
return new ThreadPoolExecutor(nThreads, nThreads,
5000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
}
Anything better and you might want to consider a MQ like RabbitMQ or ActiveMQ as they have QoS technology.
如果有更好的选择,您可能需要考虑 MQ 之类的 RabbitMQ 或 ActiveMQ,因为它们具有 QoS 技术。