Java环形缓冲区
环形缓冲区是用作队列的数组。环形缓冲区具有读取位置和写入位置,该位置标记了要从环形缓冲区读取和写入的下一个位置。当写入位置到达数组的末尾时,写入位置将重新设置为0。读取位置也是如此。
将读取和写入位置到达数组的末尾时将其设置回零,这有时也称为"环绕"。正是这种行为将数组变成了环形缓冲区。当读写位置到达数组的末尾时,它们将从数组的开头继续,就像数组是一个环一样。因此,命名环缓冲区。
本环形缓冲区教程将解释环形缓冲区的工作原理,并展示两个Java环形缓冲区的实现。
环形缓冲区如何工作
环形缓冲区是一个大小固定的数组,就像有界队列一样。该数组包含存储在环形缓冲区中的元素。
除了包含元素的数组之外,环形缓冲区还包含一个写入位置,该位置指向数组中要插入下一个元素的位置。
环形缓冲区还需要跟踪读取位置,即数组中的下一个位置以从中读取元素。
环形缓冲区还需要跟踪元素数组中的可用空间。当写位置未缠绕时,已用空间在写位置和读位置之间。其余的都是自由空间。下图说明了这种情况下的环形缓冲区:
当写位置已缠绕时,情况就不同了。突然之间,自由空间就是写位置和读位置之间的空间。其余全部为已用空间。这是说明写位置缠绕后的同一个环形缓冲区的图:
有多种方法可以跟踪环形缓冲区中的读取位置(以及因此使用的空间和可用空间)。在本教程的稍后部分,我将解释两种在Java中实现环形缓冲区的不同方法。
环形缓冲区速度很快
环形缓冲区是实现类队列结构的快速方法。 "快速"是指它们都相当容易实现,并且它们的性能都很好。
环形缓冲区用例
环形缓冲区既可用于实际队列(如消息队列),也可用于生成以后以类似流的方式消费的数据。
当我们需要硬缓冲区(环形缓冲区中(队列中)可以有多少个数据)的上限时,环形缓冲区特别有用。
如果我们不想在队列结构上设置上限,则可能应该使用链表或者可以调整自身大小的环形缓冲区(在缓冲区满时分配一个更大的新数组,然后将所有元素移至该数组)。
在本教程中,我将只关注有界环形缓冲区的实现。
环形缓冲区实现技术
有很多方法可以实现环形缓冲区。在此环形缓冲区教程中,我将向我们展示实现环形缓冲区的两种最简单的技术。这些技术是:
- 使用填充计数
- 使用翻转标记
我将在以下各节中介绍这两种技术。
使用填充计数的环形缓冲区
跟踪环形缓冲区中的写入位置,读取位置和元素数量的一种方法是使用写入位置和填充计数。写入位置标记环形缓冲区中要向其写入元素的下一个位置。填充计数表明缓冲区中当前存储了多少个元素。
将元素写入环形缓冲区时,只需检查填充计数以查看其是否已满。如果未满,则可以将元素插入写入位置,并将写入位置前进到下一个空闲插槽。
同样,从环形缓冲区读取元素时,只需检查填充计数以查看环形缓冲区是否为空。通过从写入位置减去填充计数来计算读取位置。如果该计算结果为负,则写位置已回绕,并且需要将缓冲区(数组)的大小添加到读位置,以获取正确的读位置。
以下是使用填充计数的环形缓冲区实现。请注意,它不会显式跟踪读取位置,而是根据写入位置和填充计数来计算读取位置。还要注意,填充计数字段称为"可用"(不是" fillCount")。
public class RingBufferFillCount {
public Object[] elements = null;
private int capacity = 0;
private int writePos = 0;
private int available = 0;
public RingBufferFillCount(int capacity) {
this.capacity = capacity;
this.elements = new Object[capacity];
}
public void reset() {
this.writePos = 0;
this.available = 0;
}
public int capacity() { return this.capacity; }
public int available(){ return this.available; }
public int remainingCapacity() {
return this.capacity - this.available;
}
public boolean put(Object element){
if(available < capacity){
if(writePos >= capacity){
writePos = 0;
}
elements[writePos] = element;
writePos++;
available++;
return true;
}
return false;
}
public Object take() {
if(available == 0){
return null;
}
int nextSlot = writePos - available;
if(nextSlot < 0){
nextSlot += capacity;
}
Object nextObj = elements[nextSlot];
available--;
return nextObj;
}
}
使用翻转标记的环形缓冲区
跟踪读取位置,写入位置以及缓冲区中有多少个元素的另一种选择是,除了写入位置之外,还可以简单地保持读取位置。
根据写入和读取位置计算缓冲区中有多少个元素。计算的外观取决于写入位置是否翻转(环绕)。
如果写位置没有缠绕,则可以简单地从写位置中减去读位置,以了解缓冲区中有多少个元素。如果写位置已缠绕(翻转),则可用空间等于容量减去读位置再加上写位置。
为了跟踪写入位置是否已翻转,使用了特殊的"翻转标记"。这就是实现名称的来源。实际上,在大多数情况下,我们只需检查写入位置是否大于或者小于读取位置即可检测写入位置是否缠绕。但是,当写入位置和读取位置相等时(环形缓冲区完全满或者完全空),这不起作用。
以下是使用翻转标记和读取位置的环形缓冲区的实现。
public class RingBufferFlipMarker {
public Object[] elements = null;
public int capacity = 0;
public int writePos = 0;
public int readPos = 0;
public boolean flipped = false; //the flip marker
public RingBufferFlipMarker(int capacity) {
this.capacity = capacity;
this.elements = new Object[capacity];
}
public void reset() {
this.writePos = 0;
this.readPos = 0;
this.flipped = false;
}
public int available() {
if(!flipped){
return writePos - readPos;
}
return capacity - readPos + writePos;
}
public int remainingCapacity() {
if(!flipped){
return capacity - writePos;
}
return readPos - writePos;
}
public boolean put(Object element){
if(!flipped){
if(writePos == capacity){
writePos = 0;
flipped = true;
if(writePos < readPos){
elements[writePos++] = element;
return true;
} else {
return false;
}
} else {
elements[writePos++] = element;
return true;
}
} else {
if(writePos < readPos ){
elements[writePos++] = element;
return true;
} else {
return false;
}
}
}
public Object take() {
if(!flipped){
if(readPos < writePos){
return elements[readPos++];
} else {
return null;
}
} else {
if(readPos == capacity){
readPos = 0;
flipped = false;
if(readPos < writePos){
return elements[readPos++];
} else {
return null;
}
} else {
return elements[readPos++];
}
}
}
}
环形缓冲区性能
我的基准测试表明,使用填充计数的环形缓冲区要比使用翻转标记的环形缓冲区快一点。但是差异是如此之小,以至于几乎是微不足道的。
批处理模式
我已经为两个环形缓冲区实现实现了批处理方式的put()和take()操作。批处理操作的实现在本教程的后面列出。一次放置和获取多个元素比一次放置和获取单个元素快得多。
我的基准测试表明,批量put()和take()操作一次提供最多4倍的放入和取出单个元素的吞吐量。确切地,多少取决于批量大小。较大的批次比较小的批次产生更高的吞吐量,因为在紧密的阵列复制循环中花费了更多时间。
使用读取位置+翻转标记的环形缓冲区实现的批处理操作比使用填充计数的环形缓冲区的批处理操作快约15%。
并发
上面显示的两个实现都不是线程安全的。它们只能在同一线程中使用。
我的印象是,对于单个读取器,单个写入器的情况,使用读取位置和翻转标记的实现更容易在线程安全版本中实现。单个读取器,单个写入器的情况意味着只有一个线程将元素放入环形缓冲区。我的意思是,只有一个线程。不在同一时间。只有那个相同的线程。编写线程也遵循相同的原则。只有一个线程和同一线程才写入环形缓冲区。但是,读取线程不必与写入线程相同。
我尚未实现环形缓冲区的任何单个读取器,单个写入器版本,因此我实际上并不知道。如果有一天我会更新本教程。
使用填充计数的环形缓冲区-包括批处理操作
这是环形缓冲区的实现,该环形缓冲区使用填充计数,其中包括批处理put()和take()操作。
public class RingBufferFillCount {
public Object[] elements = null;
public int capacity = 0;
public int writePos = 0;
public int available = 0;
public RingBufferFillCount(int capacity) {
this.capacity = capacity;
this.elements = new Object[capacity];
}
public void reset() {
this.writePos = 0;
this.available = 0;
}
public int remainingCapacity() {
return this.capacity - this.available;
}
public boolean put(Object element){
if(available < capacity){
if(writePos >= capacity){
writePos = 0;
}
elements[writePos] = element;
writePos++;
available++;
return true;
}
return false;
}
public int put(Object[] newElements){
return put(newElements, newElements.length);
}
public int put(Object[] newElements, int length){
int readPos = 0;
if(this.writePos > this.available){
//space above writePos is all empty
if(length <= this.capacity - this.writePos){
//space above writePos is sufficient to insert batch
for(; readPos < length; readPos++){
this.elements[this.writePos++] = newElements[readPos];
}
this.available += readPos;
return length;
} else {
//both space above writePos and below writePos is necessary to use
//to insert batch.
int lastEmptyPos = writePos - available;
for(; this.writePos < this.capacity; this.writePos++){
this.elements[this.writePos] = newElements[readPos++];
}
//fill into bottom of array too.
this.writePos = 0;
int endPos = Math.min(length - readPos, capacity - available - readPos);
for(;this.writePos < endPos; this.writePos++){
this.elements[this.writePos] = newElements[readPos++];
}
this.available += readPos;
return readPos;
}
} else {
int endPos = this.capacity - this.available + this.writePos;
for(; this.writePos < endPos; this.writePos++){
this.elements[this.writePos] = newElements[readPos++];
}
this.available += readPos;
return readPos;
}
}
public Object take() {
if(available == 0){
return null;
}
int nextSlot = writePos - available;
if(nextSlot < 0){
nextSlot += capacity;
}
Object nextObj = elements[nextSlot];
available--;
return nextObj;
}
public int take(Object[] into){
return take(into, into.length);
}
public int take(Object[] into, int length){
int intoPos = 0;
if(available <= writePos){
int nextPos= writePos - available;
int endPos = nextPos + Math.min(available, length);
for(;nextPos < endPos; nextPos++){
into[intoPos++] = this.elements[nextPos];
}
this.available -= intoPos;
return intoPos;
} else {
int nextPos = writePos - available + capacity;
int leftInTop = capacity - nextPos;
if(length <= leftInTop){
//copy directly
for(; intoPos < length; intoPos++){
into[intoPos] = this.elements[nextPos++];
}
this.available -= length;
return length;
} else {
//copy top
for(; nextPos < capacity; nextPos++){
into[intoPos++] = this.elements[nextPos];
}
//copy bottom - from 0 to writePos
nextPos = 0;
int leftToCopy = length - intoPos;
int endPos = Math.min(writePos, leftToCopy);
for(;nextPos < endPos; nextPos++){
into[intoPos++] = this.elements[nextPos];
}
this.available -= intoPos;
return intoPos;
}
}
}
}
使用翻转标记的环形缓冲区-包括批处理操作
这是使用读取位置和翻转标记的环形缓冲区的实现,其中包括批处理" put()"和" take()"操作。
public class RingBufferFlip {
public Object[] elements = null;
public int capacity = 0;
public int writePos = 0;
public int readPos = 0;
public boolean flipped = false;
public RingBufferFlip(int capacity) {
this.capacity = capacity;
this.elements = new Object[capacity];
}
public void reset() {
this.writePos = 0;
this.readPos = 0;
this.flipped = false;
}
public int available() {
if(!flipped){
return writePos - readPos;
}
return capacity - readPos + writePos;
}
public int remainingCapacity() {
if(!flipped){
return capacity - writePos;
}
return readPos - writePos;
}
public boolean put(Object element){
if(!flipped){
if(writePos == capacity){
writePos = 0;
flipped = true;
if(writePos < readPos){
elements[writePos++] = element;
return true;
} else {
return false;
}
} else {
elements[writePos++] = element;
return true;
}
} else {
if(writePos < readPos ){
elements[writePos++] = element;
return true;
} else {
return false;
}
}
}
public int put(Object[] newElements, int length){
int newElementsReadPos = 0;
if(!flipped){
//readPos lower than writePos - free sections are:
//1) from writePos to capacity
//2) from 0 to readPos
if(length <= capacity - writePos){
//new elements fit into top of elements array - copy directly
for(; newElementsReadPos < length; newElementsReadPos++){
this.elements[this.writePos++] = newElements[newElementsReadPos];
}
return newElementsReadPos;
} else {
//new elements must be divided between top and bottom of elements array
//writing to top
for(;this.writePos < capacity; this.writePos++){
this.elements[this.writePos] = newElements[newElementsReadPos++];
}
//writing to bottom
this.writePos = 0;
this.flipped = true;
int endPos = Math.min(this.readPos, length - newElementsReadPos);
for(; this.writePos < endPos; this.writePos++){
this.elements[writePos] = newElements[newElementsReadPos++];
}
return newElementsReadPos;
}
} else {
//readPos higher than writePos - free sections are:
//1) from writePos to readPos
int endPos = Math.min(this.readPos, this.writePos + length);
for(; this.writePos < endPos; this.writePos++){
this.elements[this.writePos] = newElements[newElementsReadPos++];
}
return newElementsReadPos;
}
}
public Object take() {
if(!flipped){
if(readPos < writePos){
return elements[readPos++];
} else {
return null;
}
} else {
if(readPos == capacity){
readPos = 0;
flipped = false;
if(readPos < writePos){
return elements[readPos++];
} else {
return null;
}
} else {
return elements[readPos++];
}
}
}
public int take(Object[] into, int length){
int intoWritePos = 0;
if(!flipped){
//writePos higher than readPos - available section is writePos - readPos
int endPos = Math.min(this.writePos, this.readPos + length);
for(; this.readPos < endPos; this.readPos++){
into[intoWritePos++] = this.elements[this.readPos];
}
return intoWritePos;
} else {
//readPos higher than writePos - available sections are
//top + bottom of elements array
if(length <= capacity - readPos){
//length is lower than the elements available at the top
//of the elements array - copy directly
for(; intoWritePos < length; intoWritePos++){
into[intoWritePos] = this.elements[this.readPos++];
}
return intoWritePos;
} else {
//length is higher than elements available at the top of the elements array
//split copy into a copy from both top and bottom of elements array.
//copy from top
for(; this.readPos < capacity; this.readPos++){
into[intoWritePos++] = this.elements[this.readPos];
}
//copy from bottom
this.readPos = 0;
this.flipped = false;
int endPos = Math.min(this.writePos, length - intoWritePos);
for(; this.readPos < endPos; this.readPos++){
into[intoWritePos++] = this.elements[this.readPos];
}
return intoWritePos;
}
}
}
}

