pandas: Multiprocessing writing to pandas DataFrame

Disclaimer: this page is a Chinese/English translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/33550312/


Multiprocessing writing to pandas dataframe

python, multithreading, python-2.7, pandas, multiprocessing

Asked by user3374113

So what I am trying to do with the following code is to read a list of lists and put them through a function called checker, and then have log_result deal with the result of the function checker. I am trying to do this using multiprocessing because the variable rows_to_parse in reality has millions of rows, so using multiple cores should speed up this process by a considerable amount.


The code at the moment doesn't work and crashes Python.


Concerns and Issues I have:


  1. I want the existing df, held in the variable df, to maintain its index throughout the process, because otherwise log_result will get confused as to which row needs updating.
  2. I am quite certain that apply_async is not the appropriate multiprocessing function to perform this duty, because I believe the order in which the computer reads and writes the df can possibly corrupt it???
  3. I think that a queue may need to be set up to write to and read df, but I am unsure as to how I would go about doing that.

Thank you for any assistance.


import pandas as pd
import multiprocessing
from functools import partial

def checker(a,b,c,d,e):
    match = df[(df['a'] == a) & (df['b'] == b) & (df['c'] == c) & (df['d'] == d) & (df['e'] == e)]
    index_of_match = match.index.tolist()
    if len(index_of_match) == 1: #one match in df
        return index_of_match
    elif len(index_of_match) > 1: #not likely because duplicates will be removed prior to the if __name__ == '__main__': block
        return [index_of_match[0]]
    else: #no match, returns a result which then gets processed by the else statement in log_result. this means that [a,b,c,d,e] get written to the df
        return [a,b,c,d,e]



def log_result(result, dataf):
    if len(result) == 1: #
        dataf.loc[result[0]]['e'] += 1 
    else: #append new row to existing df
        new_row = pd.DataFrame([result],columns=cols)
        dataf = dataf.append(new_row,ignore_index=True)


def apply_async_with_callback(parsing_material, dfr):
    pool = multiprocessing.Pool()
    for var_a, var_b, var_c, var_d, var_e in parsing_material:
        pool.apply_async(checker, args = (var_a, var_b, var_c, var_d, var_e), callback = partial(log_result,dataf=dfr))
    pool.close()
    pool.join()



if __name__ == '__main__':
    #setting up main dataframe
    cols = ['a','b','c','d','e']
    existing_data = [["YES","A","16052011","13031999",3],
                    ["NO","Q","11022003","15081999",3],
                    ["YES","A","22082010","03012001",9]]

    #main dataframe
    df = pd.DataFrame(existing_data,columns=cols)

    #new data
    rows_to_parse = [['NO', 'A', '09061997', '06122003', 5],
                    ['YES', 'W', '17061992', '26032012', 6],
                    ['YES', 'G', '01122006', '07082014', 2],
                    ['YES', 'N', '06081992', '21052008', 9],
                    ['YES', 'Y', '18051995', '24011996', 6],
                    ['NO', 'Q', '11022003', '15081999', 3],
                    ['NO', 'O', '20112004', '28062008', 0],
                    ['YES', 'R', '10071994', '03091996', 8],
                    ['NO', 'C', '09091998', '22051992', 1],
                    ['YES', 'Q', '01051995', '02012000', 3],
                    ['YES', 'Q', '26022015', '26092007', 5],
                    ['NO', 'F', '15072002', '17062001', 8],
                    ['YES', 'I', '24092006', '03112003', 2],
                    ['YES', 'A', '22082010', '03012001', 9],
                    ['YES', 'I', '15072016', '30092005', 7],
                    ['YES', 'Y', '08111999', '02022006', 3],
                    ['NO', 'V', '04012016', '10061996', 1],
                    ['NO', 'I', '21012003', '11022001', 6],
                    ['NO', 'P', '06041992', '30111993', 6],
                    ['NO', 'W', '30081992', '02012016', 6]]


    apply_async_with_callback(rows_to_parse, df)

Answered by Andy Hayden

Updating DataFrames like this in MultiProcessing isn't going to work:


dataf = dataf.append(new_row,ignore_index=True)

For one thing this is very inefficient (O(n) for each append, so O(n^2) in total). The preferred way is to concat the objects together in one pass.

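As a minimal illustration of the difference, reusing a couple of rows from the question: collect the rows first and let pandas build the frame in one pass instead of growing it inside the loop.

import pandas as pd

cols = ['a', 'b', 'c', 'd', 'e']
new_rows = [['NO', 'A', '09061997', '06122003', 5],
            ['YES', 'W', '17061992', '26032012', 6]]

# Quadratic: every iteration copies everything accumulated so far.
df_slow = pd.DataFrame(columns=cols)
for row in new_rows:
    df_slow = pd.concat([df_slow, pd.DataFrame([row], columns=cols)],
                        ignore_index=True)

# Linear: collect the rows, then build the frame in a single pass.
df_fast = pd.DataFrame(new_rows, columns=cols)
# or, if you already hold several DataFrames:
# df_fast = pd.concat(list_of_frames, ignore_index=True)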

For another, and more importantly, dataf is not locked for each update, so there's no guarantee that two operations won't conflict (I'm guessing this is what is crashing python).


Finally, append doesn't act in place, so the variable dataf is discarded once the callback is finished!! No changes are made to the parent dataf.

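A toy demonstration of that last point (no multiprocessing involved): rebinding dataf inside the callback only changes the local name, so the caller's frame never sees the new row.

import pandas as pd

def log_result(result, dataf):
    # Rebinds the *local* name dataf to a brand-new DataFrame;
    # the frame the caller passed in is left untouched.
    dataf = pd.concat([dataf, pd.DataFrame([result], columns=dataf.columns)],
                      ignore_index=True)

df = pd.DataFrame([['YES', 'A', '16052011', '13031999', 3]],
                  columns=['a', 'b', 'c', 'd', 'e'])
log_result(['NO', 'A', '09061997', '06122003', 5], df)
print(len(df))  # still 1 -- the appended row vanished with the local variable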



We could use a MultiProcessing list or a dict: a list if you don't care about order, or a dict if you do (e.g. keyed by enumerate), since you must note that the values are not returned from async in a well-defined order.
(Or we could create an object which implements a Lock ourselves; see Eli Bendersky.)
So the following changes are made:


df = pd.DataFrame(existing_data, columns=cols)
# becomes
df = pd.DataFrame(existing_data, columns=cols)
d = multiprocessing.Manager().list([df])

dataf = dataf.append(new_row, ignore_index=True)
# becomes
d.append(new_row)

Now, once the async has finished, you have a shared list of DataFrames. You can concat these (with ignore_index) to get the desired result:


pd.concat(d, ignore_index=True)

Should do the trick.

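Putting the pieces together, a minimal sketch of this approach, assuming a Manager().list() as the shared collection; parse_rows is a name made up for this example, and the worker only exercises the "no match, append the row" path of the question's checker.

import multiprocessing
from functools import partial

import pandas as pd

cols = ['a', 'b', 'c', 'd', 'e']

def checker(a, b, c, d, e):
    # Workers just return the candidate row; no shared state is touched here.
    return [a, b, c, d, e]

def log_result(result, shared_rows):
    # The callback runs in the parent process and appends to the shared list.
    shared_rows.append(result)

def parse_rows(rows_to_parse, df):
    manager = multiprocessing.Manager()
    shared_rows = manager.list()
    pool = multiprocessing.Pool()
    for row in rows_to_parse:
        pool.apply_async(checker, args=tuple(row),
                         callback=partial(log_result, shared_rows=shared_rows))
    pool.close()
    pool.join()
    # Build the new rows in one pass, then concat once with the original frame.
    new_rows = pd.DataFrame(list(shared_rows), columns=cols)
    return pd.concat([df, new_rows], ignore_index=True)

if __name__ == '__main__':
    df = pd.DataFrame([['YES', 'A', '16052011', '13031999', 3]], columns=cols)
    rows = [['NO', 'A', '09061997', '06122003', 5],
            ['YES', 'W', '17061992', '26032012', 6]]
    print(parse_rows(rows, df))

Note that the whole-frame concat happens exactly once, after the pool has been joined.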



Note: creating the new_row DataFrame at each stage is also less efficient than letting pandas parse the list of lists directly into a DataFrame in one go. Hopefully this is a toy example; really you want your chunks to be quite large to get wins with MultiProcessing (I've heard 50kb as a rule of thumb...). A row at a time is never going to be a win here.

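For instance, a rough sketch of working in chunks instead of a row at a time (the chunk size below is an arbitrary placeholder; tune it for your data):

import multiprocessing

import pandas as pd

cols = ['a', 'b', 'c', 'd', 'e']

def parse_chunk(chunk):
    # pandas parses the whole list of lists for this chunk in a single pass.
    return pd.DataFrame(chunk, columns=cols)

def parse_in_chunks(rows, chunk_size=100000):
    chunks = [rows[i:i + chunk_size] for i in range(0, len(rows), chunk_size)]
    pool = multiprocessing.Pool()
    frames = pool.map(parse_chunk, chunks)  # one task per chunk, not per row
    pool.close()
    pool.join()
    return pd.concat(frames, ignore_index=True)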



Aside: you should avoid using globals (like df) in your code; it's much cleaner to pass them around in your functions (in this case, as an argument to checker).

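For example (a hypothetical reworking, not the question's actual code), checker could take the frame as an explicit first argument and be bound once with functools.partial:

from functools import partial

import pandas as pd

cols = ['a', 'b', 'c', 'd', 'e']

def checker(df, a, b, c, d, e):
    # The frame is an explicit argument rather than a module-level global.
    match = df[(df['a'] == a) & (df['b'] == b) & (df['c'] == c) &
               (df['d'] == d) & (df['e'] == e)]
    index_of_match = match.index.tolist()
    return index_of_match[:1] if index_of_match else [a, b, c, d, e]

df = pd.DataFrame([['YES', 'A', '16052011', '13031999', 3]], columns=cols)
check_against_df = partial(checker, df)  # bind the frame once
print(check_against_df('YES', 'A', '16052011', '13031999', 3))  # -> [0]

Keep in mind that anything passed to apply_async (including a bound frame) gets pickled for every task, so for a large df you would bind per chunk or share it some other way.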