Progress indicator during pandas operations

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): StackOverflow.
Original source: http://stackoverflow.com/questions/18603270/
Asked by cwharland
I regularly perform pandas operations on data frames in excess of 15 million or so rows and I'd love to have access to a progress indicator for particular operations.
Does a text based progress indicator for pandas split-apply-combine operations exist?
For example, in something like:
df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)
where feature_rollup is a somewhat involved function that takes many DF columns and creates new user columns through various methods. These operations can take a while for large data frames, so I'd like to know if it is possible to have text-based output in an IPython notebook that updates me on the progress.
So far, I've tried canonical loop progress indicators for Python but they don't interact with pandas in any meaningful way.
I'm hoping there's something I've overlooked in the pandas library/documentation that allows one to know the progress of a split-apply-combine. A simple implementation might look at the total number of data frame subsets upon which the apply function is working and report progress as the completed fraction of those subsets.
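A minimal sketch of that idea, assuming the df_users and feature_rollup from above (the wrapper and its print format are purely illustrative):

g = df_users.groupby(['userID', 'requestDate'])
total = len(g)  # number of groups the apply will visit
state = {'done': 0}

def counted_rollup(group):
    # count each completed group and redraw the fraction in place
    state['done'] += 1
    print('\rapply progress: {:.0%}'.format(state['done'] / total), end='', flush=True)
    return feature_rollup(group)

result = g.apply(counted_rollup)
print()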
Is this perhaps something that needs to be added to the library?
Accepted answer by casper.dcl
Due to popular demand, tqdm has added support for pandas. Unlike the other answers, this will not noticeably slow pandas down. Here's an example for DataFrameGroupBy.progress_apply:
import pandas as pd
import numpy as np
from tqdm import tqdm
# from tqdm.auto import tqdm # for notebooks
df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
# Create and register a new `tqdm` instance with `pandas`
# (can use tqdm_gui, optional kwargs, etc.)
tqdm.pandas()
# Now you can use `progress_apply` instead of `apply`
df.groupby(0).progress_apply(lambda x: x**2)
In case you're interested in how this works (and how to modify it for your own callbacks), see the examples on GitHub, the full documentation on PyPI, or import the module and run help(tqdm).
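As a side note, tqdm.pandas() forwards its keyword arguments to the tqdm constructor, so you can label or otherwise tune the bar; for example (the description string is arbitrary):

from tqdm import tqdm
tqdm.pandas(desc="feature rollup")  # desc labels the progress bar
df_users.groupby(['userID', 'requestDate']).progress_apply(feature_rollup)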
EDIT
To directly answer the original question, replace:
df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)
with:
from tqdm import tqdm
tqdm.pandas()
df_users.groupby(['userID', 'requestDate']).progress_apply(feature_rollup)
Note: tqdm <= v4.8:
For versions of tqdm below 4.8, instead of tqdm.pandas() you had to do:
from tqdm import tqdm, tqdm_pandas
tqdm_pandas(tqdm())
Answered by Jeff
You can easily do this with a decorator
from functools import wraps

def logging_decorator(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        wrapper.count += 1
        print("The function I modify has been called {0} time(s).".format(
            wrapper.count))
        return func(*args, **kwargs)
    wrapper.count = 0
    return wrapper
modified_function = logging_decorator(feature_rollup)
then just use the modified_function (and change when you want it to print)
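For example, applied to the question's groupby (a hypothetical usage):

df_users.groupby(['userID', 'requestDate']).apply(modified_function)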
Answered by Andy Hayden
To tweak Jeff's answer (and have this as a reusable function).
import sys

def logged_apply(g, func, *args, **kwargs):
    step_percentage = 100. / len(g)
    sys.stdout.write('apply progress: 0%')
    sys.stdout.flush()

    def logging_decorator(func):
        def wrapper(*args, **kwargs):
            progress = wrapper.count * step_percentage
            # move the cursor back over the previous percentage and overwrite it
            sys.stdout.write('\033[D \033[D' * 4 + format(progress, '3.0f') + '%')
            sys.stdout.flush()
            wrapper.count += 1
            return func(*args, **kwargs)
        wrapper.count = 0
        return wrapper

    logged_func = logging_decorator(func)
    res = g.apply(logged_func, *args, **kwargs)
    sys.stdout.write('\033[D \033[D' * 4 + format(100., '3.0f') + '%' + '\n')
    sys.stdout.flush()
    return res
Note: the apply progress percentage updates inline. If your function writes to stdout, this won't work.
In [11]: g = df_users.groupby(['userID', 'requestDate'])
In [12]: f = feature_rollup
In [13]: logged_apply(g, f)
apply progress: 100%
Out[13]:
...
As usual you can add this to your groupby objects as a method:
from pandas.core.groupby import DataFrameGroupBy
DataFrameGroupBy.logged_apply = logged_apply
In [21]: g.logged_apply(f)
apply progress: 100%
Out[21]:
...
As mentioned in the comments, this isn't a feature that core pandas would be interested in implementing. But python allows you to create these for many pandas objects/methods (doing so would be quite a bit of work... although you should be able to generalise this approach).
Answered by Filipe Silva
I've changed Jeff's answer to include a total, so that you can track progress, and a variable to print only every X iterations (this actually improves performance by a lot, if "print_at" is reasonably high).
import sys
from IPython.core.display import clear_output

def count_wrapper(func, total, print_at):
    def wrapper(*args):
        wrapper.count += 1
        if wrapper.count % wrapper.print_at == 0:
            clear_output()
            sys.stdout.write("%d / %d" % (wrapper.count, wrapper.total))
            sys.stdout.flush()
        return func(*args)
    wrapper.count = 0
    wrapper.total = total
    wrapper.print_at = print_at
    return wrapper
The clear_output() function comes from IPython (imported above); if you're not on IPython, Andy Hayden's answer does the same without it.
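A hypothetical usage with the question's groupby, where the total is the number of groups and print_at controls how often the counter is redrawn:

g = df_users.groupby(['userID', 'requestDate'])
counted = count_wrapper(feature_rollup, total=len(g), print_at=1000)
res = g.apply(counted)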
Answered by Victor Vulovic
In case you need support for how to use this in a Jupyter/IPython notebook, as I did, here's a helpful guide and source for the relevant article:
import numpy as np
import pandas as pd
from tqdm._tqdm_notebook import tqdm_notebook

tqdm_notebook.pandas()
df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
df.groupby(0).progress_apply(lambda x: x**2)
Note the underscore in the import statement for _tqdm_notebook. As the referenced article mentions, development is in a late beta stage.
Answered by mork
For anyone who's looking to apply tqdm on their custom parallel pandas-apply code.
(I tried some of the libraries for parallelization over the years, but I never found a 100% parallelization solution, mainly for the apply function, and I always had to come back to my "manual" code.)
df_multi_core - this is the one you call. It accepts:

- Your df object
- The function name you'd like to call
- The subset of columns the function can be performed upon (helps reduce time / memory)
- The number of jobs to run in parallel (-1 or omit for all cores)
- Any other kwargs the df's function accepts (like "axis")
_df_split - this is an internal helper function that has to be positioned globally in the running module (Pool.map is "placement dependent"), otherwise I'd define it internally.
Here's the code from my gist (I'll add more pandas function tests there):
import pandas as pd
import numpy as np
import multiprocessing
from functools import partial

def _df_split(tup_arg, **kwargs):
    split_ind, df_split, df_f_name = tup_arg
    return (split_ind, getattr(df_split, df_f_name)(**kwargs))

def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
    if njobs == -1:
        njobs = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=njobs)

    # split the frame (or just the requested columns) into one chunk per job
    splits = np.array_split(df[subset] if subset is not None else df, njobs)

    pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
    results = pool.map(partial(_df_split, **kwargs), pool_data)
    pool.close()
    pool.join()

    # reassemble the per-chunk results in their original order
    results = sorted(results, key=lambda x: x[0])
    results = pd.concat([split[1] for split in results])
    return results
Below is test code for a parallelized apply with tqdm's "progress_apply".
from time import time
from tqdm import tqdm
tqdm.pandas()

if __name__ == '__main__':
    sep = '-' * 50

    # tqdm progress_apply test
    def apply_f(row):
        return row['c1'] + 0.1

    N = 1000000
    np.random.seed(0)
    df = pd.DataFrame({'c1': np.arange(N), 'c2': np.arange(N)})
    print('testing pandas apply on {}\n{}'.format(df.shape, sep))

    t1 = time()
    res = df.progress_apply(apply_f, axis=1)
    t2 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    t3 = time()
    # res = df_multi_core(df=df, df_f_name='apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    res = df_multi_core(df=df, df_f_name='progress_apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    t4 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))
In the output you can see one progress bar for running without parallelization, and per-core progress bars when running with parallelization. There is a slight hiccup and sometimes the rest of the cores appear at once, but even then I think it's useful, since you get the progress stats per core (it/sec and total records, for example).
Thank you @abcdaa for this great library!