Java 流 API 和队列:订阅 BlockingQueue 流样式

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/23462209/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-13 23:08:34  来源:igfitidea点击:

Stream API and Queues: Subscribe to BlockingQueue stream-style

javajava-8java-stream

提问by Mikhail Boyarsky

Let's say we have a Queue

假设我们有一个队列

BlockingQueue<String> queue= new LinkedBlockingQueue<>();

and some other thread puts values in it, then we read it like

和其他一些线程将值放入其中,然后我们读取它

while (true) {
    String next = queue.take();
    System.out.println("next message:" + next);
}

How can I iterate over this queue in stream style, while maintaining similar semantics to above code.

如何以流样式迭代这个队列,同时保持与上述代码相似的语义。

This code only traverses the current queue state:

这段代码只遍历当前队列状态:

queue.stream().forEach(e -> System.out.println(e));

采纳答案by Stuart Marks

I'm guessing a bit at what you're expecting, but I think I have a good hunch.

我有点猜测你的期望,但我想我有一个很好的预感。

The stream of a queue, like iterating over a queue, represents the current contentsof the queue. When the iterator or the stream reaches the tail of the queue, it doesn't block awaiting further elements to be added. The iterator or the stream is exhausted at that point and the computation terminates.

队列的流,就像遍历队列一样,代表队列的当前内容。当迭代器或流到达队列的尾部时,它不会阻塞等待添加更多元素。迭代器或流在此时耗尽,计算终止。

If you want a stream that consists of all current and future elements of the queue, you can do something like this:

如果您想要一个包含队列中所有当前和未来元素的流,您可以执行以下操作:

Stream.generate(() -> {
        try {
            return queue.take();
        } catch (InterruptedException ie) {
            return "Interrupted!";
        }
    })
    .filter(s -> s.endsWith("x"))
    .forEach(System.out::println);   

(Unfortunately the need to handle InterruptedExceptionmakes this quite messy.)

(不幸的是,需要处理InterruptedException使得这非常混乱。)

Note that there is no way to close a queue, and there is no way for Stream.generateto stop generating elements, so this is effectively an infinite stream. The only way to terminate it is with a short-circuiting stream operation such as findFirst.

请注意,无法关闭队列,也无法Stream.generate停止生成元素,因此这实际上是一个无限流。终止它的唯一方法是使用短路流操作,例如findFirst.

回答by John McClean

You could look at an async Queue implementation. If you have Java 8, then cyclops-react, I a developer on this project, provides an async.Queue that will allow you to both populate and consume from the Queue asyncrhonously (and cleanly).

您可以查看异步队列实现。如果你有 Java 8,那么cyclops-react,我是这个项目的开发者,提供了一个 async.Queue ,它允许你异步地(并且干净地)从队列中填充和消费。

e.g.

例如

Queue<String> queue = QueueFactories.<String>unboundedQueue().build();

Or simply (as long as this is a com.aol.simple.react.async.Queue)

或者干脆(只要这是一个 com.aol.simple.react.async.Queue)

Queue<String> queue = new Queue<>();

Then in a separate thread :

然后在一个单独的线程中:

new Thread(() -> {
        while (true) {
            queue.add("New message " + System.currentTimeMillis());
        }
    }).start();

Back on your main thread, your original code should now work as expected (infinetely iterate over the messages being added to the queue and print them out)

回到您的主线程,您的原始代码现在应该按预期工作(无限迭代添加到队列中的消息并将它们打印出来)

queue.stream().forEach(e -> System.out.println(e));

The Queue and hence the Stream can be closed at any stage via -

队列和流可以通过以下方式在任何阶段关闭 -

queue.close();

回答by user3787629

Another approach is to build a custom Spliterator. In my case, I've got a blocking queue, and I want to build a stream that continues to extract elements, until the block times out. The spliterator is something like:

另一种方法是构建自定义 Spliterator。就我而言,我有一个阻塞队列,我想构建一个继续提取元素的流,直到阻塞超时。拆分器是这样的:

public class QueueSpliterator<T> implements Spliterator<T> {
    private final BlockingQueue<T> queue;
    private final long timeoutMs;

    public QueueSpliterator(final BlockingQueue<T> queue, final long timeoutMs) {
        this.queue = queue;
        this.timeoutMs = timeoutMs;
    }

    @Override
    public int characteristics() {
        return Spliterator.CONCURRENT | Spliterator.NONNULL | Spliterator.ORDERED;
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE;
    }

    @Override
    public boolean tryAdvance(final Consumer<? super T> action) {
        try {
            final T next = this.queue.poll(this.timeoutMs, TimeUnit.MILLISECONDS);
            if (next == null) {
                return false;
            }
            action.accept(next);
            return true;
        } catch (final InterruptedException e) {
            throw new SupplierErrorException("interrupted", e);
        }
    }

    @Override
    public Spliterator<T> trySplit() {
        return null;
    }

}

The exception thrown to handle InterruptedException is an extension of RuntimeException. Using this class, one can build a stream via: StreamSupport.stream(new QueueSpliterator(...)) and add the usual stream operations.

为处理 InterruptedException 抛出的异常是 RuntimeException 的扩展。使用此类,您可以通过以下方式构建流: StreamSupport.stream(new QueueSpliterator(...)) 并添加常用的流操作。