Python:在熊猫数据帧上使用多处理
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/36794433/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Python: using multiprocessing on a pandas dataframe
提问by dustin
I want to use multiprocessing
on a large dataset to find the distance between two gps points. I constructed a test set, but I have been unable to get multiprocessing
to work on this set.
我想multiprocessing
在大型数据集上使用来查找两个 gps 点之间的距离。我构建了一个测试集,但我一直无法multiprocessing
在这个集上工作。
import pandas as pd
from geopy.distance import vincenty
from itertools import combinations
import multiprocessing as mp
df = pd.DataFrame({'ser_no': [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],
'co_nm': ['aa', 'aa', 'aa', 'bb', 'bb', 'bb', 'bb', 'cc', 'cc', 'cc'],
'lat': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
'lon': [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]})
def calc_dist(x):
return pd.DataFrame(
[ [grp,
df.loc[c[0]].ser_no,
df.loc[c[1]].ser_no,
vincenty(df.loc[c[0], x],
df.loc[c[1], x])
]
for grp,lst in df.groupby('co_nm').groups.items()
for c in combinations(lst, 2)
],
columns=['co_nm','machineA','machineB','distance'])
if __name__ == '__main__':
pool = mp.Pool(processes = (mp.cpu_count() - 1))
pool.map(calc_dist, ['lat','lon'])
pool.close()
pool.join()
I am using Python 2.7.11 and Ipython 4.1.2 with Anaconda 2.5.0 64-bit on Windows7 Professional when this error occurs.
发生此错误时,我在 Windows7 Professional 上将 Python 2.7.11 和 Ipython 4.1.2 与 Anaconda 2.5.0 64 位一起使用。
runfile('C:/.../Desktop/multiprocessing test.py', wdir='C:/.../Desktop') Traceback (most recent call last):
File "", line 1, in runfile('C:/.../Desktop/multiprocessing test.py', wdir='C:/.../Desktop')
File "C:...\Local\Continuum\Anaconda2\lib\site-packages\spyderlib\widgets\externalshell\sitecustomize.py", line 699, in runfile execfile(filename, namespace)
File "C:...\Local\Continuum\Anaconda2\lib\site-packages\spyderlib\widgets\externalshell\sitecustomize.py", line 74, in execfile exec(compile(scripttext, filename, 'exec'), glob, loc)
File "C:/..../multiprocessing test.py", line 33, in pool.map(calc_dist, ['lat','lon'])
File "C:...\AppData\Local\Continuum\Anaconda2\lib\multiprocessing\pool.py", line 251, in map return self.map_async(func, iterable, chunksize).get()
File "C:...\Local\Continuum\Anaconda2\lib\multiprocessing\pool.py", line 567, in get raise self._value
TypeError: Failed to create Point instance from 1.
runfile('C:/.../Desktop/multiprocessing test.py', wdir='C:/.../Desktop') 回溯(最后一次调用):
File "", line 1, in runfile('C:/.../Desktop/multiprocessing test.py', wdir='C:/.../Desktop')
文件“C:...\Local\Continuum\Anaconda2\lib\site-packages\spyderlib\widgets\externalshell\sitecustomize.py”,第 699 行,在运行文件 execfile(filename, namespace) 中
文件“C:...\Local\Continuum\Anaconda2\lib\site-packages\spyderlib\widgets\externalshell\sitecustomize.py”,第 74 行,在 execfile exec(compile(scripttext, filename, 'exec'), glob , 位置)
文件“C:/..../multiprocessing test.py”,第 33 行,在 pool.map(calc_dist, ['lat','lon'])
文件“C:...\AppData\Local\Continuum\Anaconda2\lib\multiprocessing\pool.py”,第 251 行,在映射中返回 self.map_async(func, iterable, chunksize).get()
文件“C:...\Local\Continuum\Anaconda2\lib\multiprocessing\pool.py”,第 567 行,在 get raise self._value
类型错误:无法从 1 创建 Point 实例。
def get(self, timeout=None):
self.wait(timeout)
if not self._ready:
raise TimeoutError
if self._success:
return self._value
else:
raise self._value
采纳答案by ptrj
What's wrong
怎么了
This line from your code:
您的代码中的这一行:
pool.map(calc_dist, ['lat','lon'])
spawns 2 processes - one runs calc_dist('lat')
and the other runs calc_dist('lon')
. Compare the first example in doc. (Basically, pool.map(f, [1,2,3])
calls f
three times with arguments given in the list that follows: f(1)
, f(2)
, and f(3)
.) If I'm not mistaken, your function calc_dist
can only be called calc_dist('lat', 'lon')
. And it doesn't allow for parallel processing.
产生 2 个进程 - 一个运行calc_dist('lat')
,另一个运行calc_dist('lon')
。比较doc 中的第一个示例。(基本上,pool.map(f, [1,2,3])
调用f
在下面的列表中给出的参数三次f(1)
,f(2)
和f(3)
)。如果我没有记错的话,你的函数calc_dist
只能叫calc_dist('lat', 'lon')
。它不允许并行处理。
Solution
解决方案
I believe you want to split the work between processes, probably sending each tuple (grp, lst)
to a separate process. The following code does exactly that.
我相信您想在进程之间拆分工作,可能将每个元组发送(grp, lst)
到一个单独的进程。下面的代码正是这样做的。
First, let's prepare for splitting:
首先,让我们准备拆分:
grp_lst_args = list(df.groupby('co_nm').groups.items())
print(grp_lst_args)
[('aa', [0, 1, 2]), ('cc', [7, 8, 9]), ('bb', [3, 4, 5, 6])]
We'll send each of these tuples (here, there are three of them) as an argument to a function in a separate process. We need to rewrite the function, let's call it calc_dist2
. For convenience, it's argument is a tuple as in calc_dist2(('aa',[0,1,2]))
我们将把这些元组中的每一个(这里有三个)作为参数发送给一个单独进程中的函数。我们需要重写这个函数,我们称之为calc_dist2
。为方便起见,它的参数是一个元组,如calc_dist2(('aa',[0,1,2]))
def calc_dist2(arg):
grp, lst = arg
return pd.DataFrame(
[ [grp,
df.loc[c[0]].ser_no,
df.loc[c[1]].ser_no,
vincenty(df.loc[c[0], ['lat','lon']],
df.loc[c[1], ['lat','lon']])
]
for c in combinations(lst, 2)
],
columns=['co_nm','machineA','machineB','distance'])
And now comes the multiprocessing:
现在是多处理:
pool = mp.Pool(processes = (mp.cpu_count() - 1))
results = pool.map(calc_dist2, grp_lst_args)
pool.close()
pool.join()
results_df = pd.concat(results)
results
is a list of results (here data frames) of calls calc_dist2((grp,lst))
for (grp,lst)
in grp_lst_args
. Elements of results
are later concatenated to one data frame.
results
是结果的通话清单(此处数据帧)calc_dist2((grp,lst))
为(grp,lst)
在grp_lst_args
。的元素results
稍后连接到一个数据帧。
print(results_df)
co_nm machineA machineB distance
0 aa 1 2 156.876149391 km
1 aa 1 3 313.705445447 km
2 aa 2 3 156.829329105 km
0 cc 8 9 156.060165391 km
1 cc 8 0 311.910998169 km
2 cc 9 0 155.851498134 km
0 bb 4 5 156.665641837 km
1 bb 4 6 313.214333025 km
2 bb 4 7 469.622535339 km
3 bb 5 6 156.548897414 km
4 bb 5 7 312.957597466 km
5 bb 6 7 156.40899677 km
BTW, In Python 3 we could use a with
construction:
顺便说一句,在 Python 3 中,我们可以使用一个with
结构:
with mp.Pool() as pool:
results = pool.map(calc_dist2, grp_lst_args)
Update
更新
I tested this code only on linux. On linux, the read only data frame df
can be accessed by child processes and is not copied to their memory space, but I'm not sure how it exactly works on Windows. You may consider splitting df
into chunks (grouped by co_nm
) and sending these chunks as arguments to some other version of calc_dist
.
我仅在 linux 上测试了此代码。在 linux 上,df
子进程可以访问只读数据框,并且不会复制到它们的内存空间,但我不确定它在 Windows 上究竟是如何工作的。您可以考虑拆分df
成块(按 分组co_nm
)并将这些块作为参数发送到calc_dist
.
回答by salomonderossi
Strange. It seems to work under python2 but not python3.
奇怪的。它似乎在 python2 下工作,但在 python3 下不工作。
This is a minimal modified version to print the output:
这是打印输出的最小修改版本:
import pandas as pd
from geopy.distance import vincenty
from itertools import combinations
import multiprocessing as mp
df = pd.DataFrame({'ser_no': [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],
'co_nm': ['aa', 'aa', 'aa', 'bb', 'bb', 'bb', 'bb', 'cc', 'cc', 'cc'],
'lat': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
'lon': [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]})
def calc_dist(x):
ret = pd.DataFrame(
[ [grp,
df.loc[c[0]].ser_no,
df.loc[c[1]].ser_no,
vincenty(df.loc[c[0], x],
df.loc[c[1], x])
]
for grp,lst in df.groupby('co_nm').groups.items()
for c in combinations(lst, 2)
],
columns=['co_nm','machineA','machineB','distance'])
print(ret)
return ret
if __name__ == '__main__':
pool = mp.Pool(processes = (mp.cpu_count() - 1))
pool.map(calc_dist, ['lat','lon'])
pool.close()
pool.join()
And this is the output from python2
这是python2的输出
0 aa 1 2 110.723608682 km
1 aa 1 3 221.460709525 km
2 aa 2 3 110.737100843 km
3 cc 8 9 110.827576495 km
4 cc 8 0 221.671650552 km
co_nm machineA machineB distance
5 cc 9 0 110.844074057 km
0 aa 1 2 110.575064814 km
1 aa 1 3 221.151481337 km
6 bb 4 5 110.765515243 km
2 aa 2 3 110.576416524 km
7 bb 4 6 221.5459187 km
3 cc 8 9 110.598565514 km
4 cc 8 0 221.203121352 km
8 bb 4 7 332.341640771 km
5 cc 9 0 110.604555838 km
6 bb 4 5 110.58113908 km
9 bb 5 6 110.780403457 km
7 bb 4 6 221.165643396 km
10 bb 5 7 221.576125528 km
8 bb 4 7 331.754177186 km
9 bb 5 6 110.584504316 km
10 bb 5 7 221.173038106 km
11 bb 6 7 110.795722071 km
11 bb 6 7 110.58853379 km
And this the stack trace from python3
这是来自python3的堆栈跟踪
"""
Traceback (most recent call last):
File "/usr/local/lib/python3.4/dist-packages/geopy/point.py", line 123, in __new__
seq = iter(arg)
TypeError: 'numpy.int64' object is not iterable
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/lib/python3.4/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "/usr/lib/python3.4/multiprocessing/pool.py", line 44, in mapstar
return list(map(*args))
File "gps.py", line 29, in calc_dist
for grp, lst in df.groupby('co_nm').groups.items()
File "gps.py", line 30, in <listcomp>
for c in combinations(lst, 2)
File "/usr/local/lib/python3.4/dist-packages/geopy/distance.py", line 322, in __init__
super(vincenty, self).__init__(*args, **kwargs)
File "/usr/local/lib/python3.4/dist-packages/geopy/distance.py", line 115, in __init__
kilometers += self.measure(a, b)
File "/usr/local/lib/python3.4/dist-packages/geopy/distance.py", line 342, in measure
a, b = Point(a), Point(b)
File "/usr/local/lib/python3.4/dist-packages/geopy/point.py", line 126, in __new__
"Failed to create Point instance from %r." % (arg,)
TypeError: Failed to create Point instance from 8.
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "gps.py", line 38, in <module>
pool.map(calc_dist, ['lat', 'lon'])
File "/usr/lib/python3.4/multiprocessing/pool.py", line 260, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/usr/lib/python3.4/multiprocessing/pool.py", line 599, in get
raise self._value
TypeError: Failed to create Point instance from 8.
I know this is not the answer, but maybe it helps...
我知道这不是答案,但也许它有帮助......