java “关闭”阻塞队列
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/5378391/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
"Closing" a blocking queue
提问by Lachezar Balev
I'm using java.util.concurrent.BlockingQueuein a very simple producer-consumer scenario. E.g. this pseudo code depicts the consumer part:
我在一个非常简单的生产者-消费者场景中使用java.util.concurrent.BlockingQueue。例如,这个伪代码描述了消费者部分:
class QueueConsumer implements Runnable {
@Override
public void run() {
while(true)
{
try {
ComplexObject complexObject = myBlockingQueue.take();
//do something with the complex object
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
So far so good. In the javadoc of the blocking queue I read:
到现在为止还挺好。在阻塞队列的 javadoc 中,我读到:
A BlockingQueue does not intrinsically support any kind of "close" or "shutdown" operation to indicate that no more items will be added. The needs and usage of such features tend to be implementation-dependent. For example, a common tactic is for producers to insert special end-of-stream or poison objects, that are interpreted accordingly when taken by consumers.
BlockingQueue 本质上不支持任何类型的“关闭”或“关闭”操作以指示不再添加项目。此类功能的需求和使用往往取决于实现。例如,一种常见的策略是让生产者插入特殊的流结束或毒物对象,当消费者采取时相应地解释这些对象。
Unfortunately because of the generics in use and the nature of ComplexObject it's not trivial to push a "poison object" into the queue. So this "common tactic" is not really convenient in my scenario.
不幸的是,由于使用的泛型和 ComplexObject 的性质,将“毒物对象”推入队列并非易事。所以这种“通用策略”在我的场景中并不是很方便。
My question is: what other good tactics/patterns can I use to "close" the queue?
我的问题是:我可以使用哪些其他好的策略/模式来“关闭”队列?
Thank you!
谢谢!
采纳答案by jdmichal
If you have a handle to the consumer thread, you can interrupt it. With the code you gave, that will kill the consumer. I would not expect the producer to have this; it would probably have to callback to the program controller somehow to let it know it's done. Then the controller would interrupt the consumer thread.
如果您有消费者线程的句柄,则可以中断它。使用您提供的代码,这将杀死消费者。我不希望制片人有这个;它可能不得不以某种方式回调到程序控制器,让它知道它已经完成。然后控制器会中断消费者线程。
You can always finish doing work before obeying the interrupt. For instance:
你总是可以在服从中断之前完成工作。例如:
class QueueConsumer implements Runnable {
@Override
public void run() {
while(!(Thread.currentThread().isInterrupted())) {
try {
final ComplexObject complexObject = myBlockingQueue.take();
this.process(complexObject);
} catch (InterruptedException e) {
// Set interrupted flag.
Thread.currentThread().interrupt();
}
}
// Thread is getting ready to die, but first,
// drain remaining elements on the queue and process them.
final LinkedList<ComplexObject> remainingObjects;
myBlockingQueue.drainTo(remainingObjects);
for(ComplexObject complexObject : remainingObjects) {
this.process(complexObject);
}
}
private void process(final ComplexObject complexObject) {
// Do something with the complex object.
}
}
I would actually prefer that to somehow poisoning the queue anyway. If you want to kill the thread, ask the thread to kill itself.
无论如何,我实际上更愿意以某种方式毒害队列。如果你想杀死线程,请让线程杀死自己。
(It's nice to see someone handling InterruptedException
properly.)
(很高兴看到有人InterruptedException
正确处理。)
There seems to be some contention about the handling of interruptions here. First, I would like everyone to read this article: http://www.ibm.com/developerworks/java/library/j-jtp05236.html
这里似乎有一些关于中断处理的争论。首先请大家阅读这篇文章:http: //www.ibm.com/developerworks/java/library/j-jtp05236.html
Now, with the understanding that no one actually read that, here's the deal. A thread will only receive an InterruptedException
if it was currently blocking at the time of interrupt. In this case, Thread.interrupted()
will return false
. If it was not blocking, it will NOT receive this exception, and instead Thread.interrupted()
will return true
. Therefore, your loop guard should absolutely, no matter what, check Thread.interrupted()
, or otherwise risk missing an interruption to the thread.
现在,根据没有人真正阅读过的理解,这就是交易。如果线程InterruptedException
在中断时当前处于阻塞状态,则它只会收到。在这种情况下,Thread.interrupted()
将返回false
。如果它没有阻塞,则不会收到此异常,而是Thread.interrupted()
返回true
. 因此,无论如何,您的循环防护都绝对应该检查Thread.interrupted()
,否则可能会错过线程中断的风险。
So, since you are checking Thread.interrupted()
no matter what, and you are forced to catch InterruptedException
(and should be dealing with it even if you weren't forced to), you now have two code areas which handle the same event, thread interruption. One way to handle this is normalize them into one condition, meaning either the boolean state check can throw the exception, or the exception can set the boolean state. I choose the later.
因此,由于您Thread.interrupted()
无论如何都在检查,并且您被迫捕获InterruptedException
(并且即使您没有被迫捕获也应该处理它),您现在有两个代码区域处理相同的事件,线程中断。处理这个问题的一种方法是将它们规范化为一个条件,这意味着布尔状态检查可以抛出异常,或者异常可以设置布尔状态。我选择后者。
Edit:Note that the static Thread#interrupted method clearsthe the interrupted status of the current thread.
编辑:请注意静态 Thread#interrupted 方法清除当前线程的中断状态。
回答by Jason S
Another idea for making this simple:
使这个简单的另一个想法:
class ComplexObject implements QueueableComplexObject
{
/* the meat of your complex object is here as before, just need to
* add the following line and the "implements" clause above
*/
@Override public ComplexObject asComplexObject() { return this; }
}
enum NullComplexObject implements QueueableComplexObject
{
INSTANCE;
@Override public ComplexObject asComplexObject() { return null; }
}
interface QueueableComplexObject
{
public ComplexObject asComplexObject();
}
Then use BlockingQueue<QueueableComplexObject>
as the queue. When you wish to end the queue's processing, do queue.offer(NullComplexObject.INSTANCE)
. On the consumer side, do
然后BlockingQueue<QueueableComplexObject>
用作队列。当您希望结束队列的处理时,请执行queue.offer(NullComplexObject.INSTANCE)
。在消费者方面,做
boolean ok = true;
while (ok)
{
ComplexObject obj = queue.take().asComplexObject();
if (obj == null)
ok = false;
else
process(obj);
}
/* interrupt handling elided: implement this as you see fit,
* depending on whether you watch to swallow interrupts or propagate them
* as in your original post
*/
No instanceof
required, and you don't have to construct a fake ComplexObject
which may be expensive/difficult depending on its implementation.
不需要instanceof
,并且您不必构建ComplexObject
可能昂贵/困难的假货,具体取决于其实现。
回答by Rob Hruska
An alternative would be to wrap the processing you're doing with an ExecutorService
, and let the ExecutorService
itself control whether or not jobs get added to the queue.
另一种方法是使用 包装您正在执行的处理ExecutorService
,并让其ExecutorService
自身控制是否将作业添加到队列中。
Basically, you take advantage of ExecutorService.shutdown()
, which when called disallows any more tasks from being processed by the executor.
基本上,您可以利用ExecutorService.shutdown()
,它在调用时不允许执行程序处理任何更多任务。
I'm not sure how you're currently submitting tasks to the QueueConsumer
in your example. I've made the assumption that you have some sort of submit()
method, and used a similar method in the example.
我不确定您目前如何向QueueConsumer
示例中的提交任务。我假设您有某种submit()
方法,并在示例中使用了类似的方法。
import java.util.concurrent.*;
class QueueConsumer {
private final ExecutorService executor = Executors.newSingleThreadExecutor();
public void shutdown() {
executor.shutdown(); // gracefully shuts down the executor
}
// 'Result' is a class you'll have to write yourself, if you want.
// If you don't need to provide a result, you can just Runnable
// instead of Callable.
public Future<Result> submit(final ComplexObject complexObject) {
if(executor.isShutdown()) {
// handle submitted tasks after the executor has been told to shutdown
}
return executor.submit(new Callable<Result>() {
@Override
public Result call() {
return process(complexObject);
}
});
}
private Result process(final ComplexObject complexObject) {
// Do something with the complex object.
}
}
This example is just an off-the-cuff illustration of what the java.util.concurrent
package offers; there are probably some optimizations that could be made to it (e.g., QueueConsumer
as its own class probably isn't even necessary; you could just provide the ExecutorService
to whatever producers are submitting the tasks).
这个例子只是java.util.concurrent
包装所提供的即兴展示;可能有一些可以对其进行优化(例如,QueueConsumer
因为它自己的类可能甚至没有必要;您可以将 提供ExecutorService
给任何正在提交任务的生产者)。
Dig through the java.util.concurrent
package (starting at some of the links above). You might find that it gives you a lot of great options for what you're trying to do, and you don't even have to worry about regulating the work queue.
仔细阅读java.util.concurrent
包(从上面的一些链接开始)。您可能会发现它为您正在尝试做的事情提供了很多很好的选择,而且您甚至不必担心调整工作队列。
回答by jdmichal
Another possibility for making a poison object: Make it be a particular instance of the class. This way, you do not have to muck around subtypes or screw up your generic.
制作有毒对象的另一种可能性:使其成为类的特定实例。这样,您就不必纠结于子类型或搞砸您的泛型。
Drawback: This won't work if there's some sort of serialization barrier between the producer and consumer.
缺点:如果生产者和消费者之间存在某种序列化障碍,这将不起作用。
public class ComplexObject
{
public static final POISON_INSTANCE = new ComplexObject();
public ComplexObject(whatever arguments) {
}
// Empty constructor for creating poison instance.
private ComplexObject() {
}
}
class QueueConsumer implements Runnable {
@Override
public void run() {
while(!(Thread.currentThread().interrupted())) {
try {
final ComplexObject complexObject = myBlockingQueue.take();
if (complexObject == ComplexObject.POISON_INSTANCE)
return;
// Process complex object.
} catch (InterruptedException e) {
// Set interrupted flag.
Thread.currentThread().interrupt();
}
}
}
}
回答by Dorus
You can wrap your generic object into a dataobject. On this dataobject you can add additional data like the poison object status. The dataobject is a class with 2 fields. T complexObject;
and boolean poison;
.
您可以将通用对象包装到数据对象中。在此数据对象上,您可以添加其他数据,如毒物对象状态。数据对象是一个有 2 个字段的类。T complexObject;
和boolean poison;
。
Your consumer takes the data objects from the queue. If a poison object is returned, you close the consumer, else you unwrap the generic and call 'process(complexObject)'.
您的消费者从队列中获取数据对象。如果返回有害对象,则关闭使用者,否则打开泛型并调用“process(complexObject)”。
I'm using a java.util.concurrent.LinkedBlockingDeque<E>
so that you can add object at the end of the queue and take them from the front. That way your object will be handled in order, but more important it's safe to close the queue after you run into the poison object.
我正在使用 ajava.util.concurrent.LinkedBlockingDeque<E>
以便您可以在队列末尾添加对象并从前面获取它们。这样您的对象将按顺序处理,但更重要的是,在遇到有害对象后关闭队列是安全的。
To support multiple consumers, I add the poison object back onto the queue when I run into it.
为了支持多个消费者,当我遇到它时,我将毒物对象重新添加到队列中。
public final class Data<T> {
private boolean poison = false;
private T complexObject;
public Data() {
this.poison = true;
}
public Data(T complexObject) {
this.complexObject = complexObject;
}
public boolean isPoison() {
return poison;
}
public T getComplexObject() {
return complexObject;
}
}
public class Consumer <T> implements Runnable {
@Override
public final void run() {
Data<T> data;
try {
while (!(data = queue.takeFirst()).isPoison()) {
process(data.getComplexObject());
}
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
// add the poison object back so other consumers can stop too.
queue.addLast(line);
}
}
回答by Chris Knight
Is it possible to extend ComplexObject and mock out the non-trivial creation functionality? Essentially you're ending up with a shell object but you can do then do instance of
to see if is the end of queue object.
是否可以扩展 ComplexObject 并模拟非平凡的创建功能?本质上,您最终会得到一个 shell 对象,但是您可以执行 theninstance of
来查看是否是队列对象的结尾。
回答by Corin
It seems reasonable to me to implement a close-able BlockingQueue
:
实现 close-able 对我来说似乎是合理的BlockingQueue
:
import java.util.concurrent.BlockingQueue;
public interface CloseableBlockingQueue<E> extends BlockingQueue<E> {
/** Returns <tt>true</tt> if this queue is closed, <tt>false</tt> otherwise. */
public boolean isClosed();
/** Closes this queue; elements cannot be added to a closed queue. **/
public void close();
}
It would be quite straight forward to implement this with the following behaviours (cf. the methods summary table):
使用以下行为(参见方法摘要表)来实现这一点将非常简单:
Insert:
Throws exception, Special value:
Behaves like a full
Queue
, caller's responsibility to testisClosed()
.Throws
IllegalStateException
if and when closed.Returns
false
if and when closed, caller's responsibility to testisClosed()
.
Remove:
Throws exception, Special value:
Behaves like a empty
Queue
, caller's responsibility to testisClosed()
.Throws
NoSuchElementException
if and when closed.Returns
null
if and when closed, caller's responsibility to testisClosed()
.
Examine
No change.
插入:
删除:
检查
没变化。
I did this by editing the source, find it at github.com.
我通过编辑源代码来做到这一点,在github.com 上找到它。
回答by daliborn
I have used this system:
我用过这个系统:
ConsumerClass
private boolean queueIsNotEmpty = true;//with setter
...
do {
...
sharedQueue.drainTo(docs);
...
} while (queueIsNotEmpty || sharedQueue.isEmpty());
When producer finish, I set on consumerObject, queueIsNotEmpty field to false
当生产者完成时,我将消费者对象的 queueIsNotEmpty 字段设置为 false
回答by ArnabRaxit
Today I solved this problem using a wrapper object. Since the ComplexObject is too complex to subclass I wrapped the ComplexObject into ComplexObjectWrapper object. Then used ComplexObjectWrapper as the generic type.
今天我使用包装器对象解决了这个问题。由于 ComplexObject 太复杂而无法进行子类化,我将 ComplexObject 包装到 ComplexObjectWrapper 对象中。然后使用 ComplexObjectWrapper 作为泛型类型。
public class ComplexObjectWrapper {
ComplexObject obj;
}
public class EndOfQueue extends ComplexObjectWrapper{}
Now instead of BlockingQueue<ComplexObject>
I did
BlockingQueue<ComplexObjectWrapper>
现在而不是BlockingQueue<ComplexObject>
我做的
BlockingQueue<ComplexObjectWrapper>
Since I had control of both the Consumer and Producer this solution worked for me.
由于我同时控制了消费者和生产者,因此该解决方案对我有用。
回答by jtahlborn
In this situation, you generally have to ditch the generics and make the queue hold type Object. then, you just need check for your "poison" Object before casting to the actual type.
在这种情况下,您通常必须放弃泛型并使队列保持类型为 Object。然后,您只需要在转换为实际类型之前检查您的“毒药”对象。