
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/36794433/

Date: 2020-08-19 18:21:34  Source: igfitidea

Python: using multiprocessing on a pandas dataframe

Tags: python, pandas, multiprocessing

Asked by dustin

I want to use multiprocessing on a large dataset to find the distance between two GPS points. I constructed a test set, but I have been unable to get multiprocessing to work on this set.


import pandas as pd
from geopy.distance import vincenty
from itertools import combinations
import multiprocessing as mp

df = pd.DataFrame({'ser_no': [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],
                'co_nm': ['aa', 'aa', 'aa', 'bb', 'bb', 'bb', 'bb', 'cc', 'cc', 'cc'],
                'lat': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                'lon': [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]})



def calc_dist(x):
    return pd.DataFrame(
               [ [grp,
                  df.loc[c[0]].ser_no,
                  df.loc[c[1]].ser_no,
                  vincenty(df.loc[c[0], x], 
                           df.loc[c[1], x])
                 ]
                 for grp,lst in df.groupby('co_nm').groups.items()
                 for c in combinations(lst, 2)
               ],
               columns=['co_nm','machineA','machineB','distance'])

if __name__ == '__main__':
    pool = mp.Pool(processes = (mp.cpu_count() - 1))
    pool.map(calc_dist, ['lat','lon'])
    pool.close()
    pool.join()

I am using Python 2.7.11 and IPython 4.1.2 with Anaconda 2.5.0 64-bit on Windows 7 Professional when this error occurs.


runfile('C:/.../Desktop/multiprocessing test.py', wdir='C:/.../Desktop')
Traceback (most recent call last):
  File "", line 1, in
    runfile('C:/.../Desktop/multiprocessing test.py', wdir='C:/.../Desktop')
  File "C:...\Local\Continuum\Anaconda2\lib\site-packages\spyderlib\widgets\externalshell\sitecustomize.py", line 699, in runfile
    execfile(filename, namespace)
  File "C:...\Local\Continuum\Anaconda2\lib\site-packages\spyderlib\widgets\externalshell\sitecustomize.py", line 74, in execfile
    exec(compile(scripttext, filename, 'exec'), glob, loc)
  File "C:/..../multiprocessing test.py", line 33, in
    pool.map(calc_dist, ['lat','lon'])
  File "C:...\AppData\Local\Continuum\Anaconda2\lib\multiprocessing\pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "C:...\Local\Continuum\Anaconda2\lib\multiprocessing\pool.py", line 567, in get
    raise self._value
TypeError: Failed to create Point instance from 1.


def get(self, timeout=None):
    self.wait(timeout)
    if not self._ready:
        raise TimeoutError
    if self._success:
        return self._value
    else:
        raise self._value

Accepted answer by ptrj

What's wrong


This line from your code:


pool.map(calc_dist, ['lat','lon'])

spawns 2 processes - one runs calc_dist('lat') and the other runs calc_dist('lon'). Compare the first example in the doc. (Basically, pool.map(f, [1, 2, 3]) calls f three times, with the arguments given in the list that follows: f(1), f(2), and f(3).) If I'm not mistaken, your function calc_dist can only be called as calc_dist(['lat', 'lon']). And it doesn't allow for parallel processing.

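A minimal, self-contained illustration of this pool.map behaviour, using a hypothetical square function (not from the question):

```python
import multiprocessing as mp

def square(n):
    # each element of the list becomes one call: square(1), square(2), square(3)
    return n * n

if __name__ == '__main__':
    with mp.Pool(processes=2) as pool:
        print(pool.map(square, [1, 2, 3]))  # [1, 4, 9]
```

Each worker receives one element of the iterable, which is why passing ['lat', 'lon'] splits the column names into two unrelated calls.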

Solution


I believe you want to split the work between processes, probably sending each tuple (grp, lst) to a separate process. The following code does exactly that.


First, let's prepare for splitting:


grp_lst_args = list(df.groupby('co_nm').groups.items())

print(grp_lst_args)
[('aa', [0, 1, 2]), ('cc', [7, 8, 9]), ('bb', [3, 4, 5, 6])]

We'll send each of these tuples (here, there are three of them) as an argument to a function in a separate process. We need to rewrite the function; let's call it calc_dist2. For convenience, its argument is a tuple, as in calc_dist2(('aa', [0, 1, 2])).


def calc_dist2(arg):
    grp, lst = arg
    return pd.DataFrame(
               [ [grp,
                  df.loc[c[0]].ser_no,
                  df.loc[c[1]].ser_no,
                  vincenty(df.loc[c[0], ['lat','lon']], 
                           df.loc[c[1], ['lat','lon']])
                 ]
                 for c in combinations(lst, 2)
               ],
               columns=['co_nm','machineA','machineB','distance'])

And now comes the multiprocessing:


pool = mp.Pool(processes = (mp.cpu_count() - 1))
results = pool.map(calc_dist2, grp_lst_args)
pool.close()
pool.join()

results_df = pd.concat(results)

results is a list of the results (here, data frames) of the calls calc_dist2((grp, lst)) for each (grp, lst) in grp_lst_args. The elements of results are later concatenated into one data frame.


print(results_df)
  co_nm  machineA  machineB          distance
0    aa         1         2  156.876149391 km
1    aa         1         3  313.705445447 km
2    aa         2         3  156.829329105 km
0    cc         8         9  156.060165391 km
1    cc         8         0  311.910998169 km
2    cc         9         0  155.851498134 km
0    bb         4         5  156.665641837 km
1    bb         4         6  313.214333025 km
2    bb         4         7  469.622535339 km
3    bb         5         6  156.548897414 km
4    bb         5         7  312.957597466 km
5    bb         6         7   156.40899677 km
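Note the repeated index values above (0, 1, 2, 0, 1, 2, ...): pd.concat keeps each group's original index. If a fresh 0..n-1 index is preferred, ignore_index=True would give one - a small sketch:

```python
import pandas as pd

# two tiny frames standing in for the per-group results
a = pd.DataFrame({'x': [1, 2]})
b = pd.DataFrame({'x': [3]})

print(pd.concat([a, b]).index.tolist())                     # [0, 1, 0]
print(pd.concat([a, b], ignore_index=True).index.tolist())  # [0, 1, 2]
```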

BTW, in Python 3 we could use a with construction:


with mp.Pool() as pool:
    results = pool.map(calc_dist2, grp_lst_args)

Update


I tested this code only on Linux. On Linux, the read-only data frame df can be accessed by the child processes and is not copied into their memory space, but I'm not sure how exactly it works on Windows. You may consider splitting df into chunks (grouped by co_nm) and sending these chunks as arguments to some other version of calc_dist.

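A hypothetical sketch of that chunk-based variant. The distance computation is omitted so the example does not depend on geopy, and calc_dist_chunk is an assumed name, not from the original answer:

```python
import pandas as pd
from itertools import combinations
import multiprocessing as mp

def calc_dist_chunk(arg):
    # each worker receives (group name, sub-frame) as its argument, so no
    # global df is needed -- this also works under Windows' spawn start method
    grp, chunk = arg
    rows = [[grp, chunk.loc[a, 'ser_no'], chunk.loc[b, 'ser_no']]
            for a, b in combinations(chunk.index, 2)]
    return pd.DataFrame(rows, columns=['co_nm', 'machineA', 'machineB'])

if __name__ == '__main__':
    df = pd.DataFrame({'ser_no': [1, 2, 3, 4],
                       'co_nm': ['aa', 'aa', 'aa', 'bb'],
                       'lat': [1, 2, 3, 4],
                       'lon': [21, 22, 23, 24]})
    # groupby iteration yields (group name, sub-frame) tuples directly
    chunks = list(df.groupby('co_nm'))
    with mp.Pool(processes=2) as pool:
        results_df = pd.concat(pool.map(calc_dist_chunk, chunks))
    print(results_df)
```

Passing the sub-frames explicitly costs some pickling overhead per chunk, but avoids relying on copy-on-write fork semantics that Windows does not provide.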

Answered by salomonderossi

Strange. It seems to work under Python 2 but not Python 3.


This is a minimal modified version to print the output:


import pandas as pd
from geopy.distance import vincenty
from itertools import combinations
import multiprocessing as mp

df = pd.DataFrame({'ser_no': [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],
                'co_nm': ['aa', 'aa', 'aa', 'bb', 'bb', 'bb', 'bb', 'cc', 'cc', 'cc'],
                'lat': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                'lon': [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]})



def calc_dist(x):
    ret =  pd.DataFrame(
               [ [grp,
                  df.loc[c[0]].ser_no,
                  df.loc[c[1]].ser_no,
                  vincenty(df.loc[c[0], x],
                           df.loc[c[1], x])
                 ]
                 for grp,lst in df.groupby('co_nm').groups.items()
                 for c in combinations(lst, 2)
               ],
               columns=['co_nm','machineA','machineB','distance'])
    print(ret)
    return ret

if __name__ == '__main__':
    pool = mp.Pool(processes = (mp.cpu_count() - 1))
    pool.map(calc_dist, ['lat','lon'])
    pool.close()
    pool.join()

And this is the output from Python 2 (note the lines from the two processes are interleaved):


0     aa         1         2  110.723608682 km
1     aa         1         3  221.460709525 km
2     aa         2         3  110.737100843 km
3     cc         8         9  110.827576495 km
4     cc         8         0  221.671650552 km
   co_nm  machineA  machineB          distance
5     cc         9         0  110.844074057 km
0     aa         1         2  110.575064814 km
1     aa         1         3  221.151481337 km
6     bb         4         5  110.765515243 km
2     aa         2         3  110.576416524 km
7     bb         4         6    221.5459187 km
3     cc         8         9  110.598565514 km
4     cc         8         0  221.203121352 km
8     bb         4         7  332.341640771 km
5     cc         9         0  110.604555838 km
6     bb         4         5   110.58113908 km
9     bb         5         6  110.780403457 km
7     bb         4         6  221.165643396 km
10    bb         5         7  221.576125528 km
8     bb         4         7  331.754177186 km
9     bb         5         6  110.584504316 km
10    bb         5         7  221.173038106 km
11    bb         6         7  110.795722071 km
11    bb         6         7   110.58853379 km

And this is the stack trace from Python 3:


"""
Traceback (most recent call last):
  File "/usr/local/lib/python3.4/dist-packages/geopy/point.py", line 123, in __new__
    seq = iter(arg)
TypeError: 'numpy.int64' object is not iterable

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.4/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.4/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "gps.py", line 29, in calc_dist
    for grp, lst in df.groupby('co_nm').groups.items()
  File "gps.py", line 30, in <listcomp>
    for c in combinations(lst, 2)
  File "/usr/local/lib/python3.4/dist-packages/geopy/distance.py", line 322, in __init__
    super(vincenty, self).__init__(*args, **kwargs)
  File "/usr/local/lib/python3.4/dist-packages/geopy/distance.py", line 115, in __init__
    kilometers += self.measure(a, b)
  File "/usr/local/lib/python3.4/dist-packages/geopy/distance.py", line 342, in measure
    a, b = Point(a), Point(b)
  File "/usr/local/lib/python3.4/dist-packages/geopy/point.py", line 126, in __new__
    "Failed to create Point instance from %r." % (arg,)
TypeError: Failed to create Point instance from 8.
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "gps.py", line 38, in <module>
    pool.map(calc_dist, ['lat', 'lon'])
  File "/usr/lib/python3.4/multiprocessing/pool.py", line 260, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.4/multiprocessing/pool.py", line 599, in get
    raise self._value
TypeError: Failed to create Point instance from 8.
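The "'numpy.int64' object is not iterable" line in the trace points at the root cause: with x set to a single column name, df.loc[c[0], x] yields a scalar, and geopy's Point cannot be built from a lone number - it needs a (lat, lon) pair. A pandas-only sketch of the difference, using a tiny frame like the one in the question:

```python
import pandas as pd

df = pd.DataFrame({'lat': [1, 2], 'lon': [21, 22]})

# selecting one column at one row gives a scalar -- not a valid Point input
print(df.loc[0, 'lat'])                   # 1

# selecting both columns gives a (lat, lon) pair, which vincenty accepts
print(tuple(df.loc[0, ['lat', 'lon']]))   # (1, 21)
```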

I know this is not the answer, but maybe it helps...
