Python: Parallelize apply after pandas groupby
Disclaimer: this page reproduces a popular StackOverflow question and its answers under the CC BY-SA 4.0 license. If you use or share this material, you must do so under the same license and attribute it to the original authors (not me): StackOverflow.
Original question: http://stackoverflow.com/questions/26187759/
Parallelize apply after pandas groupby
Asked by Ivan
I have used rosetta.parallel.pandas_easy to parallelize apply after groupby, for example:
import numpy as np
import pandas as pd
from rosetta.parallel.pandas_easy import groupby_to_series_to_frame

df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]}, index=['g1', 'g1', 'g2'])
groupby_to_series_to_frame(df, np.mean, n_jobs=8, use_apply=True, by=df.index)
However, has anyone figured out how to parallelize a function that returns a DataFrame? This code fails for rosetta, as expected.
def tmpFunc(df):
    df['c'] = df.a + df.b
    return df

df.groupby(df.index).apply(tmpFunc)
groupby_to_series_to_frame(df, tmpFunc, n_jobs=1, use_apply=True, by=df.index)
Accepted answer by Ivan
This seems to work, although it really should be built into pandas:
import pandas as pd
from joblib import Parallel, delayed
import multiprocessing

def tmpFunc(df):
    df['c'] = df.a + df.b
    return df

def applyParallel(dfGrouped, func):
    retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
    return pd.concat(retLst)

if __name__ == '__main__':
    df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]}, index=['g1', 'g1', 'g2'])
    print('parallel version: ')
    print(applyParallel(df.groupby(df.index), tmpFunc))

    print('regular version: ')
    print(df.groupby(df.index).apply(tmpFunc))

    print('ideal version (does not work): ')
    print(df.groupby(df.index).applyParallel(tmpFunc))
Answered by JD Long
I have a hack I use for getting parallelization in pandas. I break my dataframe into chunks, put each chunk into an element of a list, and then use IPython's parallel bits to do a parallel apply on the list of dataframes. Then I put the list back together using the pandas concat function.
This is not generally applicable, however. It works for me because the function I want to apply to each chunk of the dataframe takes about a minute, and pulling apart and putting together my data does not take all that long. So this is clearly a kludge. With that said, here's an example. I'm using an IPython notebook, so you'll see the %%time magic in my code:
## make some example data
import pandas as pd
import numpy as np

np.random.seed(1)
n = 10000
df = pd.DataFrame({'mygroup': np.random.randint(1000, size=n),
                   'data': np.random.rand(n)})
grouped = df.groupby('mygroup')
For this example I'm going to make 'chunks' based on the above groupby, but this does not have to be how the data is chunked. Although it's a pretty common pattern.
dflist = []
for name, group in grouped:
    dflist.append(group)
set up the parallel bits
from IPython.parallel import Client
rc = Client()
lview = rc.load_balanced_view()
lview.block = True
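(A side note, not from the original answer: in IPython 4.0 and later the parallel machinery was split out into the separate ipyparallel package, so a roughly equivalent setup today, assuming you have already started engines with something like "ipcluster start -n 6", would be:)

import ipyparallel as ipp

rc = ipp.Client()                # connects to the engines started by ipcluster
lview = rc.load_balanced_view()
lview.block = True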
write a silly function to apply to our data
def myFunc(inDf):
    inDf['newCol'] = inDf.data ** 10
    return inDf
now let's run the code in serial then in parallel. serial first:
%%time
serial_list = map(myFunc, dflist)
CPU times: user 14 s, sys: 19.9 ms, total: 14 s
Wall time: 14 s
now parallel
%%time
parallel_list = lview.map(myFunc, dflist)
CPU times: user 1.46 s, sys: 86.9 ms, total: 1.54 s
Wall time: 1.56 s
then it only takes a few ms to merge them back into one dataframe
%%time
combinedDf = pd.concat(parallel_list)
CPU times: user 296 ms, sys: 5.27 ms, total: 301 ms
Wall time: 300 ms
I'm running 6 IPython engines on my MacBook, but you can see it drops the execution time down to 2s from 14s.
For really long-running stochastic simulations I can use an AWS backend by firing up a cluster with StarCluster. Much of the time, however, I parallelize just across the 8 CPUs on my MBP.
Answered by Pietro Battiston
Ivan's answer is great, but it looks like it can be slightly simplified, also removing the need to depend on joblib:
import pandas
from multiprocessing import Pool, cpu_count

def applyParallel(dfGrouped, func):
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [group for name, group in dfGrouped])
    return pandas.concat(ret_list)
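A minimal usage sketch (mine, not part of the original answer), reusing tmpFunc and the toy df from the accepted answer; the __main__ guard matters because the worker processes re-import the module on platforms that spawn rather than fork:

import pandas as pd

def tmpFunc(df):
    df['c'] = df.a + df.b
    return df

if __name__ == '__main__':
    df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]}, index=['g1', 'g1', 'g2'])
    print(applyParallel(df.groupby(df.index), tmpFunc))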
By the way: this cannot replace any groupby.apply(), but it will cover the typical cases: e.g. it should cover cases 2 and 3 in the documentation, while you should obtain the behaviour of case 1 by giving the argument axis=1 to the final pandas.concat() call (see the sketch after the three cases below).
EDIT: the docs changed; the old version can be found here; in any case, I'm copy-pasting the three examples below.
case 1: group DataFrame apply aggregation function (f(chunk) -> Series) yield DataFrame, with group axis having group labels
case 2: group DataFrame apply transform function (f(chunk) -> DataFrame with same indexes) yield DataFrame with resulting chunks glued together
case 3: group Series apply function with f(chunk) -> DataFrame yield DataFrame with result of chunks glued together
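To make the case-1 remark concrete, here is a hedged sketch of the aggregation variant; agg_func and applyParallelAgg are illustrative names of mine, not from the answer:

import pandas
from multiprocessing import Pool, cpu_count

def agg_func(group):
    return group.mean()  # f(chunk) -> Series of per-column means

def applyParallelAgg(dfGrouped, func):
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [group for name, group in dfGrouped])
    # axis=1 lines the per-group Series up as columns; the group names are not
    # attached automatically, and the orientation is transposed relative to
    # what groupby.apply() would return
    return pandas.concat(ret_list, axis=1)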
Answered by spring
A short comment to accompany JD Long's answer. I've found that if the number of groups is very large (say hundreds of thousands), and your apply function is doing something fairly simple and quick, then breaking up your dataframe into chunks and assigning each chunk to a worker to carry out a groupby-apply (in serial) can be much faster than doing a parallel groupby-apply and having the workers read off a queue containing a multitude of groups. Example:
import pandas as pd
import numpy as np
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
nrows = 15000
np.random.seed(1980)
df = pd.DataFrame({'a': np.random.permutation(np.arange(nrows))})
So our dataframe looks like:
      a
0  3425
1  1016
2  8141
3  9263
4  8018
Note that column 'a' has many groups (think customer ids):
len(df.a.unique())
15000
A function to operate on our groups:
def f1(group):
    time.sleep(0.0001)
    return group
Start a pool:
ppe = ProcessPoolExecutor(12)
futures = []
results = []
Do a parallel groupby-apply:
%%time
for name, group in df.groupby('a'):
    p = ppe.submit(f1, group)
    futures.append(p)
for future in as_completed(futures):
    r = future.result()
    results.append(r)
df_output = pd.concat(results)
del ppe
CPU times: user 18.8 s, sys: 2.15 s, total: 21 s
Wall time: 17.9 s
Let's now add a column which partitions the df into many fewer groups:
df['b'] = np.random.randint(0, 12, nrows)
Now instead of 15000 groups there are only 12:
len(df.b.unique())
12
We'll partition our df and do a groupby-apply on each chunk.
ppe = ProcessPoolExecutor(12)
Wrapper function:
def f2(df):
    df.groupby('a').apply(f1)
    return df
Send out each chunk to be operated on in serial:
%%time
futures = []   # reset the lists so results from the first run are not included
results = []
for i in df.b.unique():
    p = ppe.submit(f2, df[df.b == i])
    futures.append(p)
for future in as_completed(futures):
    r = future.result()
    results.append(r)
df_output = pd.concat(results)
CPU times: user 11.4 s, sys: 176 ms, total: 11.5 s
Wall time: 12.4 s
Note that the amount of time spent per group has not changed. Rather, what has changed is the length of the queue from which the workers read. I suspect that what is happening is that the workers cannot access the shared memory simultaneously and are returning constantly to read off the queue, so they step on each other's toes. With larger chunks to operate on, the workers return less frequently, so this problem is ameliorated and the overall execution is faster.
Answered by Jinhua Wang
Personally I would recommend using dask, per this thread.
As @chrisb pointed out, multiprocessing with pandas in Python might create unnecessary overhead. It might also not perform as well as multithreading, or even as well as a single thread.
Dask is created specifically for multiprocessing.
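A minimal dask sketch of that idea (not from this thread; the column names and the add_col helper are illustrative, and the meta argument just tells dask the schema of the output):

import pandas as pd
import dask.dataframe as dd

def add_col(group):
    group['c'] = group['a'] + group['b']
    return group

df = pd.DataFrame({'g': ['g1', 'g1', 'g2'], 'a': [6, 2, 2], 'b': [4, 5, 6]})
ddf = dd.from_pandas(df, npartitions=2)

result = (ddf.groupby('g')
             .apply(add_col, meta={'g': 'object', 'a': 'i8', 'b': 'i8', 'c': 'i8'})
             .compute(scheduler='processes'))  # 'processes' requests process-based parallelism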

