Progress indicator during pandas operations

Note: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute it to the original authors (not the translator). Original question: http://stackoverflow.com/questions/18603270/

Date: 2020-08-19 11:13:33  Source: igfitidea

Progress indicator during pandas operations

Tags: python, pandas, ipython

Asked by cwharland

I regularly perform pandas operations on data frames in excess of 15 million or so rows and I'd love to have access to a progress indicator for particular operations.


Does a text based progress indicator for pandas split-apply-combine operations exist?


For example, in something like:


df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

where feature_rollup is a somewhat involved function that takes many DF columns and creates new user columns through various methods. These operations can take a while for large data frames, so I'd like to know if it is possible to have text-based output in an iPython notebook that updates me on the progress.


So far, I've tried canonical loop progress indicators for Python but they don't interact with pandas in any meaningful way.


I'm hoping there's something I've overlooked in the pandas library/documentation that allows one to know the progress of a split-apply-combine. A simple implementation might look at the total number of data frame subsets the apply function works on and report progress as the completed fraction of those subsets.

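The idea described above could be sketched roughly as follows (toy data; column names and the wrapper are hypothetical, and some pandas versions invoke the function an extra time on the first group, so the counter is indicative only):

```python
import pandas as pd

df = pd.DataFrame({"userID": [1, 1, 2, 2, 3],
                   "requestDate": ["a", "a", "b", "b", "c"],
                   "value": [0, 1, 2, 3, 4]})
g = df.groupby(["userID", "requestDate"])
total = g.ngroups  # total number of subsets apply will visit

def with_progress(func):
    # bump a counter each time a group is processed and report the fraction done
    done = {"n": 0}
    def wrapper(grp):
        done["n"] += 1
        print("progress: {}/{}".format(done["n"], total))
        return func(grp)
    return wrapper

res = g.apply(with_progress(lambda grp: grp["value"].sum()))
```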

Is this perhaps something that needs to be added to the library?


Accepted answer by casper.dcl

Due to popular demand, tqdm has added support for pandas. Unlike the other answers, this will not noticeably slow pandas down; here's an example for DataFrameGroupBy.progress_apply:


import pandas as pd
import numpy as np
from tqdm import tqdm
# from tqdm.auto import tqdm  # for notebooks

df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))

# Create and register a new `tqdm` instance with `pandas`
# (can use tqdm_gui, optional kwargs, etc.)
tqdm.pandas()

# Now you can use `progress_apply` instead of `apply`
df.groupby(0).progress_apply(lambda x: x**2)

In case you're interested in how this works (and how to modify it for your own callbacks), see the examples on GitHub, the full documentation on PyPI, or import the module and run help(tqdm).

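As a small illustration of the optional kwargs mentioned above, tqdm.pandas() forwards its arguments to the underlying progress bar, so you can set a description, width, and so on (the specific values below are arbitrary):

```python
import pandas as pd
import numpy as np
from tqdm import tqdm

# kwargs given to tqdm.pandas() are passed through to the tqdm bar instance
tqdm.pandas(desc="rollup", ncols=80)

df = pd.DataFrame(np.random.randint(0, 100, (1000, 3)))
out = df.groupby(0).progress_apply(lambda x: x.sum())
```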

EDIT




To directly answer the original question, replace:


df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

with:


from tqdm import tqdm
tqdm.pandas()
df_users.groupby(['userID', 'requestDate']).progress_apply(feature_rollup)

Note: tqdm <= v4.8: for tqdm versions below 4.8, instead of tqdm.pandas() you had to do:


from tqdm import tqdm, tqdm_pandas
tqdm_pandas(tqdm())

Answer by Jeff

You can easily do this with a decorator:


from functools import wraps

def logging_decorator(func):

    @wraps(func)
    def wrapper(*args, **kwargs):
        wrapper.count += 1
        print("The function I modify has been called {0} time(s).".format(
              wrapper.count))
        return func(*args, **kwargs)
    wrapper.count = 0
    return wrapper

modified_function = logging_decorator(feature_rollup)

then just use modified_function (and change when you want it to print)

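A self-contained sketch of driving such a counting decorator from the question's kind of groupby (toy data; the feature_rollup body and column values here are hypothetical stand-ins):

```python
import pandas as pd
from functools import wraps

def logging_decorator(func):
    # count calls and report; the wrapped function's result passes through unchanged
    @wraps(func)
    def wrapper(*args, **kwargs):
        wrapper.count += 1
        print("The function I modify has been called {0} time(s).".format(wrapper.count))
        return func(*args, **kwargs)
    wrapper.count = 0
    return wrapper

def feature_rollup(grp):
    return grp["value"].mean()

df_users = pd.DataFrame({"userID": [1, 1, 2],
                         "requestDate": ["a", "a", "b"],
                         "value": [2.0, 4.0, 6.0]})
modified_function = logging_decorator(feature_rollup)
res = df_users.groupby(["userID", "requestDate"]).apply(modified_function)
```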

Answer by Andy Hayden

To tweak Jeff's answer (and have this as a reusable function):


import sys

def logged_apply(g, func, *args, **kwargs):
    step_percentage = 100. / len(g)
    sys.stdout.write('apply progress:   0%')
    sys.stdout.flush()

    def logging_decorator(func):
        def wrapper(*args, **kwargs):
            progress = wrapper.count * step_percentage
            sys.stdout.write('\033[D \033[D' * 4 + format(progress, '3.0f') + '%')
            sys.stdout.flush()
            wrapper.count += 1
            return func(*args, **kwargs)
        wrapper.count = 0
        return wrapper

    logged_func = logging_decorator(func)
    res = g.apply(logged_func, *args, **kwargs)
    sys.stdout.write('\033[D \033[D' * 4 + format(100., '3.0f') + '%' + '\n')
    sys.stdout.flush()
    return res

Note: the apply progress percentage updates inline. If your function writes to stdout then this won't work.


In [11]: g = df_users.groupby(['userID', 'requestDate'])

In [12]: f = feature_rollup

In [13]: logged_apply(g, f)
apply progress: 100%
Out[13]: 
...

As usual you can add this to your groupby objects as a method:


from pandas.core.groupby import DataFrameGroupBy
DataFrameGroupBy.logged_apply = logged_apply

In [21]: g.logged_apply(f)
apply progress: 100%
Out[21]: 
...

As mentioned in the comments, this isn't a feature that core pandas would be interested in implementing. But Python allows you to create these for many pandas objects/methods (doing so would be quite a bit of work... although you should be able to generalise this approach).


Answer by Filipe Silva

I've changed Jeff's answer to include a total (so you can track progress) and a variable that makes it print only every X iterations (this actually improves performance by a lot, if print_at is reasonably high):


import sys

def count_wrapper(func, total, print_at):

    def wrapper(*args):
        wrapper.count += 1
        if wrapper.count % wrapper.print_at == 0:
            clear_output()
            sys.stdout.write("%d / %d" % (wrapper.count, wrapper.total))
            sys.stdout.flush()
        return func(*args)
    wrapper.count = 0
    wrapper.total = total
    wrapper.print_at = print_at

    return wrapper

the clear_output() function is from


from IPython.core.display import clear_output

if you're not on IPython, Andy Hayden's answer does this without it


Answer by Victor Vulovic

In case you need support for how to use this in a Jupyter/IPython notebook, as I did, here's a helpful guide and the relevant article:


from tqdm._tqdm_notebook import tqdm_notebook
import pandas as pd
import numpy as np
tqdm_notebook.pandas()
df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
df.groupby(0).progress_apply(lambda x: x**2)

Note the underscore in the import statement for _tqdm_notebook. As the referenced article mentions, development is in a late beta stage.


Answer by mork

For anyone who's looking to apply tqdm to their custom parallel pandas-apply code.


(I tried some of the libraries for parallelization over the years, but I never found a 100% parallelization solution, mainly for the apply function, and I always had to come back to my "manual" code.)


df_multi_core: this is the one you call. It accepts:


  1. Your df object
  2. The function name you'd like to call
  3. The subset of columns the function can be performed upon (helps reducing time / memory)
  4. The number of jobs to run in parallel (-1 or omit for all cores)
  5. Any other kwargs the df's function accepts (like "axis")

_df_split: this is an internal helper function that has to be positioned globally in the running module (Pool.map is "placement dependent"); otherwise I'd locate it internally.


here's the code from my gist (I'll add more pandas function tests there):


import pandas as pd
import numpy as np
import multiprocessing
from functools import partial

def _df_split(tup_arg, **kwargs):
    split_ind, df_split, df_f_name = tup_arg
    return (split_ind, getattr(df_split, df_f_name)(**kwargs))

def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
    if njobs == -1:
        njobs = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=njobs)

    if subset is not None:
        splits = np.array_split(df[subset], njobs)
    else:
        splits = np.array_split(df, njobs)

    pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
    results = pool.map(partial(_df_split, **kwargs), pool_data)
    pool.close()
    pool.join()
    results = sorted(results, key=lambda x:x[0])
    results = pd.concat([split[1] for split in results])
    return results

Below is test code for a parallelized apply with tqdm's progress_apply.


from time import time
from tqdm import tqdm
tqdm.pandas()

if __name__ == '__main__': 
    sep = '-' * 50

    # tqdm progress_apply test      
    def apply_f(row):
        return row['c1'] + 0.1
    N = 1000000
    np.random.seed(0)
    df = pd.DataFrame({'c1': np.arange(N), 'c2': np.arange(N)})

    print('testing pandas apply on {}\n{}'.format(df.shape, sep))
    t1 = time()
    res = df.progress_apply(apply_f, axis=1)
    t2 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    t3 = time()
    # res = df_multi_core(df=df, df_f_name='apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    res = df_multi_core(df=df, df_f_name='progress_apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    t4 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))

In the output you can see one progress bar when running without parallelization, and per-core progress bars when running with parallelization. There is a slight hiccup and sometimes the rest of the cores appear at once, but even then I think it's useful, since you get the progress stats per core (iterations/sec and total records, for example).




Thank you @abcdaa for this great library!
