如何使用 python pandas 处理传入的实时数据
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/16740887/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
How to handle incoming real time data with python pandas
提问by Marcelo MD
Which is the most recommended/pythonic way of handling live incoming data with pandas?
使用 Pandas 处理实时传入数据的最推荐/pythonic 方法是哪种?
Every few seconds I'm receiving a data point in the format below:
每隔几秒钟,我就会收到以下格式的数据点:
{'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH',
'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0}
I would like to append it to an existing DataFrame and then run some analysis on it.
我想将它附加到现有的 DataFrame 中,然后对其进行一些分析。
The problem is, just appending rows with DataFrame.append can lead to performance issues with all that copying.
问题是,仅使用 DataFrame.append 附加行可能会导致所有复制的性能问题。
Things I've tried:
我尝试过的事情:
A few people suggested preallocating a big DataFrame and updating it as data comes in:
一些人建议预先分配一个大的 DataFrame 并在数据进入时更新它:
In [1]: index = pd.DatetimeIndex(start='2013-01-01 00:00:00', freq='S', periods=5)
In [2]: columns = ['high', 'low', 'open', 'close']
In [3]: df = pd.DataFrame(index=t, columns=columns)
In [4]: df
Out[4]:
high low open close
2013-01-01 00:00:00 NaN NaN NaN NaN
2013-01-01 00:00:01 NaN NaN NaN NaN
2013-01-01 00:00:02 NaN NaN NaN NaN
2013-01-01 00:00:03 NaN NaN NaN NaN
2013-01-01 00:00:04 NaN NaN NaN NaN
In [5]: data = {'time' :'2013-01-01 00:00:02', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0}
In [6]: data_ = pd.Series(data)
In [7]: df.loc[data['time']] = data_
In [8]: df
Out[8]:
high low open close
2013-01-01 00:00:00 NaN NaN NaN NaN
2013-01-01 00:00:01 NaN NaN NaN NaN
2013-01-01 00:00:02 4 3 2 1
2013-01-01 00:00:03 NaN NaN NaN NaN
2013-01-01 00:00:04 NaN NaN NaN NaN
The other alternative is building a list of dicts. Simply appending the incoming data to a list and slicing it into smaller DataFrames to do the work.
另一种选择是构建一个字典列表。只需将传入的数据附加到列表中,然后将其切成较小的 DataFrame 即可完成工作。
In [9]: ls = []
In [10]: for n in range(5):
.....: # Naive stuff ahead =)
.....: time = '2013-01-01 00:00:0' + str(n)
.....: d = {'time' : time, 'stock' : 'BLAH', 'high' : np.random.rand()*10, 'low' : np.random.rand()*10, 'open' : np.random.rand()*10, 'close' : np.random.rand()*10}
.....: ls.append(d)
In [11]: df = pd.DataFrame(ls[1:3]).set_index('time')
In [12]: df
Out[12]:
close high low open stock
time
2013-01-01 00:00:01 3.270078 1.008289 7.486118 2.180683 BLAH
2013-01-01 00:00:02 3.883586 2.215645 0.051799 2.310823 BLAH
or something like that, maybe processing the input a little bit more.
或类似的东西,也许处理输入多一点。
回答by Brent Washburne
You are actually trying to solve two problems: capturing real-time data and analyzing that data. The first problem can be solved with Python logging, which is designed for this purpose. Then the other problem can be solved by reading that same log file.
您实际上是在尝试解决两个问题:捕获实时数据并分析该数据。第一个问题可以用为此目的而设计的Python logging解决。然后另一个问题可以通过读取相同的日志文件来解决。
回答by Andy Hayden
I would use HDF5/pytables as follows:
我将使用 HDF5/pytables 如下:
- Keep the data as a python list "as long as possible".
- Append your results to that list.
- When it gets "big":
- push to HDF5 Store using pandas io (and an appendable table).
- clear the list.
- Repeat.
- 将数据“尽可能长地”保留为 python 列表。
- 将您的结果附加到该列表中。
- 当它变得“大”时:
- 使用 pandas io(和一个可附加表)推送到 HDF5 存储。
- 清除清单。
- 重复。
In fact, the function I define uses a list for each "key" so that you can store multiple DataFrames to the HDF5 Store in the same process.
实际上,我定义的函数为每个“键”使用了一个列表,以便您可以在同一进程中将多个 DataFrame 存储到 HDF5 Store。
We define a function which you call with each row d:
我们定义了一个函数,您可以在每一行中调用该函数d:
CACHE = {}
STORE = 'store.h5' # Note: another option is to keep the actual file open
def process_row(d, key, max_len=5000, _cache=CACHE):
"""
Append row d to the store 'key'.
When the number of items in the key's cache reaches max_len,
append the list of rows to the HDF5 store and clear the list.
"""
# keep the rows for each key separate.
lst = _cache.setdefault(key, [])
if len(lst) >= max_len:
store_and_clear(lst, key)
lst.append(d)
def store_and_clear(lst, key):
"""
Convert key's cache list to a DataFrame and append that to HDF5.
"""
df = pd.DataFrame(lst)
with pd.HDFStore(STORE) as store:
store.append(key, df)
lst.clear()
Note: we use the with statement to automatically close the store after each write. It maybe faster to keep it open, but if so it's recommended that you flush regularly (closing flushes). Also note it may be more readable to have used a collections dequerather than a list, but the performance of a list will be slightly better here.
注意:我们使用 with 语句在每次写入后自动关闭存储。它可以更快地保持开放,但即便如此我们建议您定期刷新(收盘刷新)。另请注意,使用集合双端队列而不是列表可能更具可读性,但此处列表的性能会稍好一些。
To use this you call as:
要使用它,您可以调用:
process_row({'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0},
key="df")
Note: "df" is the stored keyused in the pytables store.
注意:“df”是pytables存储中使用的存储键。
Once the job has finished ensure you store_and_clearthe remaining cache:
作业完成后,请确保store_and_clear剩余的缓存:
for k, lst in CACHE.items(): # you can instead use .iteritems() in python 2
store_and_clear(lst, k)
Now your complete DataFrame is available via:
现在您可以通过以下方式获得完整的 DataFrame:
with pd.HDFStore(STORE) as store:
df = store["df"] # other keys will be store[key]
Some comments:
一些评论:
- 5000 can be adjusted, try with some smaller/larger numbers to suit your needs.
- List append is O(1), DataFrame append is O(
len(df)). - Until you're doing stats or data-munging you don't need pandas, use what's fastest.
- This code works with multiple key's (data points) coming in.
- This is very little code, and we're staying in vanilla python list and then pandas dataframe...
- 5000 可以调整,尝试使用一些更小/更大的数字来满足您的需求。
- List append 是 O(1),DataFrame append 是 O(
len(df))。 - 在您进行统计或数据处理之前,您不需要熊猫,请使用最快的。
- 此代码适用于传入的多个键(数据点)。
- 这是很少的代码,我们停留在vanilla python list和pandas dataframe中......
Additionally, to get the up to date reads you could define a get method which stores and clears beforereading. In this way you would get the most up to date data:
此外,为了获得最新的读取,您可以定义一个在读取之前存储和清除的 get 方法。通过这种方式,您将获得最新的数据:
def get_latest(key, _cache=CACHE):
store_and_clear(_cache[key], key)
with pd.HDFStore(STORE) as store:
return store[key]
Now when you access with:
现在,当您访问:
df = get_latest("df")
you'll get the latest "df" available.
您将获得最新的“df”。
Another option is slightlymore involved: define a custom table in vanilla pytables, see the tutorial.
另一种选择稍微复杂一些:在 vanilla pytables 中定义自定义表,请参阅教程。
Note: You need to know the field-names to create the column descriptor.
注意:您需要知道字段名称才能创建列描述符。

