java “关闭”阻塞队列

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/5378391/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-30 10:50:57  来源:igfitidea点击:

"Closing" a blocking queue

javamultithreadingblockingqueue

提问by Lachezar Balev

I'm using java.util.concurrent.BlockingQueuein a very simple producer-consumer scenario. E.g. this pseudo code depicts the consumer part:

我在一个非常简单的生产者-消费者场景中使用java.util.concurrent.BlockingQueue。例如,这个伪代码描述了消费者部分:

class QueueConsumer implements Runnable {

    @Override
    public void run() {
        while(true)
        {
            try {
                ComplexObject complexObject = myBlockingQueue.take();
                //do something with the complex object
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

So far so good. In the javadoc of the blocking queue I read:

到现在为止还挺好。在阻塞队列的 javadoc 中,我读到:

A BlockingQueue does not intrinsically support any kind of "close" or "shutdown" operation to indicate that no more items will be added. The needs and usage of such features tend to be implementation-dependent. For example, a common tactic is for producers to insert special end-of-stream or poison objects, that are interpreted accordingly when taken by consumers.

BlockingQueue 本质上不支持任何类型的“关闭”或“关闭”操作以指示不再添加项目。此类功能的需求和使用往往取决于实现。例如,一种常见的策略是让生产者插入特殊的流结束或毒物对象,当消费者采取时相应地解释这些对象。

Unfortunately because of the generics in use and the nature of ComplexObject it's not trivial to push a "poison object" into the queue. So this "common tactic" is not really convenient in my scenario.

不幸的是,由于使用的泛型和 ComplexObject 的性质,将“毒物对象”推入队列并非易事。所以这种“通用策略”在我的场景中并不是很方便。

My question is: what other good tactics/patterns can I use to "close" the queue?

我的问题是:我可以使用哪些其他好的策略/模式来“关闭”队列?

Thank you!

谢谢!

采纳答案by jdmichal

If you have a handle to the consumer thread, you can interrupt it. With the code you gave, that will kill the consumer. I would not expect the producer to have this; it would probably have to callback to the program controller somehow to let it know it's done. Then the controller would interrupt the consumer thread.

如果您有消费者线程的句柄,则可以中断它。使用您提供的代码,这将杀死消费者。我不希望制片人有这个;它可能不得不以某种方式回调到程序控制器,让它知道它已经完成。然后控制器会中断消费者线程。

You can always finish doing work before obeying the interrupt. For instance:

你总是可以在服从中断之前完成工作。例如:

class QueueConsumer implements Runnable {
    @Override
    public void run() {
        while(!(Thread.currentThread().isInterrupted())) {
            try {
                final ComplexObject complexObject = myBlockingQueue.take();
                this.process(complexObject);

            } catch (InterruptedException e) {
                // Set interrupted flag.
                Thread.currentThread().interrupt();
            }
        }

        // Thread is getting ready to die, but first,
        // drain remaining elements on the queue and process them.
        final LinkedList<ComplexObject> remainingObjects;
        myBlockingQueue.drainTo(remainingObjects);
        for(ComplexObject complexObject : remainingObjects) {
            this.process(complexObject);
        }
    }

    private void process(final ComplexObject complexObject) {
        // Do something with the complex object.
    }
}

I would actually prefer that to somehow poisoning the queue anyway. If you want to kill the thread, ask the thread to kill itself.

无论如何,我实际上更愿意以某种方式毒害队列。如果你想杀死线程,请让线程杀死自己。

(It's nice to see someone handling InterruptedExceptionproperly.)

(很高兴看到有人InterruptedException正确处理。)



There seems to be some contention about the handling of interruptions here. First, I would like everyone to read this article: http://www.ibm.com/developerworks/java/library/j-jtp05236.html

这里似乎有一些关于中断处理的争论。首先请大家阅读这篇文章:http: //www.ibm.com/developerworks/java/library/j-jtp05236.html

Now, with the understanding that no one actually read that, here's the deal. A thread will only receive an InterruptedExceptionif it was currently blocking at the time of interrupt. In this case, Thread.interrupted()will return false. If it was not blocking, it will NOT receive this exception, and instead Thread.interrupted()will return true. Therefore, your loop guard should absolutely, no matter what, check Thread.interrupted(), or otherwise risk missing an interruption to the thread.

现在,根据没有人真正阅读过的理解,这就是交易。如果线程InterruptedException在中断时当前处于阻塞状态,则它只会收到。在这种情况下,Thread.interrupted()将返回false。如果它没有阻塞,则不会收到此异常,而是Thread.interrupted()返回true. 因此,无论如何,您的循环防护都绝对应该检查Thread.interrupted(),否则可能会错过线程中断的风险。

So, since you are checking Thread.interrupted()no matter what, and you are forced to catch InterruptedException(and should be dealing with it even if you weren't forced to), you now have two code areas which handle the same event, thread interruption. One way to handle this is normalize them into one condition, meaning either the boolean state check can throw the exception, or the exception can set the boolean state. I choose the later.

因此,由于您Thread.interrupted()无论如何都在检查,并且您被迫捕获InterruptedException(并且即使您没有被迫捕获也应该处理它),您现在有两个代码区域处理相同的事件,线程中断。处理这个问题的一种方法是将它们规范化为一个条件,这意味着布尔状态检查可以抛出异常,或者异常可以设置布尔状态。我选择后者。



Edit:Note that the static Thread#interrupted method clearsthe the interrupted status of the current thread.

编辑:请注意静态 Thread#interrupted 方法清除当前线程的中断状态。

回答by Jason S

Another idea for making this simple:

使这个简单的另一个想法:

class ComplexObject implements QueueableComplexObject
{
    /* the meat of your complex object is here as before, just need to
     * add the following line and the "implements" clause above
     */
    @Override public ComplexObject asComplexObject() { return this; }
}

enum NullComplexObject implements QueueableComplexObject
{
    INSTANCE;

    @Override public ComplexObject asComplexObject() { return null; }
}

interface QueueableComplexObject
{
    public ComplexObject asComplexObject();
}

Then use BlockingQueue<QueueableComplexObject>as the queue. When you wish to end the queue's processing, do queue.offer(NullComplexObject.INSTANCE). On the consumer side, do

然后BlockingQueue<QueueableComplexObject>用作队列。当您希望结束队列的处理时,请执行queue.offer(NullComplexObject.INSTANCE)。在消费者方面,做

boolean ok = true;
while (ok)
{
    ComplexObject obj = queue.take().asComplexObject();
    if (obj == null)
        ok = false;
    else
        process(obj);
}

/* interrupt handling elided: implement this as you see fit,
 * depending on whether you watch to swallow interrupts or propagate them
 * as in your original post
 */

No instanceofrequired, and you don't have to construct a fake ComplexObjectwhich may be expensive/difficult depending on its implementation.

不需要instanceof,并且您不必构建ComplexObject可能昂贵/困难的假货,具体取决于其实现。

回答by Rob Hruska

An alternative would be to wrap the processing you're doing with an ExecutorService, and let the ExecutorServiceitself control whether or not jobs get added to the queue.

另一种方法是使用 包装您正在执行的处理ExecutorService,并让其ExecutorService自身控制是否将作业添加到队列中。

Basically, you take advantage of ExecutorService.shutdown(), which when called disallows any more tasks from being processed by the executor.

基本上,您可以利用ExecutorService.shutdown(),它在调用时不允许执行程序处理任何更多任务。

I'm not sure how you're currently submitting tasks to the QueueConsumerin your example. I've made the assumption that you have some sort of submit()method, and used a similar method in the example.

我不确定您目前如何向QueueConsumer示例中的提交任务。我假设您有某种submit()方法,并在示例中使用了类似的方法。

import java.util.concurrent.*;

class QueueConsumer {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    public void shutdown() {
        executor.shutdown(); // gracefully shuts down the executor
    }

    // 'Result' is a class you'll have to write yourself, if you want.
    // If you don't need to provide a result, you can just Runnable
    // instead of Callable.
    public Future<Result> submit(final ComplexObject complexObject) {
        if(executor.isShutdown()) {
            // handle submitted tasks after the executor has been told to shutdown
        }

        return executor.submit(new Callable<Result>() {
            @Override
            public Result call() {
                return process(complexObject);
            }
        });
    }

    private Result process(final ComplexObject complexObject) {
        // Do something with the complex object.
    }
}

This example is just an off-the-cuff illustration of what the java.util.concurrentpackage offers; there are probably some optimizations that could be made to it (e.g., QueueConsumeras its own class probably isn't even necessary; you could just provide the ExecutorServiceto whatever producers are submitting the tasks).

这个例子只是java.util.concurrent包装所提供的即兴展示;可能有一些可以对其进行优化(例如,QueueConsumer因为它自己的类可能甚至没有必要;您可以将 提供ExecutorService给任何正在提交任务的生产者)。

Dig through the java.util.concurrentpackage (starting at some of the links above). You might find that it gives you a lot of great options for what you're trying to do, and you don't even have to worry about regulating the work queue.

仔细阅读java.util.concurrent包(从上面的一些链接开始)。您可能会发现它为您正在尝试做的事情提供了很多很好的选择,而且您甚至不必担心调整工作队列。

回答by jdmichal

Another possibility for making a poison object: Make it be a particular instance of the class. This way, you do not have to muck around subtypes or screw up your generic.

制作有毒对象的另一种可能性:使其成为类的特定实例。这样,您就不必纠结于子类型或搞砸您的泛型。

Drawback: This won't work if there's some sort of serialization barrier between the producer and consumer.

缺点:如果生产者和消费者之间存在某种序列化障碍,这将不起作用。

public class ComplexObject
{
    public static final POISON_INSTANCE = new ComplexObject();

    public ComplexObject(whatever arguments) {
    }

    // Empty constructor for creating poison instance.
    private ComplexObject() {
    }
}

class QueueConsumer implements Runnable {
    @Override
    public void run() {
        while(!(Thread.currentThread().interrupted())) {
            try {
                final ComplexObject complexObject = myBlockingQueue.take();
                if (complexObject == ComplexObject.POISON_INSTANCE)
                    return;

                // Process complex object.

            } catch (InterruptedException e) {
                // Set interrupted flag.
                Thread.currentThread().interrupt();
            }
        }
    }
}

回答by Dorus

You can wrap your generic object into a dataobject. On this dataobject you can add additional data like the poison object status. The dataobject is a class with 2 fields. T complexObject;and boolean poison;.

您可以将通用对象包装到数据对象中。在此数据对象上,您可以添加其他数据,如毒物对象状态。数据对象是一个有 2 个字段的类。T complexObject;boolean poison;

Your consumer takes the data objects from the queue. If a poison object is returned, you close the consumer, else you unwrap the generic and call 'process(complexObject)'.

您的消费者从队列中获取数据对象。如果返回有害对象,则关闭使用者,否则打开泛型并调用“process(complexObject)”。

I'm using a java.util.concurrent.LinkedBlockingDeque<E>so that you can add object at the end of the queue and take them from the front. That way your object will be handled in order, but more important it's safe to close the queue after you run into the poison object.

我正在使用 ajava.util.concurrent.LinkedBlockingDeque<E>以便您可以在队列末尾添加对象并从前面获取它们。这样您的对象将按顺序处理,但更重要的是,在遇到有害对象后关闭队列是安全的。

To support multiple consumers, I add the poison object back onto the queue when I run into it.

为了支持多个消费者,当我遇到它时,我将毒物对象重新添加到队列中。

public final class Data<T> {
    private boolean poison = false;
    private T complexObject;

    public Data() {
        this.poison = true;
    }

    public Data(T complexObject) {
        this.complexObject = complexObject;
    }

    public boolean isPoison() {
        return poison;
    }

    public T getComplexObject() {
        return complexObject;
    }
}
public class Consumer <T> implements Runnable {

    @Override
    public final void run() {
        Data<T> data;
        try {
            while (!(data = queue.takeFirst()).isPoison()) {
                process(data.getComplexObject());
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        // add the poison object back so other consumers can stop too.
        queue.addLast(line);
    }
}

回答by Chris Knight

Is it possible to extend ComplexObject and mock out the non-trivial creation functionality? Essentially you're ending up with a shell object but you can do then do instance ofto see if is the end of queue object.

是否可以扩展 ComplexObject 并模拟非平凡的创建功能?本质上,您最终会得到一个 shell 对象,但是您可以执行 theninstance of来查看是否是队列对象的结尾。

回答by Corin

It seems reasonable to me to implement a close-able BlockingQueue:

实现 close-able 对我来说似乎是合理的BlockingQueue

import java.util.concurrent.BlockingQueue;

public interface CloseableBlockingQueue<E> extends BlockingQueue<E> {
    /** Returns <tt>true</tt> if this queue is closed, <tt>false</tt> otherwise. */
    public boolean isClosed();

    /** Closes this queue; elements cannot be added to a closed queue. **/
    public void close();
}

It would be quite straight forward to implement this with the following behaviours (cf. the methods summary table):

使用以下行为(参见方法摘要表)来实现这一点将非常简单:

  • Insert:

    • Throws exception, Special value:

      Behaves like a full Queue, caller's responsibility to test isClosed().

    • Blocks:

      Throws IllegalStateExceptionif and when closed.

    • Times out:

      Returns falseif and when closed, caller's responsibility to test isClosed().

  • Remove:

    • Throws exception, Special value:

      Behaves like a empty Queue, caller's responsibility to test isClosed().

    • Blocks:

      Throws NoSuchElementExceptionif and when closed.

    • Times out:

      Returns nullif and when closed, caller's responsibility to test isClosed().

  • Examine

    No change.

  • 插入

    • 抛出异常特殊值

      表现得像一个完整的Queue,调用者的测试责任isClosed()

    • IllegalStateException关闭时抛出。

    • 超时

      false如果关闭,则返回调用者对 test 的责任isClosed()

  • 删除

    • 抛出异常特殊值

      表现得像一个空的Queue,调用者有责任测试isClosed()

    • NoSuchElementException关闭时抛出。

    • 超时

      null如果关闭,则返回调用者对 test 的责任isClosed()

  • 检查

    没变化。

I did this by editing the source, find it at github.com.

我通过编辑源代码来做到这一点,在github.com 上找到它。

回答by daliborn

I have used this system:

我用过这个系统:

ConsumerClass
private boolean queueIsNotEmpty = true;//with setter
...
do {
    ...
    sharedQueue.drainTo(docs);
    ...
} while (queueIsNotEmpty || sharedQueue.isEmpty());

When producer finish, I set on consumerObject, queueIsNotEmpty field to false

当生产者完成时,我将消费者对象的 queueIsNotEmpty 字段设置为 false

回答by ArnabRaxit

Today I solved this problem using a wrapper object. Since the ComplexObject is too complex to subclass I wrapped the ComplexObject into ComplexObjectWrapper object. Then used ComplexObjectWrapper as the generic type.

今天我使用包装器对象解决了这个问题。由于 ComplexObject 太复杂而无法进行子类化,我将 ComplexObject 包装到 ComplexObjectWrapper 对象中。然后使用 ComplexObjectWrapper 作为泛型类型。

public class ComplexObjectWrapper {
ComplexObject obj;
}

public class EndOfQueue extends ComplexObjectWrapper{}

Now instead of BlockingQueue<ComplexObject>I did BlockingQueue<ComplexObjectWrapper>

现在而不是BlockingQueue<ComplexObject>我做的 BlockingQueue<ComplexObjectWrapper>

Since I had control of both the Consumer and Producer this solution worked for me.

由于我同时控制了消费者和生产者,因此该解决方案对我有用。

回答by jtahlborn

In this situation, you generally have to ditch the generics and make the queue hold type Object. then, you just need check for your "poison" Object before casting to the actual type.

在这种情况下,您通常必须放弃泛型并使队列保持类型为 Object。然后,您只需要在转换为实际类型之前检查您的“毒药”对象。