Python 熊猫多处理应用
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/26784164/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
pandas multiprocessing apply
提问by yemu
I'm trying to use multiprocessing with pandas dataframe, that is split the dataframe to 8 parts. apply some function to each part using apply (with each part processed in different process).
我正在尝试对 Pandas 数据帧使用多处理,即将数据帧拆分为 8 个部分。使用 apply 将一些功能应用于每个部分(每个部分在不同的过程中处理)。
EDIT: Here's the solution I finally found:
编辑:这是我最终找到的解决方案:
import multiprocessing as mp
import pandas.util.testing as pdt
def process_apply(x):
# do some stuff to data here
def process(df):
res = df.apply(process_apply, axis=1)
return res
if __name__ == '__main__':
p = mp.Pool(processes=8)
split_dfs = np.array_split(big_df,8)
pool_results = p.map(aoi_proc, split_dfs)
p.close()
p.join()
# merging parts processed by different processes
parts = pd.concat(pool_results, axis=0)
# merging newly calculated parts to big_df
big_df = pd.concat([big_df, parts], axis=1)
# checking if the dfs were merged correctly
pdt.assert_series_equal(parts['id'], big_df['id'])
回答by Rafael Barros
Since I don't have much of your data script, this is a guess, but I'd suggest using p.mapinstead of apply_asyncwith the callback.
由于我没有你的太多数据脚本,这是一个猜测,但我建议使用p.map而不是apply_async回调。
p = mp.Pool(8)
pool_results = p.map(process, np.array_split(big_df,8))
p.close()
p.join()
results = []
for result in pool_results:
results.extend(result)
回答by user6651227
I also run into the same problem when I use multiprocessing.map()to apply function to different chunk of a large dataframe.
当我multiprocessing.map()将函数应用于大型数据帧的不同块时,我也遇到了同样的问题。
I just want to add several points just in case other people run into the same problem as I do.
我只是想补充几点,以防其他人遇到和我一样的问题。
- remember to add
if __name__ == '__main__': - execute the file in a
.pyfile, if you useipython/jupyter notebook, then you can not runmultiprocessing(this is true for my case, though I have no clue)
- 记得添加
if __name__ == '__main__': - 在文件中执行
.py文件,如果使用ipython/jupyter notebook,则无法运行multiprocessing(这对我来说是正确的,尽管我不知道)
回答by Tom Raz
A more generic version based on the author solution, that allows to run it on every function and dataframe:
基于作者解决方案的更通用版本,允许在每个函数和数据帧上运行它:
from multiprocessing import Pool
from functools import partial
import numpy as np
def parallelize(data, func, num_of_processes=8):
data_split = np.array_split(data, num_of_processes)
pool = Pool(num_of_processes)
data = pd.concat(pool.map(func, data_split))
pool.close()
pool.join()
return data
def run_on_subset(func, data_subset):
return data_subset.apply(func, axis=1)
def parallelize_on_rows(data, func, num_of_processes=8):
return parallelize(data, partial(run_on_subset, func), num_of_processes)
So the following line:
所以下面这行:
df.apply(some_func, axis=1)
Will become:
会变成:
parallelize_on_rows(df, some_func)
回答by Sébastien Vincent
You can use https://github.com/nalepae/pandarallel, as in the following example:
您可以使用https://github.com/nalepae/pandarallel,如下例所示:
from pandarallel import pandarallel
from math import sin
pandarallel.initialize()
def func(x):
return sin(x**2)
df.parallel_apply(func, axis=1)
回答by EliadL
This worked well for me:
这对我来说效果很好:
rows_iter = (row for _, row in df.iterrows())
with multiprocessing.Pool() as pool:
df['new_column'] = pool.map(process_apply, rows_iter)
回答by asu
Install Pyxtensionthat simplifies using parallel map and use like this:
安装简化使用并行映射的Pyxtension并像这样使用:
from pyxtension.streams import stream
big_df = pd.concat(stream(np.array_split(df, multiprocessing.cpu_count())).mpmap(process))

