pandas multiprocessing apply

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original: http://stackoverflow.com/questions/26784164/


Tags: python, pandas, multiprocessing

Asked by yemu

I'm trying to use multiprocessing with a pandas dataframe: split the dataframe into 8 parts, then apply some function to each part using apply, with each part processed in a different process.

EDIT: Here's the solution I finally found:

import multiprocessing as mp

import numpy as np
import pandas as pd
import pandas.testing as pdt

def process_apply(x):
    # do some stuff to the row here and return the result
    return x

def process(df):
    res = df.apply(process_apply, axis=1)
    return res

if __name__ == '__main__':
    # big_df is assumed to be defined/loaded before this point
    p = mp.Pool(processes=8)
    split_dfs = np.array_split(big_df, 8)
    pool_results = p.map(process, split_dfs)
    p.close()
    p.join()

    # merging parts processed by different processes
    parts = pd.concat(pool_results, axis=0)

    # merging newly calculated parts to big_df
    big_df = pd.concat([big_df, parts], axis=1)

    # checking if the dfs were merged correctly
    pdt.assert_series_equal(parts['id'], big_df['id'])

Answered by Rafael Barros

Since I don't have much of your data or your script to go on, this is a guess, but I'd suggest using p.map instead of apply_async with a callback.

p = mp.Pool(8)
pool_results = p.map(process, np.array_split(big_df, 8))
p.close()
p.join()

# each pool result is one processed chunk; flatten them into a single list
results = []
for result in pool_results:
    results.extend(result)
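
For contrast, here is a rough sketch of the apply_async-with-callback pattern this answer advises against; it is only an illustration, assuming the same process function and big_df as above:

pool_results = []

def collect_result(result):
    # the callback runs in the parent process as each chunk finishes;
    # results arrive in completion order, not submission order
    pool_results.append(result)

p = mp.Pool(8)
for chunk in np.array_split(big_df, 8):
    p.apply_async(process, args=(chunk,), callback=collect_result)
p.close()
p.join()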

Answered by user6651227

I also ran into the same problem when using multiprocessing.map() to apply a function to different chunks of a large dataframe.

I just want to add a few points in case other people run into the same problem I did.

  1. Remember to add if __name__ == '__main__':
  2. Execute the code from a .py file; if you use ipython/jupyter notebook, you cannot run multiprocessing (this was true in my case, though I have no clue why). See the sketch below.
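
As a minimal sketch of both points, here is what a standalone .py script might look like; the double_x function and the toy dataframe are placeholders, not from the original answer:

import multiprocessing as mp

import numpy as np
import pandas as pd

def double_x(row):
    # hypothetical per-row function
    return row['x'] * 2

def process(df):
    return df.apply(double_x, axis=1)

if __name__ == '__main__':
    # the guard matters on platforms that spawn workers (e.g. Windows),
    # because each child process re-imports this module
    big_df = pd.DataFrame({'x': range(100)})
    with mp.Pool(processes=4) as pool:
        results = pd.concat(pool.map(process, np.array_split(big_df, 4)))
    print(results.head())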

Answered by Tom Raz

A more generic version based on the author's solution, which allows running it with any function and dataframe:

from multiprocessing import Pool
from functools import partial
import numpy as np
import pandas as pd

def parallelize(data, func, num_of_processes=8):
    data_split = np.array_split(data, num_of_processes)
    pool = Pool(num_of_processes)
    data = pd.concat(pool.map(func, data_split))
    pool.close()
    pool.join()
    return data

def run_on_subset(func, data_subset):
    return data_subset.apply(func, axis=1)

def parallelize_on_rows(data, func, num_of_processes=8):
    return parallelize(data, partial(run_on_subset, func), num_of_processes)

So the following line:

df.apply(some_func, axis=1)

Will become:

parallelize_on_rows(df, some_func) 
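
As a quick usage sketch (the dataframe and the sum_columns function here are hypothetical, just to show the call):

import pandas as pd

def sum_columns(row):
    # example row function: add two columns together
    return row['a'] + row['b']

# on platforms that spawn workers, run this under if __name__ == '__main__':
df = pd.DataFrame({'a': range(10), 'b': range(10)})
df['total'] = parallelize_on_rows(df, sum_columns)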

Answered by Sébastien Vincent

You can use https://github.com/nalepae/pandarallel, as in the following example:

from pandarallel import pandarallel
from math import sin

pandarallel.initialize()

def func(x):
    # with axis=1, x is one row (a Series), so compute on a named column
    # (the column name 'a' is assumed here)
    return sin(x['a'] ** 2)

df.parallel_apply(func, axis=1)
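
A self-contained sketch for trying it out; the dataframe is made up for illustration:

import pandas as pd
from math import sin
from pandarallel import pandarallel

pandarallel.initialize()

def func(x):
    # x is one row, passed as a Series because of axis=1
    return sin(x['a'] ** 2)

df = pd.DataFrame({'a': [0.1 * i for i in range(1000)]})
result = df.parallel_apply(func, axis=1)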

Answered by EliadL

This worked well for me:

import multiprocessing

rows_iter = (row for _, row in df.iterrows())

with multiprocessing.Pool() as pool:
    df['new_column'] = pool.map(process_apply, rows_iter)
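
Note that process_apply here receives a single row (a Series) and should return one value per row. A self-contained sketch with a hypothetical row function:

import multiprocessing

import pandas as pd

def process_apply(row):
    # hypothetical per-row function: combine two columns
    return row['a'] * row['b']

if __name__ == '__main__':
    df = pd.DataFrame({'a': range(8), 'b': range(8)})
    rows_iter = (row for _, row in df.iterrows())
    with multiprocessing.Pool() as pool:
        df['new_column'] = pool.map(process_apply, rows_iter)
    print(df)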

Answered by asu

Install Pyxtension, which simplifies using parallel map, and use it like this:

import multiprocessing

import numpy as np
import pandas as pd
from pyxtension.streams import stream

big_df = pd.concat(stream(np.array_split(df, multiprocessing.cpu_count())).mpmap(process))