pandas: Multiprocessing writing to a pandas DataFrame
Disclaimer: this page is a Chinese-English side-by-side translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/33550312/
Multiprocessing writing to pandas dataframe
Asked by user3374113
So what I am trying to do with the following code is to read a list of lists, put each one through a function called checker, and then have log_result deal with the result of the checker function. I am trying to do this using multiprocessing because the variable rows_to_parse in reality has millions of rows, so using multiple cores should speed up this process by a considerable amount.
The code at present doesn't work and crashes Python.
Concerns and Issues I have:
- I want the existing df held in the variable df to maintain its index throughout the process, because otherwise log_result will get confused as to which row needs updating.
- I am quite certain that apply_async is not the appropriate multiprocessing function to perform this duty, because I believe the order in which the computer reads and writes the df can possibly corrupt it???
- I think that a queue may need to be set up to write to and read df, but I am unsure as to how I would go about doing that.
Thank you for any assistance.
import pandas as pd
import multiprocessing
from functools import partial
def checker(a,b,c,d,e):
    match = df[(df['a'] == a) & (df['b'] == b) & (df['c'] == c) & (df['d'] == d) & (df['e'] == e)]
    index_of_match = match.index.tolist()
    if len(index_of_match) == 1: #one match in df
        return index_of_match
    elif len(index_of_match) > 1: #not likely because duplicates will be removed prior to: if "__name__" == __main__:
        return [index_of_match[0]]
    else: #no match, returns a result which then gets processed by the else statement in log_result. this means that [a,b,c,d,e] get written to the df
        return [a,b,c,d,e]

def log_result(result, dataf):
    if len(result) == 1: #one match found: increment the 'e' count of that row
        dataf.loc[result[0]]['e'] += 1
    else: #append new row to existing df
        new_row = pd.DataFrame([result],columns=cols)
        dataf = dataf.append(new_row,ignore_index=True)

def apply_async_with_callback(parsing_material, dfr):
    pool = multiprocessing.Pool()
    for var_a, var_b, var_c, var_d, var_e in parsing_material:
        pool.apply_async(checker, args = (var_a, var_b, var_c, var_d, var_e), callback = partial(log_result, dataf=dfr))
    pool.close()
    pool.join()
if __name__ == '__main__':
    #setting up main dataframe
    cols = ['a','b','c','d','e']
    existing_data = [["YES","A","16052011","13031999",3],
                     ["NO","Q","11022003","15081999",3],
                     ["YES","A","22082010","03012001",9]]

    #main dataframe
    df = pd.DataFrame(existing_data,columns=cols)

    #new data
    rows_to_parse = [['NO', 'A', '09061997', '06122003', 5],
                     ['YES', 'W', '17061992', '26032012', 6],
                     ['YES', 'G', '01122006', '07082014', 2],
                     ['YES', 'N', '06081992', '21052008', 9],
                     ['YES', 'Y', '18051995', '24011996', 6],
                     ['NO', 'Q', '11022003', '15081999', 3],
                     ['NO', 'O', '20112004', '28062008', 0],
                     ['YES', 'R', '10071994', '03091996', 8],
                     ['NO', 'C', '09091998', '22051992', 1],
                     ['YES', 'Q', '01051995', '02012000', 3],
                     ['YES', 'Q', '26022015', '26092007', 5],
                     ['NO', 'F', '15072002', '17062001', 8],
                     ['YES', 'I', '24092006', '03112003', 2],
                     ['YES', 'A', '22082010', '03012001', 9],
                     ['YES', 'I', '15072016', '30092005', 7],
                     ['YES', 'Y', '08111999', '02022006', 3],
                     ['NO', 'V', '04012016', '10061996', 1],
                     ['NO', 'I', '21012003', '11022001', 6],
                     ['NO', 'P', '06041992', '30111993', 6],
                     ['NO', 'W', '30081992', '02012016', 6]]

    apply_async_with_callback(rows_to_parse, df)
Answered by Andy Hayden
Updating DataFrames like this in MultiProcessing isn't going to work:
dataf = dataf.append(new_row,ignore_index=True)
For one thing this is very inefficient (O(n) for each append, so O(n^2) in total). The preferred way is to concat some objects together in one pass.
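As a rough sketch of the difference (my own illustration, not part of the answer; note that DataFrame.append has since been removed in newer pandas versions):

import pandas as pd

cols = ['a', 'b', 'c', 'd', 'e']
new_rows = [['NO', 'A', '09061997', '06122003', 5],
            ['YES', 'W', '17061992', '26032012', 6]]

# slow: each append copies the whole frame, so n appends cost roughly O(n^2)
slow = pd.DataFrame(columns=cols)
for row in new_rows:
    slow = slow.append(pd.DataFrame([row], columns=cols), ignore_index=True)

# preferred: collect plain rows and build (or concat) the frame once at the end
fast = pd.DataFrame(new_rows, columns=cols)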
For another, and more importantly, dataf is not locked for each update, so there's no guarantee that two operations won't conflict (I'm guessing this is what's crashing python).
Finally, append doesn't act in place, so the variable dataf is discarded once the callback is finished, and no changes are made to the parent dataf.
We could use a MultiProcessing list or a dict: a list if you don't care about order, or a dict if you do (e.g. keyed by enumerate), since you must note that the values are not returned from async in a well-defined order.
(Or we could create an object which implements a Lock ourselves, see Eli Bendersky.)
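If order does matter, one way to read that hint is to key a Manager dict by each row's position. This is my own sketch, not code from the answer, and the index/results parameter names are made up for illustration:

import multiprocessing
from functools import partial

def log_result(result, index, results):
    # callback runs in the parent process; record the result under its original position
    results[index] = result

# assuming `pool`, `checker` and `rows_to_parse` exist as in the question:
# manager = multiprocessing.Manager()
# results = manager.dict()
# for i, (a, b, c, d, e) in enumerate(rows_to_parse):
#     pool.apply_async(checker, args=(a, b, c, d, e),
#                      callback=partial(log_result, index=i, results=results))
# ...after pool.join(), rebuild the original order:
# ordered = [results[i] for i in sorted(results)]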
So the following changes are made:
df = pd.DataFrame(existing_data,columns=cols)
# becomes
df = pd.DataFrame(existing_data,columns=cols)
d = MultiProcessing.list([df])

dataf = dataf.append(new_row,ignore_index=True)
# becomes
d.append(new_row)
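Note that MultiProcessing.list above is the answer's shorthand rather than an actual attribute of the multiprocessing module; in practice the shared list would be created through a manager, e.g. (a small sketch, not from the original answer):

import multiprocessing

manager = multiprocessing.Manager()
d = manager.list([df])   # a shared proxy list, seeded with the existing frame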
Now, once the async has finished you have a MultiProcessing.list of DataFrames. You can concat these (with ignore_index) to get the desired result:
pd.concat(d, ignore_index=True)
Should do the trick.
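Putting those pieces together, here is a minimal runnable sketch of the whole approach. It is my own reconstruction rather than code from the answer: it only handles the no-match case (collecting new rows and concatenating once at the end; the increment-on-match logic from log_result would still need separate handling), it passes df to the workers explicitly rather than through a global, and it uses multiprocessing.Manager().list() as the shared container. Re-sending df with every single row is wasteful, which is exactly what the note below about chunk size is getting at:

import multiprocessing
from functools import partial

import pandas as pd

def checker(df, row):
    # runs in a worker process; df is passed in explicitly rather than read from a global
    a, b, c, d, e = row
    match = df[(df['a'] == a) & (df['b'] == b) & (df['c'] == c) &
               (df['d'] == d) & (df['e'] == e)]
    if len(match.index) == 0:
        return row          # no match: this row should become a new row in the result
    return None             # match found: nothing to append in this simplified sketch

def log_result(result, shared_rows):
    # callbacks run in the parent process; collect unmatched rows in the shared list
    if result is not None:
        shared_rows.append(result)

if __name__ == '__main__':
    cols = ['a', 'b', 'c', 'd', 'e']
    existing_data = [["YES", "A", "16052011", "13031999", 3],
                     ["NO", "Q", "11022003", "15081999", 3],
                     ["YES", "A", "22082010", "03012001", 9]]
    df = pd.DataFrame(existing_data, columns=cols)

    rows_to_parse = [['NO', 'A', '09061997', '06122003', 5],
                     ['YES', 'W', '17061992', '26032012', 6],
                     ['NO', 'Q', '11022003', '15081999', 3]]

    manager = multiprocessing.Manager()
    new_rows = manager.list()               # shared container for unmatched rows

    pool = multiprocessing.Pool()
    for row in rows_to_parse:
        pool.apply_async(checker, args=(df, row),
                         callback=partial(log_result, shared_rows=new_rows))
    pool.close()
    pool.join()

    # one concat at the end instead of appending row by row inside the callbacks
    result = pd.concat([df, pd.DataFrame(list(new_rows), columns=cols)],
                       ignore_index=True)
    print(result)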
Note: creating the new_row DataFrame at each stage is also less efficient than letting pandas parse the list of lists directly into a DataFrame in one go. Hopefully this is a toy example; really you want your chunks to be quite large to get wins with MultiProcessing (I've heard 50kb as a rule-of-thumb...), a row at a time is never going to be a win here.
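As an illustration of what larger chunks could look like (again my own sketch, not from the answer): hand each task a slice of rows_to_parse instead of a single row, and return one list of unmatched rows per chunk:

import multiprocessing
from functools import partial

import pandas as pd

def chunked(rows, size):
    # yield successive slices of `rows` of length `size`
    for i in range(0, len(rows), size):
        yield rows[i:i + size]

def checker_chunk(df, chunk):
    # process a whole chunk in one worker task and return all unmatched rows at once
    unmatched = []
    for a, b, c, d, e in chunk:
        match = df[(df['a'] == a) & (df['b'] == b) & (df['c'] == c) &
                   (df['d'] == d) & (df['e'] == e)]
        if len(match.index) == 0:
            unmatched.append([a, b, c, d, e])
    return unmatched

def collect_chunk(result, shared_rows):
    # callback in the parent process: extend the shared list with the chunk's results
    shared_rows.extend(result)

# usage (assuming df, rows_to_parse and a Manager list `new_rows` exist as in the sketch above):
# pool = multiprocessing.Pool()
# for chunk in chunked(rows_to_parse, 10000):
#     pool.apply_async(checker_chunk, args=(df, chunk),
#                      callback=partial(collect_chunk, shared_rows=new_rows))
# pool.close()
# pool.join()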
Aside: you should avoid using globals (like df) in your code; it's much cleaner to pass them around to your functions (in this case, as an argument to checker).
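For example, the change might look like this sketch (the extra df parameter and the modified apply_async call are my illustration, not code from the answer):

def checker(df, a, b, c, d, e):
    # df is now an explicit parameter instead of a module-level global
    match = df[(df['a'] == a) & (df['b'] == b) & (df['c'] == c) &
               (df['d'] == d) & (df['e'] == e)]
    ...  # rest of the function unchanged

# and inside apply_async_with_callback, pass the frame along with the row values:
# pool.apply_async(checker, args=(dfr, var_a, var_b, var_c, var_d, var_e),
#                  callback=partial(log_result, dataf=dfr))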