Python 高效的循环缓冲区?
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/4151320/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
efficient circular buffer?
提问by jedierikb
I want to create an efficient circular bufferin python (with the goal of taking averages of the integer values in the buffer).
我想在 python 中创建一个高效的循环缓冲区(目的是取缓冲区中整数值的平均值)。
Is this an efficient way to use a list to collect values?
这是使用列表收集值的有效方法吗?
def add_to_buffer( self, num ):
self.mylist.pop( 0 )
self.mylist.append( num )
What would be more efficient (and why)?
什么会更有效(为什么)?
采纳答案by aaronasterling
I would use collections.dequewith a maxlenarg
我会collections.deque与maxlenarg一起使用
>>> import collections
>>> d = collections.deque(maxlen=10)
>>> d
deque([], maxlen=10)
>>> for i in xrange(20):
... d.append(i)
...
>>> d
deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10)
There is a recipein the docs for dequethat is similar to what you want. My assertion that it's the most efficient rests entirely on the fact that it's implemented in C by an incredibly skilled crew that is in the habit of cranking out top notch code.
文档中有一个与您想要的类似的配方deque。我断言它是最有效的完全基于这样一个事实,即它是由一个非常熟练的团队用 C 实现的,他们习惯于编写一流的代码。
回答by John La Rooy
popping from the head of a list causes the whole list to be copied, so is inefficient
从列表的头部弹出会导致整个列表被复制,因此效率低下
You should instead use a list/array of fixed size and an index which moves through the buffer as you add/remove items
您应该改为使用固定大小的列表/数组和在添加/删除项目时在缓冲区中移动的索引
回答by SmartElectron
ok with the use of deque class, but for the requeriments of the question (average) this is my solution:
可以使用 deque 类,但是对于问题(平均)的要求,这是我的解决方案:
>>> from collections import deque
>>> class CircularBuffer(deque):
... def __init__(self, size=0):
... super(CircularBuffer, self).__init__(maxlen=size)
... @property
... def average(self): # TODO: Make type check for integer or floats
... return sum(self)/len(self)
...
>>>
>>> cb = CircularBuffer(size=10)
>>> for i in range(20):
... cb.append(i)
... print "@%s, Average: %s" % (cb, cb.average)
...
@deque([0], maxlen=10), Average: 0
@deque([0, 1], maxlen=10), Average: 0
@deque([0, 1, 2], maxlen=10), Average: 1
@deque([0, 1, 2, 3], maxlen=10), Average: 1
@deque([0, 1, 2, 3, 4], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5, 6], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8], maxlen=10), Average: 4
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], maxlen=10), Average: 4
@deque([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], maxlen=10), Average: 5
@deque([2, 3, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10), Average: 6
@deque([3, 4, 5, 6, 7, 8, 9, 10, 11, 12], maxlen=10), Average: 7
@deque([4, 5, 6, 7, 8, 9, 10, 11, 12, 13], maxlen=10), Average: 8
@deque([5, 6, 7, 8, 9, 10, 11, 12, 13, 14], maxlen=10), Average: 9
@deque([6, 7, 8, 9, 10, 11, 12, 13, 14, 15], maxlen=10), Average: 10
@deque([7, 8, 9, 10, 11, 12, 13, 14, 15, 16], maxlen=10), Average: 11
@deque([8, 9, 10, 11, 12, 13, 14, 15, 16, 17], maxlen=10), Average: 12
@deque([9, 10, 11, 12, 13, 14, 15, 16, 17, 18], maxlen=10), Average: 13
@deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10), Average: 14
回答by scls
You can also see this quite old Python recipe.
您还可以看到这个相当古老的Python 食谱。
Here is my own version with NumPy array:
这是我自己的 NumPy 数组版本:
#!/usr/bin/env python
import numpy as np
class RingBuffer(object):
def __init__(self, size_max, default_value=0.0, dtype=float):
"""initialization"""
self.size_max = size_max
self._data = np.empty(size_max, dtype=dtype)
self._data.fill(default_value)
self.size = 0
def append(self, value):
"""append an element"""
self._data = np.roll(self._data, 1)
self._data[0] = value
self.size += 1
if self.size == self.size_max:
self.__class__ = RingBufferFull
def get_all(self):
"""return a list of elements from the oldest to the newest"""
return(self._data)
def get_partial(self):
return(self.get_all()[0:self.size])
def __getitem__(self, key):
"""get element"""
return(self._data[key])
def __repr__(self):
"""return string representation"""
s = self._data.__repr__()
s = s + '\t' + str(self.size)
s = s + '\t' + self.get_all()[::-1].__repr__()
s = s + '\t' + self.get_partial()[::-1].__repr__()
return(s)
class RingBufferFull(RingBuffer):
def append(self, value):
"""append an element when buffer is full"""
self._data = np.roll(self._data, 1)
self._data[0] = value
回答by MoonCactus
This one does not require any library. It grows a list and then cycle within by index.
这个不需要任何库。它增长一个列表,然后按索引循环。
The footprint is very small (no library), and it runs twice as fast as dequeue at least. This is good to compute moving averages indeed, but be aware that the items are not kept sorted by age as above.
占用空间非常小(没有库),它的运行速度至少是 dequeue 的两倍。这确实有助于计算移动平均线,但请注意,项目并未按年龄排序,如上所示。
class CircularBuffer(object):
def __init__(self, size):
"""initialization"""
self.index= 0
self.size= size
self._data = []
def record(self, value):
"""append an element"""
if len(self._data) == self.size:
self._data[self.index]= value
else:
self._data.append(value)
self.index= (self.index + 1) % self.size
def __getitem__(self, key):
"""get element by index like a regular array"""
return(self._data[key])
def __repr__(self):
"""return string representation"""
return self._data.__repr__() + ' (' + str(len(self._data))+' items)'
def get_all(self):
"""return a list of all the elements"""
return(self._data)
To get the average value, e.g.:
要获得平均值,例如:
q= CircularBuffer(1000000);
for i in range(40000):
q.record(i);
print "capacity=", q.size
print "stored=", len(q.get_all())
print "average=", sum(q.get_all()) / len(q.get_all())
Results in:
结果是:
capacity= 1000000
stored= 40000
average= 19999
real 0m0.024s
user 0m0.020s
sys 0m0.000s
This is about 1/3 the time of the equivalent with dequeue.
这大约是出队时间的 1/3。
回答by Orvar Korvar
Python's deque is slow. You can also use numpy.roll instead How do you rotate the numbers in an numpy array of shape (n,) or (n,1)?
Python 的双端队列很慢。您也可以改用 numpy.roll 如何旋转形状为 (n,) 或 (n,1) 的 numpy 数组中的数字?
In this benchmark, deque is 448ms. Numpy.roll is 29ms http://scimusing.wordpress.com/2013/10/25/ring-buffers-in-pythonnumpy/
在这个基准测试中,deque 是 448ms。Numpy.roll 是 29 毫秒 http://scimusing.wordpress.com/2013/10/25/ring-buffers-in-pythonnumpy/
回答by Schmouk
The original question was: "efficient" circular buffer. According to this efficiency asked for, the answer from aaronasterling seems to be definitively correct. Using a dedicated class programmed in Python and comparing time processing with collections.deque shows a x5.2 times acceleration with deque! Here is very simple code to test this:
最初的问题是:“高效”的循环缓冲区。根据要求的这种效率,aaronasterling 的答案似乎绝对正确。使用在 Python 中编程的专用类并将时间处理与 collections.deque 进行比较显示了使用 deque 的 x5.2 倍加速!这是一个非常简单的代码来测试这个:
class cb:
def __init__(self, size):
self.b = [0]*size
self.i = 0
self.sz = size
def append(self, v):
self.b[self.i] = v
self.i = (self.i + 1) % self.sz
b = cb(1000)
for i in range(10000):
b.append(i)
# called 200 times, this lasts 1.097 second on my laptop
from collections import deque
b = deque( [], 1000 )
for i in range(10000):
b.append(i)
# called 200 times, this lasts 0.211 second on my laptop
To transform a deque into a list, just use:
要将双端队列转换为列表,只需使用:
my_list = [v for v in my_deque]
You will then get O(1) random access to the deque items. Of course, this is only valuable if you need to do many random accesses to the deque after having set it once.
然后,您将获得对双端队列项目的 O(1) 随机访问。当然,这只有在设置一次后需要对双端队列进行多次随机访问时才有价值。
回答by Basj
Based on MoonCactus's answer, here is a circularlistclass. The difference with his version is that here c[0]will always give the oldest-appended element, c[-1]the latest-appended element, c[-2]the penultimate... This is more natural for applications.
基于MoonCactus 的回答,这里是一个circularlist类。与他的版本不同的是,这里 c[0]总是会给出最旧的追加元素、c[-1]最新追加的元素、c[-2]倒数第二个……这对于应用程序来说更自然。
c = circularlist(4)
c.append(1); print c, c[0], c[-1] #[1] 1, 1
c.append(2); print c, c[0], c[-1] #[1, 2] 1, 2
c.append(3); print c, c[0], c[-1] #[1, 2, 3] 1, 3
c.append(8); print c, c[0], c[-1] #[1, 2, 3, 8] 1, 8
c.append(10); print c, c[0], c[-1] #[10, 2, 3, 8] 2, 10
c.append(11); print c, c[0], c[-1] #[10, 11, 3, 8] 3, 11
Class:
班级:
class circularlist(object):
def __init__(self, size, data = []):
"""Initialization"""
self.index = 0
self.size = size
self._data = list(data)[-size:]
def append(self, value):
"""Append an element"""
if len(self._data) == self.size:
self._data[self.index] = value
else:
self._data.append(value)
self.index = (self.index + 1) % self.size
def __getitem__(self, key):
"""Get element by index, relative to the current index"""
if len(self._data) == self.size:
return(self._data[(key + self.index) % self.size])
else:
return(self._data[key])
def __repr__(self):
"""Return string representation"""
return self._data.__repr__() + ' (' + str(len(self._data))+' items)'
[Edited]:Added optional dataparameter to allow initialization from existing lists, e.g.:
[已编辑]:添加了可选data参数以允许从现有列表进行初始化,例如:
circularlist(4, [1, 2, 3, 4, 5]) # [2, 3, 4, 5] (4 items)
circularlist(4, set([1, 2, 3, 4, 5])) # [2, 3, 4, 5] (4 items)
circularlist(4, (1, 2, 3, 4, 5)) # [2, 3, 4, 5] (4 items)
回答by sirlark
I've had this problem before doing serial programming. At the time just over a year ago, I couldn't find any efficient implementations either, so I ended up writing one as a C extensionand it's also available on pypiunder an MIT license. It's super basic, only handles buffers of 8-bit signed chars, but is of flexible length, so you can use Struct or something on top of it if you need something other than chars. I see now with a google search that there are several options these days though, so you might want to look at those too.
我在进行串行编程之前遇到过这个问题。一年多前的时候,我也找不到任何有效的实现,所以我最终编写了一个 C 扩展,它也可以在 pypi 上使用 MIT 许可证。它是超级基本的,只处理 8 位有符号字符的缓冲区,但长度灵活,所以如果你需要字符以外的东西,你可以使用 Struct 或它上面的东西。我现在通过谷歌搜索看到,这些天有几个选项,所以你可能也想看看这些。
回答by Johnny Wong
You answer is not right. Circular buffer main have two priciples(https://en.wikipedia.org/wiki/Circular_buffer)
你的回答不对。循环缓冲区主要有两个原则(https://en.wikipedia.org/wiki/Circular_buffer)
- The lenth of the buffer is setted;
- First in first out;
- When you add or delete a item, the other items should not move their position
- 设置缓冲区的长度;
- 先进先出;
- 添加或删除项目时,其他项目不应移动其位置
your code below:
您的代码如下:
def add_to_buffer( self, num ):
self.mylist.pop( 0 )
self.mylist.append( num )
Let's consider a situation that the list is full, by use your code:
让我们通过使用您的代码来考虑列表已满的情况:
self.mylist = [1, 2, 3, 4, 5]
now we append 6, the list is changed to
现在我们追加 6,列表更改为
self.mylist = [2, 3, 4, 5, 6]
the items expect 1 in list has changed their position
列表中期望为 1 的项目已更改其位置
your code is a queue, not a circle buffer.
您的代码是一个队列,而不是一个循环缓冲区。
The answer of Basj, I think is the most efficent one.
Basj的答案,我认为是最有效的。
By the way, a circle buffer can imporve the performance of the operation to add a item.
顺便说一句,循环缓冲区可以提高添加项目的操作的性能。

