Java 如何中断在 take() 上阻塞的 BlockingQueue?
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/812342/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
How to interrupt a BlockingQueue which is blocking on take()?
提问by MCS
I have a class that takes objects from a BlockingQueue
and processes them by calling take()
in a continuous loop. At some point I know that no more objects will be added to the queue. How do I interrupt the take()
method so that it stops blocking?
我有一个类,它从 a 中获取对象BlockingQueue
并通过take()
在连续循环中调用来处理它们。在某些时候,我知道不会有更多的对象被添加到队列中。如何中断该take()
方法以使其停止阻塞?
Here's the class that processes the objects:
这是处理对象的类:
public class MyObjHandler implements Runnable {
private final BlockingQueue<MyObj> queue;
public class MyObjHandler(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
try {
while (true) {
MyObj obj = queue.take();
// process obj here
// ...
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
And here's the method that uses this class to process objects:
这是使用此类处理对象的方法:
public void testHandler() {
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
new Thread(handler).start();
// get objects for handler to process
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
queue.put(i.next());
}
// what code should go here to tell the handler
// to stop waiting for more objects?
}
采纳答案by Chris Thornhill
If interrupting the thread is not an option, another is to place a "marker" or "command" object on the queue that would be recognized as such by MyObjHandler and break out of the loop.
如果中断线程不是一种选择,另一种方法是在队列中放置一个“标记”或“命令”对象,MyObjHandler 会识别出这样的对象并退出循环。
回答by erickson
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
Thread thread = new Thread(handler);
thread.start();
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
queue.put(i.next());
}
thread.interrupt();
However, if you do this, the thread might be interrupted while there are still items in the queue, waiting to be processed. You might want to consider using poll
instead of take
, which will allow the processing thread to timeout and terminate when it has waited for a while with no new input.
但是,如果您这样做,线程可能会在队列中仍有项目等待处理时中断。您可能需要考虑使用poll
代替take
,这将允许处理线程在等待一段时间而没有新输入时超时并终止。
回答by stepancheg
Interrupt the thread:
中断线程:
thread.interrupt()
回答by tomasb
Or don't interrupt, its nasty.
或者不要打扰,这很讨厌。
public class MyQueue<T> extends ArrayBlockingQueue<T> {
private static final long serialVersionUID = 1L;
private boolean done = false;
public ParserQueue(int capacity) { super(capacity); }
public void done() { done = true; }
public boolean isDone() { return done; }
/**
* May return null if producer ends the production after consumer
* has entered the element-await state.
*/
public T take() throws InterruptedException {
T el;
while ((el = super.poll()) == null && !done) {
synchronized (this) {
wait();
}
}
return el;
}
}
- when producer puts object to the queue, call
queue.notify()
, if it ends, callqueue.done()
- loop while (!queue.isDone() || !queue.isEmpty())
- test take() return value for null
- 当生产者将对象放入队列时,调用
queue.notify()
,如果结束,调用queue.done()
- 循环 while (!queue.isDone() || !queue.isEmpty())
- test take() 返回值为空
回答by dbw
Very late but Hope this helps other too asI faced the similar problem and used the poll
approach suggested by erickson abovewith some minor changes,
很晚了,但希望这对其他人也有帮助,因为我遇到了类似的问题,并使用了上面 ericksonpoll
建议的方法,并进行了一些小改动,
class MyObjHandler implements Runnable
{
private final BlockingQueue<MyObj> queue;
public volatile boolean Finished; //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL
public MyObjHandler(BlockingQueue queue)
{
this.queue = queue;
Finished = false;
}
@Override
public void run()
{
while (true)
{
try
{
MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS);
if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return
{
// process obj here
// ...
}
if(Finished && queue.isEmpty())
return;
}
catch (InterruptedException e)
{
return;
}
}
}
}
public void testHandler()
{
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjHandler handler = new MyObjHandler(queue);
new Thread(handler).start();
// get objects for handler to process
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); )
{
queue.put(i.next());
}
// what code should go here to tell the handler to stop waiting for more objects?
handler.Finished = true; //THIS TELLS HIM
//If you need you can wait for the termination otherwise remove join
myThread.join();
}
This solved both the problems
这解决了这两个问题
- Flagged the
BlockingQueue
so that it knows it has not to wait more for elements - Did not interrupted in between so that processing blocks terminates only when all the items in queue are processed and there are no items remaining to be added
- 标记
BlockingQueue
以便它知道它不必等待更多元素 - 中间没有中断,只有当队列中的所有项目都被处理并且没有剩余的项目要添加时,处理块才会终止