Java中的并发和阻塞队列
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/1212386/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Concurrent and Blocking Queue in Java
提问by Yiannis
I have the classic problem of a thread pushing events to the incoming queue of a second thread. Only this time, I am very interested about performance. What I want to achieve is:
我有一个线程将事件推送到第二个线程的传入队列的经典问题。只是这一次,我对性能很感兴趣。我想要实现的是:
- I want concurrent access to the queue, the producer pushing, the receiver poping.
- When the queue is empty, I want the consumer to block to the queue, waiting for the producer.
- 我想要并发访问队列,生产者推送,接收者弹出。
- 当队列为空时,我希望消费者阻塞到队列中,等待生产者。
My first idea was to use a LinkedBlockingQueue
, but I soon realized that it is not concurrent and the performance suffered. On the other hand, I now use a ConcurrentLinkedQueue
, but still I am paying the cost of wait()
/ notify()
on each publication. Since the consumer, upon finding an empty queue, does not block, I have to synchronize and wait()
on a lock. On the other part, the producer has to get that lock and notify()
upon every single publication. The overall result is that I am paying the cost of
sycnhronized (lock) {lock.notify()}
in every single publication, even when not needed.
我的第一个想法是使用 a LinkedBlockingQueue
,但我很快意识到它不是并发的并且性能受到影响。另一方面,我现在使用ConcurrentLinkedQueue
,但我仍然要为每个出版物支付wait()
/的费用notify()
。由于消费者在找到空队列时不会阻塞,因此我必须同步并wait()
锁定。另一方面,生产者必须获得该锁定并锁定notify()
每一个出版物。总体结果是sycnhronized (lock) {lock.notify()}
,即使不需要,我也要为每份出版物支付费用
。
What I guess is needed here, is a queue that is both blocking and concurrent. I imagine a push()
operation to work as in ConcurrentLinkedQueue
, with an extra notify()
to the object when the pushed element is the first in the list. Such a check I consider to already exist in the ConcurrentLinkedQueue
, as pushing requires connecting with the next element. Thus, this would be much faster than synchronizing every time on the external lock.
我猜这里需要的是一个既阻塞又并发的队列。我想象一个push()
像 in 一样工作的操作,当被推送的元素是列表中的第一个时,对对象ConcurrentLinkedQueue
有一个额外notify()
的作用。我认为这样的检查已经存在于 中ConcurrentLinkedQueue
,因为推送需要连接下一个元素。因此,这将比每次在外部锁上同步要快得多。
Is something like this available/reasonable?
这样的事情是否可用/合理?
采纳答案by Rorick
I think you can stick to java.util.concurrent.LinkedBlockingQueue
regardless of your doubts. It is concurrent. Though, I have no idea about its performance. Probably, other implementation of BlockingQueue
will suit you better. There's not too many of them, so make performance tests and measure.
我认为java.util.concurrent.LinkedBlockingQueue
不管你有什么疑问,你都可以坚持下去。它是并发的。虽然,我不知道它的性能。可能, 的其他实现BlockingQueue
会更适合您。它们没有太多,所以进行性能测试和测量。
回答by jjnguy
Here is a list of classes implementing BlockingQueue
.
I would recommend checking out SynchronousQueue
.
我建议检查一下SynchronousQueue
。
Like @Rorick mentioned in his comment, I believe all of those implementations are concurrent. I think your concerns with LinkedBlockingQueue
may be out of place.
就像@Rorick 在他的评论中提到的那样,我相信所有这些实现都是并发的。我认为您的担忧LinkedBlockingQueue
可能是不合适的。
回答by codethulhu
I would suggest you look at ThreadPoolExecutornewSingleThreadExecutor. It will handle keeping your tasks ordered for you, and if you submit Callablesto your executor, you will be able to get the blocking behavior you are looking for as well.
我建议你看看ThreadPoolExecutornewSingleThreadExecutor。它将处理为您安排任务,如果您将Callables提交给您的执行程序,您也将能够获得您正在寻找的阻塞行为。
回答by Javamann
I use the ArrayBlockingQueue whenever I need to pass data from one thread to another. Using the put and take methods (which will block if full/empty).
每当我需要将数据从一个线程传递到另一个线程时,我都会使用 ArrayBlockingQueue。使用 put 和 take 方法(如果满/空将阻塞)。
回答by Vitaly
You can try LinkedTransferQueue from jsr166: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166y/
您可以从 jsr166 尝试 LinkedTransferQueue:http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166y/
It fulfills your requirements and have less overhead for offer/poll operations. As I can see from the code, when the queue is not empty, it uses atomic operations for polling elements. And when the queue is empty, it spins for some time and park the thread if unsuccessful. I think it can help in your case.
它满足您的要求,并且提供/投票操作的开销更少。从代码中可以看出,当队列不为空时,它使用原子操作来轮询元素。当队列为空时,它会旋转一段时间并在不成功时停放线程。我认为它可以帮助你的情况。
回答by ekangas
Similar to this answer https://stackoverflow.com/a/1212515/1102730but a bit different.. I ended up using an ExecutorService
. You can instantiate one by using Executors.newSingleThreadExecutor()
. I needed a concurrent queue for reading/writing BufferedImages to files, as well as atomicity with reads and writes. I only need a single thread because the file IO is orders of magnitude faster than the source, net IO. Also, I was more concerned about atomicity of actions and correctness than performance, but this approach can also be done with multiple threads in the pool to speed things up.
类似于这个答案https://stackoverflow.com/a/1212515/1102730但有点不同..我最终使用了ExecutorService
. 您可以使用Executors.newSingleThreadExecutor()
. 我需要一个并发队列来读取/写入 BufferedImages 到文件,以及读取和写入的原子性。我只需要一个线程,因为文件 IO 比源网络 IO 快几个数量级。此外,与性能相比,我更关心操作的原子性和正确性,但这种方法也可以通过池中的多个线程来完成,以加快速度。
To get an image (Try-Catch-Finally omitted):
获取图像(Try-Catch-Finally 省略):
Future<BufferedImage> futureImage = executorService.submit(new Callable<BufferedImage>() {
@Override
public BufferedImage call() throws Exception {
ImageInputStream is = new FileImageInputStream(file);
return ImageIO.read(is);
}
})
image = futureImage.get();
To save an image (Try-Catch-Finally omitted):
保存图像(Try-Catch-Finally 省略):
Future<Boolean> futureWrite = executorService.submit(new Callable<Boolean>() {
@Override
public Boolean call() {
FileOutputStream os = new FileOutputStream(file);
return ImageIO.write(image, getFileFormat(), os);
}
});
boolean wasWritten = futureWrite.get();
It's important to note that you should flush and close your streams in a finally block. I don't know about how it performs compared to other solutions, but it is pretty versatile.
请务必注意,您应该在 finally 块中刷新和关闭您的流。我不知道与其他解决方案相比它的性能如何,但它非常通用。