Note: this page is a translation of a popular StackOverflow question and answer, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me): StackOverflow.
Original question: http://stackoverflow.com/questions/11728836/
Efficiently applying a function to a grouped pandas DataFrame in parallel
Asked by user2303
I often need to apply a function to the groups of a very large DataFrame(of mixed data types) and would like to take advantage of multiple cores.
I can create an iterator from the groups and use the multiprocessing module, but it is not efficient because every group and the results of the function must be pickled for messaging between processes.
Is there any way to avoid the pickling or even avoid the copying of the DataFramecompletely? It looks like the shared memory functions of the multiprocessing modules are limited to numpyarrays. Are there any other options?
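To make the overhead concrete, here is a sketch of the pattern the asker describes, with the inter-process messaging mimicked by an in-process pickle round-trip (a `multiprocessing.Pool` would do exactly this serialization per group and per result; `agg` is a hypothetical per-group function):

```python
import pickle

import pandas as pd


def agg(group):
    # the function applied to each group
    return group['b'].sum()


df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})

results = {}
for key, group in df.groupby('a'):
    # multiprocessing would pickle each group to a worker process and
    # pickle the result back; the round-trips below mimic that cost
    shipped = pickle.loads(pickle.dumps(group))
    results[key] = pickle.loads(pickle.dumps(agg(shipped)))

print(results)  # {0: 6, 1: 11, 2: 4}
```

Every group's sub-DataFrame is copied through the pickle machinery, which is the cost the question wants to avoid.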
Answered by Ami Tavory
From the comments above, it seems that this is planned for pandas some time (there's also an interesting-looking rosetta project which I just noticed).
However, until such parallel functionality is incorporated into pandas, I've noticed that it's very easy to write efficient, non-memory-copying parallel augmentations to pandas directly using Cython + OpenMP and C++.
Here's a short example of writing a parallel groupby-sum, whose use is something like this:
import pandas as pd
import para_group_demo
df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})
print(para_group_demo.sum(df.a, df.b))
and the output is:
sum
key
0 6
1 11
2 4
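For comparison, the single-threaded pure-pandas equivalent of the call above is a plain groupby-sum (`para_group_demo` itself is the compiled extension developed below):

```python
import pandas as pd

df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})

# sum 'b' within each value of 'a' -- the result the extension reproduces
out = df.groupby('a')['b'].sum()
print(out)
```

This is the baseline the parallel Cython version is measured against.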
Note: Doubtlessly, this simple example's functionality will eventually be part of pandas. Some things, however, will be more natural to parallelize in C++ for some time, and it's important to be aware of how easy it is to combine this into pandas.
To do this, I wrote a simple single-source-file extension whose code follows.
It starts with some imports and type definitions:
from libc.stdint cimport int64_t, uint64_t
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map
cimport cython
from cython.operator cimport dereference as deref, preincrement as inc
from cython.parallel import prange
import pandas as pd
ctypedef unordered_map[int64_t, uint64_t] counts_t
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t
ctypedef vector[counts_t] counts_vec_t
The C++ unordered_map type is for summing by a single thread, and the vector is for summing by all threads.
Now to the function sum. It starts off with typed memory views for fast access:
def sum(crit, vals):
cdef int64_t[:] crit_view = crit.values
cdef int64_t[:] vals_view = vals.values
The function continues by dividing the entries semi-equally among the threads (here hardcoded to 4), and having each thread sum the entries in its range:
cdef uint64_t num_threads = 4
cdef uint64_t l = len(crit)
cdef uint64_t s = l / num_threads + 1
cdef uint64_t i, j, e
cdef counts_vec_t counts
counts = counts_vec_t(num_threads)
counts.resize(num_threads)
with cython.boundscheck(False):
for i in prange(num_threads, nogil=True):
j = i * s
e = j + s
if e > l:
e = l
while j < e:
counts[i][crit_view[j]] += vals_view[j]
inc(j)
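The chunking arithmetic can be checked with a plain-Python sketch (the parallel threads are replaced by a sequential loop; the data is the 'a' and 'b' columns from the usage example above):

```python
# l entries split semi-equally over num_threads workers, as in the Cython code
crit = [1, 2, 1, 2, 1, 1, 0]   # df.a from the usage example
vals = [0, 1, 2, 3, 4, 5, 6]   # df.b from the usage example
num_threads = 4
l = len(crit)
s = l // num_threads + 1       # chunk size, matching `l / num_threads + 1`

counts = [dict() for _ in range(num_threads)]  # one map per "thread"
for i in range(num_threads):
    j = i * s
    e = min(j + s, l)          # clamp the last chunk, as the `if e > l` does
    while j < e:               # sum this chunk into thread i's private map
        counts[i][crit[j]] = counts[i].get(crit[j], 0) + vals[j]
        j += 1
```

Because each "thread" writes only to its own map, no locking is needed in the parallel version.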
When the threads have completed, the function merges all the results (from the different ranges) into a single unordered_map:
cdef counts_t total
cdef counts_it_t it, e_it
for i in range(num_threads):
it = counts[i].begin()
e_it = counts[i].end()
while it != e_it:
total[deref(it).first] += deref(it).second
inc(it)
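Mirrored in plain Python, the merge is a simple fold of the thread-local maps into one (the per-thread partial sums are hard-coded here to the values the example data produces, to keep the sketch self-contained):

```python
# per-thread partial sums, as the four chunks of the example data yield them
counts = [{1: 0, 2: 1}, {1: 2, 2: 3}, {1: 9}, {0: 6}]

total = {}
for partial in counts:          # fold every thread-local map into one
    for key, value in partial.items():
        total[key] = total.get(key, 0) + value

print(total)  # {1: 11, 2: 4, 0: 6}
```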
All that's left is to create a DataFrameand return the results:
key, sum_ = [], []
it = total.begin()
e_it = total.end()
while it != e_it:
key.append(deref(it).first)
sum_.append(deref(it).second)
inc(it)
df = pd.DataFrame({'key': key, 'sum': sum_})
df.set_index('key', inplace=True)
return df
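The same final assembly in plain Python, for reference (the keys and sums are hard-coded from the example output):

```python
import pandas as pd

# build the result frame from the merged keys/sums, as the extension does
key, sum_ = [0, 1, 2], [6, 11, 4]
df = pd.DataFrame({'key': key, 'sum': sum_})
df.set_index('key', inplace=True)  # index on 'key', matching the printed output
```

Indexing on 'key' is what makes the printed result look like the groupby output shown at the top of the answer.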

