java 并发集队列
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/3120495/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Concurrent Set Queue
提问by Steve Skrla
Maybe this is a silly question, but I cannot seem to find an obvious answer.
也许这是一个愚蠢的问题,但我似乎找不到明显的答案。
I need a concurrent FIFO queue that contains only unique values. Attempting to add a value that already exists in the queue simply ignores that value. Which, if not for the thread safety would be trivial. Is there a data structure in Java or maybe a code snipit on the interwebs that exhibits this behavior?
我需要一个仅包含唯一值的并发 FIFO 队列。尝试添加队列中已存在的值只会忽略该值。其中,如果不是为了线程安全,那将是微不足道的。是否有 Java 中的数据结构或互联网上的代码片段表现出这种行为?
采纳答案by Kevin Bourrillion
If you want better concurrency than full synchronization, there is one way I know of to do it, using a ConcurrentHashMap as the backing map. The following is a sketch only.
如果您想要比完全同步更好的并发性,我知道有一种方法可以做到,使用 ConcurrentHashMap 作为支持映射。以下仅为草图。
public final class ConcurrentHashSet<E> extends ForwardingSet<E>
implements Set<E>, Queue<E> {
private enum Dummy { VALUE }
private final ConcurrentMap<E, Dummy> map;
ConcurrentHashSet(ConcurrentMap<E, Dummy> map) {
super(map.keySet());
this.map = Preconditions.checkNotNull(map);
}
@Override public boolean add(E element) {
return map.put(element, Dummy.VALUE) == null;
}
@Override public boolean addAll(Collection<? extends E> newElements) {
// just the standard implementation
boolean modified = false;
for (E element : newElements) {
modified |= add(element);
}
return modified;
}
@Override public boolean offer(E element) {
return add(element);
}
@Override public E remove() {
E polled = poll();
if (polled == null) {
throw new NoSuchElementException();
}
return polled;
}
@Override public E poll() {
for (E element : this) {
// Not convinced that removing via iterator is viable (check this?)
if (map.remove(element) != null) {
return element;
}
}
return null;
}
@Override public E element() {
return iterator().next();
}
@Override public E peek() {
Iterator<E> iterator = iterator();
return iterator.hasNext() ? iterator.next() : null;
}
}
All is not sunshine with this approach. We have no decent way to select a head element other than using the backing map's entrySet().iterator().next(), the result being that the map gets more and more unbalanced as time goes on. This unbalancing is a problem both due to greater bucket collisions and greater segment contention.
这种方法并非一帆风顺。除了使用支持映射的 之外entrySet().iterator().next(),我们没有其他合适的方法来选择头部元素,结果是随着时间的推移,映射变得越来越不平衡。由于更大的桶冲突和更大的段争用,这种不平衡是一个问题。
Note: this code uses Guavain a few places.
注意:此代码在几个地方使用了Guava。
回答by Ben Manes
I would use a synchronized LinkedHashSet until there was enough justification to consider alternatives. The primary benefit that a more concurrent solution could offer is lock splitting.
我会使用同步的 LinkedHashSet 直到有足够的理由来考虑替代方案。更并发的解决方案可以提供的主要好处是锁分裂。
The simplest concurrent approach would be a a ConcurrentHashMap (acting as a set) and a ConcurrentLinkedQueue. The ordering of operations would provide the desired constraint. An offer() would first perform a CHM#putIfAbsent() and if successful insert into the CLQ. A poll() would take from the CLQ and then remove it from the CHM. This means that we consider an entry in our queue if it is in the map and the CLQ provides the ordering. The performance could then be adjusted by increasing the map's concurrencyLevel. If you are tolerant to additional racy-ness, then a cheap CHM#get() could act as a reasonable precondition (but it can suffer by being a slightly stale view).
最简单的并发方法是一个 ConcurrentHashMap(作为一个集合)和一个 ConcurrentLinkedQueue。操作的顺序将提供所需的约束。offer() 将首先执行 CHM#putIfAbsent(),如果成功插入到 CLQ。poll() 将从 CLQ 中获取,然后将其从 CHM 中删除。这意味着我们考虑队列中的条目,如果它在映射中并且 CLQ 提供排序。然后可以通过增加地图的 concurrencyLevel 来调整性能。如果你能容忍额外的 racy-ness,那么便宜的 CHM#get() 可以作为一个合理的先决条件(但它可能会因为有点陈旧的观点而受到影响)。
回答by erickson
There's not a built-in collection that does this. There are some concurrent Setimplementations that could be used together with a concurrent Queue.
没有一个内置的集合可以做到这一点。有一些并发Set实现可以与 concurrent 一起使用Queue。
For example, an item is added to the queue only after it was successfully added to the set, and each item removed from the queue is removed from the set. In this case, the contents of the queue, logically, are really whatever is in the set, and the queue is just used to track the order and provide efficient take()and poll()operations found only on a BlockingQueue.
例如,一个项目只有在成功添加到集合后才被添加到队列中,从队列中删除的每个项目都从集合中删除。在这种情况下,队列中的内容,从逻辑上讲,真的是无论是在集,队列只是用来跟踪订单和提供高效take()且poll()只发现一个操作BlockingQueue。
回答by Gilbert Le Blanc
A java.util.concurrent.ConcurrentLinkedQueuegets you most of the way there.
一个java.util.concurrent.ConcurrentLinkedQueue中的存在方式最让你。
Wrap the ConcurrentLinkedQueue with your own class that checks for the uniqueness of an add. Your code has to be thread safe.
用您自己的类包装 ConcurrentLinkedQueue 以检查添加的唯一性。您的代码必须是线程安全的。
回答by Jed Wesley-Smith
What do you mean by a concurrent queue with Set semantics? If you mean a truly concurrent structure (as opposed to a thread-safe structure) then I would contend that you are asking for a pony.
具有 Set 语义的并发队列是什么意思?如果您的意思是真正的并发结构(而不是线程安全结构),那么我会争辩说您要的是小马。
What happens for instance if you call put(element)and detect that something is already there which immediately is removed? For instance, what does it mean in your case if offer(element) || queue.contains(element)returns false?
例如,如果您调用put(element)并检测到某些东西已经存在并立即被删除,会发生什么?例如,如果offer(element) || queue.contains(element)返回在您的情况下是什么意思false?
These kinds of things often need to thought about slightly differently in a concurrent world as often nothing is as it seems unless you stop the world (lock it down). Otherwise you are usually looking at something in the past. So, what are you actually trying to do?
在并发世界中,这些类型的事情通常需要稍微不同地考虑,因为除非您停止世界(锁定它),否则通常什么都不是。否则,您通常会看到过去的某些东西。那么,你究竟想做什么?
回答by Chris
Perhaps extend ArrayBlockingQueue. In order to get access to the (package-access) lock, I had to put my sub-class within the same package. Caveat: I haven't tested this.
也许扩展ArrayBlockingQueue。为了访问(包访问)锁,我必须将我的子类放在同一个包中。警告:我还没有测试过这个。
package java.util.concurrent;
import java.util.Collection;
import java.util.concurrent.locks.ReentrantLock;
public class DeDupingBlockingQueue<E> extends ArrayBlockingQueue<E> {
public DeDupingBlockingQueue(int capacity) {
super(capacity);
}
public DeDupingBlockingQueue(int capacity, boolean fair) {
super(capacity, fair);
}
public DeDupingBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
super(capacity, fair, c);
}
@Override
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (contains(e)) return false;
return super.add(e);
} finally {
lock.unlock();
}
}
@Override
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (contains(e)) return true;
return super.offer(e);
} finally {
lock.unlock();
}
}
@Override
public void put(E e) throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //Should this be lock.lock() instead?
try {
if (contains(e)) return;
super.put(e); //if it blocks, it does so without holding the lock.
} finally {
lock.unlock();
}
}
@Override
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (contains(e)) return true;
return super.offer(e, timeout, unit); //if it blocks, it does so without holding the lock.
} finally {
lock.unlock();
}
}
}
回答by omid gholami
A simple answer for a queue of unique objects can be as follow:
唯一对象队列的简单答案如下:
import java.util.concurrent.ConcurrentLinkedQueue;
public class FinalQueue {
class Bin {
private int a;
private int b;
public Bin(int a, int b) {
this.a = a;
this.b = b;
}
@Override
public int hashCode() {
return a * b;
}
public String toString() {
return a + ":" + b;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Bin other = (Bin) obj;
if ((a != other.a) || (b != other.b))
return false;
return true;
}
}
private ConcurrentLinkedQueue<Bin> queue;
public FinalQueue() {
queue = new ConcurrentLinkedQueue<Bin>();
}
public synchronized void enqueue(Bin ipAddress) {
if (!queue.contains(ipAddress))
queue.add(ipAddress);
}
public Bin dequeue() {
return queue.poll();
}
public String toString() {
return "" + queue;
}
/**
* @param args
*/
public static void main(String[] args) {
FinalQueue queue = new FinalQueue();
Bin a = queue.new Bin(2,6);
queue.enqueue(a);
queue.enqueue(queue.new Bin(13, 3));
queue.enqueue(queue.new Bin(13, 3));
queue.enqueue(queue.new Bin(14, 3));
queue.enqueue(queue.new Bin(13, 9));
queue.enqueue(queue.new Bin(18, 3));
queue.enqueue(queue.new Bin(14, 7));
Bin x= queue.dequeue();
System.out.println(x.a);
System.out.println(queue.toString());
System.out.println("Dequeue..." + queue.dequeue());
System.out.println("Dequeue..." + queue.dequeue());
System.out.println(queue.toString());
}
}

