Thread-safe circular buffer in Java
Original question: http://stackoverflow.com/questions/11079210/
Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): StackOverflow
Asked by Adam Matan
Consider a few web server instances running in parallel. Each server holds a reference to a single shared "Status keeper", whose role is keeping the last N requests from all servers.
For example (N=3):
Server a: "Request id = ABCD" Status keeper=["ABCD"]
Server b: "Request id = XYZZ" Status keeper=["ABCD", "XYZZ"]
Server c: "Request id = 1234" Status keeper=["ABCD", "XYZZ", "1234"]
Server b: "Request id = FOO" Status keeper=["XYZZ", "1234", "FOO"]
Server a: "Request id = BAR" Status keeper=["1234", "FOO", "BAR"]
At any point in time, the "Status keeper" might be called from a monitoring application that reads these last N requests for an SLA report.
What's the best way to implement this producer-consumer scenario in Java, giving the web servers higher priority than the SLA report?
CircularFifoBuffer seems to be the appropriate data structure to hold the requests, but I'm not sure what the optimal way to implement efficient concurrency is.
Accepted answer by MahdeTo
Buffer fifo = BufferUtils.synchronizedBuffer(new CircularFifoBuffer());
Answer by OldCurmudgeon
Here's a lock-free ring buffer implementation. It implements a fixed-size buffer; there is no FIFO functionality. I would suggest you store a Collection of requests for each server instead. That way your report can do the filtering rather than getting your data structure to filter.
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Container
 * ---------
 *
 * A lock-free container that offers a close-to O(1) add/remove performance.
 *
 */
public class Container<T> implements Iterable<T> {
    // The capacity of the container.
    final int capacity;
    // The list.
    AtomicReference<Node<T>> head = new AtomicReference<Node<T>>();
    // TESTING {
    AtomicLong totalAdded = new AtomicLong(0);
    AtomicLong totalFreed = new AtomicLong(0);
    AtomicLong totalSkipped = new AtomicLong(0);

    private void resetStats() {
        totalAdded.set(0);
        totalFreed.set(0);
        totalSkipped.set(0);
    }
    // TESTING }

    // Constructor
    public Container(int capacity) {
        this.capacity = capacity;
        // Construct the list.
        Node<T> h = new Node<T>();
        Node<T> it = h;
        // One created, now add (capacity - 1) more
        for (int i = 0; i < capacity - 1; i++) {
            // Add it.
            it.next = new Node<T>();
            // Step on to it.
            it = it.next;
        }
        // Make it a ring.
        it.next = h;
        // Install it.
        head.set(h);
    }

    // Empty ... NOT thread safe.
    public void clear() {
        Node<T> it = head.get();
        for (int i = 0; i < capacity; i++) {
            // Trash the element
            it.element = null;
            // Mark it free.
            it.free.set(true);
            it = it.next;
        }
        // Clear stats.
        resetStats();
    }

    // Add a new one.
    public Node<T> add(T element) {
        // Get a free node and attach the element.
        totalAdded.incrementAndGet();
        return getFree().attach(element);
    }

    // Find the next free element and mark it not free.
    private Node<T> getFree() {
        Node<T> freeNode = head.get();
        int skipped = 0;
        // Stop when we hit the end of the list
        // ... or we successfully transit a node from free to not-free.
        while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) {
            skipped += 1;
            freeNode = freeNode.next;
        }
        // Keep count of skipped.
        totalSkipped.addAndGet(skipped);
        if (skipped < capacity) {
            // Put the head as next.
            // Doesn't matter if another thread overwrites it. That would just mean someone else was doing the same.
            head.set(freeNode.next);
        } else {
            // We hit the end! No more free nodes.
            throw new IllegalStateException("Capacity exhausted.");
        }
        return freeNode;
    }

    // Mark it free.
    public void remove(Node<T> it, T element) {
        totalFreed.incrementAndGet();
        // Remove the element first.
        it.detach(element);
        // Mark it as free.
        if (!it.free.compareAndSet(false, true)) {
            throw new IllegalStateException("Freeing a freed node.");
        }
    }

    // The Node class. It is static so needs the <T> repeated.
    public static class Node<T> {
        // The element in the node.
        private T element;
        // Are we free?
        private AtomicBoolean free = new AtomicBoolean(true);
        // The next reference in whatever list I am in.
        private Node<T> next;

        // Construct a node of the list
        private Node() {
            // Start empty.
            element = null;
        }

        // Attach the element.
        public Node<T> attach(T element) {
            // Sanity check.
            if (this.element == null) {
                this.element = element;
            } else {
                throw new IllegalArgumentException("There is already an element attached.");
            }
            // Useful for chaining.
            return this;
        }

        // Detach the element.
        public Node<T> detach(T element) {
            // Sanity check.
            if (this.element == element) {
                this.element = null;
            } else {
                throw new IllegalArgumentException("Removal of wrong element.");
            }
            // Useful for chaining.
            return this;
        }

        public T get() {
            return element;
        }

        @Override
        public String toString() {
            return element != null ? element.toString() : "null";
        }
    }

    // Provides an iterator across all items in the container.
    public Iterator<T> iterator() {
        return new UsedNodesIterator<T>(this);
    }

    // Iterates across used nodes.
    private static class UsedNodesIterator<T> implements Iterator<T> {
        // Where next to look for the next used node.
        Node<T> it;
        int limit = 0;
        T next = null;

        public UsedNodesIterator(Container<T> c) {
            // Snapshot the head node at this time.
            it = c.head.get();
            limit = c.capacity;
        }

        public boolean hasNext() {
            // Made into a `while` loop to fix issue reported by @Nim in code review
            while (next == null && limit > 0) {
                // Scan to the next non-free node.
                while (limit > 0 && it.free.get() == true) {
                    it = it.next;
                    // Step down 1.
                    limit -= 1;
                }
                if (limit != 0) {
                    next = it.element;
                }
            }
            return next != null;
        }

        public T next() {
            T n = null;
            if (hasNext()) {
                // Give it to them.
                n = next;
                next = null;
                // Step forward.
                it = it.next;
                limit -= 1;
            } else {
                // Not there!!
                throw new NoSuchElementException();
            }
            return n;
        }

        public void remove() {
            throw new UnsupportedOperationException("Not supported.");
        }
    }

    @Override
    public String toString() {
        StringBuilder s = new StringBuilder();
        Separator comma = new Separator(",");
        // Keep counts too.
        int usedCount = 0;
        int freeCount = 0;
        // I will iterate the list myself as I want to count free nodes too.
        Node<T> it = head.get();
        int count = 0;
        s.append("[");
        // Scan to the end.
        while (count < capacity) {
            // Is it in-use?
            if (it.free.get() == false) {
                // Grab its element.
                T e = it.element;
                // Is it null?
                if (e != null) {
                    // Good element.
                    s.append(comma.sep()).append(e.toString());
                    // Count them.
                    usedCount += 1;
                } else {
                    // Probably became free while I was traversing.
                    // Because the element is detached before the entry is marked free.
                    freeCount += 1;
                }
            } else {
                // Free one.
                freeCount += 1;
            }
            // Next
            it = it.next;
            count += 1;
        }
        // Decorate with counts "]used+free".
        s.append("]").append(usedCount).append("+").append(freeCount);
        if (usedCount + freeCount != capacity) {
            // Perhaps something was added/freed while we were iterating.
            s.append("?");
        }
        return s.toString();
    }
}
Note that this is close to O(1) put and get. A Separator just emits "" the first time around and then its parameter from then on.
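The Separator class itself is not shown in the answer. Going only by the description above and by the calls made on it in the code (sep() and reset()), a minimal sketch might look like the following. This is a hypothetical reconstruction, not the author's actual class, and it is not thread-safe.

```java
/** Hypothetical reconstruction of the Separator helper described above. */
final class Separator {
    private final String sep;
    private boolean first = true;

    Separator(String sep) {
        this.sep = sep;
    }

    // Returns "" on the first call and the separator on every later call.
    String sep() {
        if (first) {
            first = false;
            return "";
        }
        return sep;
    }

    // Start over, so the next call to sep() returns "" again.
    void reset() {
        first = true;
    }
}
```

Appending sep() before each item gives a comma-separated list without a leading or trailing comma, which is how toString() in the container uses it.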
Edit: Added test methods.
    // ***** Following only needed for testing. *****
    // These members go inside the Container class and also need
    // java.io.IOException and java.util.ArrayList imported.
    // NamedFileOutput is not shown in this answer.
    private static boolean Debug = false;
    private final static String logName = "Container.log";
    private final static NamedFileOutput log = new NamedFileOutput("C:\\Junk\\");

    private static synchronized void log(boolean toStdoutToo, String s) {
        if (Debug) {
            if (toStdoutToo) {
                System.out.println(s);
            }
            log(s);
        }
    }

    private static synchronized void log(String s) {
        if (Debug) {
            try {
                log.writeLn(logName, s);
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }

    static volatile boolean testing = true;

    // Tester object to exercise the container.
    static class Tester<T> implements Runnable {
        // My name.
        T me;
        // The container I am testing.
        Container<T> c;

        public Tester(Container<T> container, T name) {
            c = container;
            me = name;
        }

        private void pause() {
            try {
                Thread.sleep(0);
            } catch (InterruptedException ex) {
                testing = false;
            }
        }

        public void run() {
            // Spin on add/remove until stopped.
            while (testing) {
                // Add it.
                Node<T> n = c.add(me);
                log("Added " + me + ": " + c.toString());
                pause();
                // Remove it.
                c.remove(n, me);
                log("Removed " + me + ": " + c.toString());
                pause();
            }
        }
    }

    static final String[] strings = {
        "One", "Two", "Three", "Four", "Five",
        "Six", "Seven", "Eight", "Nine", "Ten"
    };
    static final int TEST_THREADS = Math.min(10, strings.length);

    public static void main(String[] args) throws InterruptedException {
        Debug = true;
        log.delete(logName);
        Container<String> c = new Container<String>(10);

        // Simple add/remove
        log(true, "Simple test");
        Node<String> it = c.add(strings[0]);
        log("Added " + c.toString());
        c.remove(it, strings[0]);
        log("Removed " + c.toString());

        // Capacity test.
        log(true, "Capacity test");
        ArrayList<Node<String>> nodes = new ArrayList<Node<String>>(strings.length);
        // Fill it.
        for (int i = 0; i < strings.length; i++) {
            nodes.add(i, c.add(strings[i]));
            log("Added " + strings[i] + " " + c.toString());
        }
        // Add one more.
        try {
            c.add("Wafer thin mint!");
        } catch (IllegalStateException ise) {
            log("Full!");
        }
        c.clear();
        log("Empty: " + c.toString());

        // Iterate test.
        log(true, "Iterator test");
        for (int i = 0; i < strings.length; i++) {
            nodes.add(i, c.add(strings[i]));
        }
        StringBuilder all = new StringBuilder();
        Separator sep = new Separator(",");
        for (String s : c) {
            all.append(sep.sep()).append(s);
        }
        log("All: " + all);
        for (int i = 0; i < strings.length; i++) {
            c.remove(nodes.get(i), strings[i]);
        }
        sep.reset();
        all.setLength(0);
        for (String s : c) {
            all.append(sep.sep()).append(s);
        }
        log("None: " + all.toString());

        // Multiple add/remove
        log(true, "Multi test");
        for (int i = 0; i < strings.length; i++) {
            nodes.add(i, c.add(strings[i]));
            log("Added " + strings[i] + " " + c.toString());
        }
        log("Filled " + c.toString());
        for (int i = 0; i < strings.length - 1; i++) {
            c.remove(nodes.get(i), strings[i]);
            log("Removed " + strings[i] + " " + c.toString());
        }
        c.remove(nodes.get(strings.length - 1), strings[strings.length - 1]);
        log("Empty " + c.toString());

        // Multi-threaded add/remove
        log(true, "Threads test");
        c.clear();
        for (int i = 0; i < TEST_THREADS; i++) {
            Thread t = new Thread(new Tester<String>(c, strings[i]));
            t.setName("Tester " + strings[i]);
            log("Starting " + t.getName());
            t.start();
        }
        // Wait for 10 seconds.
        long stop = System.currentTimeMillis() + 10 * 1000;
        while (System.currentTimeMillis() < stop) {
            Thread.sleep(100);
        }
        // Stop the testers.
        testing = false;
        // Wait some more.
        Thread.sleep(1 * 100);
        // Get stats.
        double added = c.totalAdded.doubleValue();
        double skipped = c.totalSkipped.doubleValue();
        //double freed = c.freed.doubleValue();
        log(true, "Stats: added=" + c.totalAdded + ",freed=" + c.totalFreed + ",skipped=" + c.totalSkipped + ",O(" + ((added + skipped) / added) + ")");
    }
Answer by ChrLipp
Maybe you want to look at Disruptor - Concurrent Programming Framework.
- Find a paper describing the alternatives, the design, and a performance comparison to java.util.concurrent.ArrayBlockingQueue here: pdf
- Consider reading the first three articles from BlogsAndArticles
If the library is too much, stick to java.util.concurrent.ArrayBlockingQueue.
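As a rough illustration of the ArrayBlockingQueue fallback, here is a minimal sketch of a "last N requests" keeper. The class and method names (BoundedQueueKeeper, record, snapshot) are made up for this example. It assumes producers should never block, so a full queue evicts its oldest entry instead of waiting. Under heavy contention two producers can both evict, so the queue may briefly hold fewer than N entries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical helper: keeps roughly the last N request ids.
final class BoundedQueueKeeper {
    private final ArrayBlockingQueue<String> queue;

    BoundedQueueKeeper(int n) {
        queue = new ArrayBlockingQueue<String>(n);
    }

    // Called by the web servers. offer() never blocks; when the queue
    // is full, drop the oldest entry and try again.
    void record(String requestId) {
        while (!queue.offer(requestId)) {
            queue.poll();
        }
    }

    // Called by the SLA report. Returns a copy, oldest entry first.
    List<String> snapshot() {
        return new ArrayList<String>(queue);
    }
}
```

The report only ever sees a copy, so it can take as long as it likes without holding up the servers.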
Answer by Peter Lawrey
I would have a look at ArrayDeque, or for a more concurrent implementation have a look at the Disruptor library, which is one of the most sophisticated/complex ring buffers in Java.
An alternative is to use an unbounded queue, which is more concurrent as the producer never needs to wait for the consumer: Java Chronicle.
Unless your needs justify the complexity, an ArrayDeque may be all you need.
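One caveat if you go the ArrayDeque route: ArrayDeque is not thread-safe on its own, so every access needs external locking. A minimal sketch, assuming a capacity of at least 1 and using made-up names (DequeStatusKeeper, record, snapshot), could be:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: last-N buffer on top of ArrayDeque.
// All access goes through synchronized methods because ArrayDeque
// provides no thread safety itself.
final class DequeStatusKeeper {
    private final int capacity;
    private final ArrayDeque<String> deque;

    DequeStatusKeeper(int capacity) {
        this.capacity = capacity;
        this.deque = new ArrayDeque<String>(capacity);
    }

    // Drop the oldest entry once the buffer is full, then append.
    synchronized void record(String requestId) {
        if (deque.size() == capacity) {
            deque.pollFirst();
        }
        deque.addLast(requestId);
    }

    // Copy under the lock; the caller works on the copy afterwards.
    synchronized List<String> snapshot() {
        return new ArrayList<String>(deque);
    }
}
```

Replaying the example from the question (N=3) with ids ABCD, XYZZ, 1234, FOO, BAR leaves 1234, FOO, BAR in the buffer. The lock is held only for an add or a copy of N elements, which should be short for small N.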
Answer by Gene
Also have a look at java.util.concurrent.
Blocking queues will block until there is something to consume or (optionally) space to produce:
http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html
Concurrent linked queue is non-blocking and uses a slick algorithm that allows a producer and consumer to be active concurrently:
http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html
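To show how the non-blocking queue could be applied to the "last N" problem, here is a sketch that trims a ConcurrentLinkedQueue after each add. The names (LinkedQueueKeeper, record, snapshot) are invented for this example. The size is tracked in a separate AtomicInteger because ConcurrentLinkedQueue.size() has to walk the whole queue. The bound is only approximate: while several producers are mid-call, a snapshot may briefly contain a few more than N entries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: approximately bounded, non-blocking last-N queue.
final class LinkedQueueKeeper {
    private final int capacity;
    private final ConcurrentLinkedQueue<String> queue =
            new ConcurrentLinkedQueue<String>();
    // Tracked separately; never larger than the real queue length.
    private final AtomicInteger size = new AtomicInteger();

    LinkedQueueKeeper(int capacity) {
        this.capacity = capacity;
    }

    // Producers never wait: append, then drop one old entry if over capacity.
    void record(String requestId) {
        queue.add(requestId);
        if (size.incrementAndGet() > capacity) {
            if (queue.poll() != null) {
                size.decrementAndGet();
            }
        }
    }

    // Weakly consistent copy for the report, oldest entry first.
    List<String> snapshot() {
        return new ArrayList<String>(queue);
    }
}
```

Neither producers nor the report ever take a lock here, which fits the requirement that the web servers get priority, at the cost of a slightly fuzzy bound.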
Answer by mindas
Answer by Sam Goldberg
If it were me, I would use the CircularFifoBuffer as you indicated, and synchronize around the buffer when writing (add). When the monitoring application wants to read the buffer, synchronize on the buffer, and then copy or clone it to use for reporting.
This suggestion is predicated on the assumption that the latency of copying/cloning the buffer to a new object is minimal. If there is a large number of elements and copying is slow, then this is not a good idea.
Pseudo-code example:
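// buffer is the shared CircularFifoBuffer (raw Buffer type in Commons Collections 3.x).
public void writeRequest(String requestID) {
    synchronized (buffer) {
        buffer.add(requestID);
    }
}

public Collection<String> getRequests() {
    synchronized (buffer) {
        // Copy under the lock; CircularFifoBuffer has no public clone(),
        // so a copy constructor is used here instead.
        return new ArrayList<String>(buffer);
    }
}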