python dask DataFrame, support for (trivially parallelizable) row apply?

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me) and cite the original: http://stackoverflow.com/questions/31361721/



Tags: python, pandas, parallel-processing, dask

Asked by jf328

I recently found the dask module, which aims to be an easy-to-use Python parallel processing module. The big selling point for me is that it works with pandas.


After reading a bit on its manual page, I can't find a way to do this trivially parallelizable task:


ts.apply(func) # for pandas series
df.apply(func, axis = 1) # for pandas DF row apply

At the moment, to achieve this in dask, AFAIK,


ddf.assign(A=lambda df: df.apply(func, axis=1)).compute() # dask DataFrame

which is ugly syntax and is actually slower than outright


df.apply(func, axis = 1) # for pandas DF row apply

Any suggestions?


Edit: Thanks @MRocklin for the map function. It seems to be slower than plain pandas apply. Is this related to the pandas GIL-release issue, or am I doing it wrong?


import numpy as np
import pandas as pd
import dask.dataframe as dd
s = pd.Series([10000]*120)
ds = dd.from_pandas(s, npartitions = 3)

def slow_func(k):
    A = np.random.normal(size = k) # k = 10000
    s = 0
    for a in A:
        if a > 0:
            s += 1
        else:
            s -= 1
    return s

s.apply(slow_func) # 0.43 sec
ds.map(slow_func).compute() # 2.04 sec

Answered by MRocklin

map_partitions


You can apply your function to all of the partitions of your dataframe with the map_partitions function.


df.map_partitions(func, columns=...)

Note that func will be given only part of the dataset at a time, not the entire dataset as with pandas apply (which presumably you wouldn't want if you want to do parallelism).

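For concreteness, here is a minimal runnable sketch (mine, not from the answer; all names are illustrative) of applying a plain pandas function per partition:

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({'x': range(6), 'y': range(6)})
ddf = dd.from_pandas(pdf, npartitions=3)

def add_total(part):
    # `part` is one pandas DataFrame partition, not the whole dataset
    part = part.copy()
    part['total'] = part['x'] + part['y']
    return part

print(ddf.map_partitions(add_total).compute())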

map / apply


You can map a function row-wise across a series with map


df.mycolumn.map(func)

You can map a function row-wise across a dataframe with apply


df.apply(func, axis=1)
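
Putting both together in a short hedged sketch (not from the answer; the data and names are made up), where the meta argument tells dask the output's name and dtype so it can skip inference:

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({'a': [1, 2, 3, 4], 'b': [10, 20, 30, 40]})
ddf = dd.from_pandas(pdf, npartitions=2)

# Element-wise over a single column
doubled = ddf.a.map(lambda v: v * 2, meta=('a', 'i8')).compute()

# Row-wise over the whole frame
row_sums = ddf.apply(lambda row: row['a'] + row['b'], axis=1,
                     meta=('row_sum', 'i8')).compute()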

Threads vs Processes


As of version 0.6.0, dask.dataframe parallelizes with threads. Custom Python functions will not receive much benefit from thread-based parallelism. You could try processes instead:


df = dd.read_csv(...)

df.map_partitions(func, columns=...).compute(scheduler='processes')

But avoid apply


However, you should really avoid apply with custom Python functions, both in Pandas and in Dask. This is often a source of poor performance. If you can find a way to do your operation in a vectorized manner, then your Pandas code may be 100x faster and you won't need dask.dataframe at all.

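As a concrete illustration of that advice (my own sketch, not part of the original answer), the questioner's slow_func above collapses to a single NumPy expression:

import numpy as np

def vectorized_func(k):
    # Same tally as slow_func: +1 per positive draw, -1 otherwise,
    # but computed by NumPy in one shot instead of a Python loop
    A = np.random.normal(size=k)
    return int(np.where(A > 0, 1, -1).sum())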

Consider numba


For your particular problem you might consider numba. This significantly improves your performance.


In [1]: import numpy as np
In [2]: import pandas as pd
In [3]: s = pd.Series([10000]*120)

In [4]: %paste
def slow_func(k):
    A = np.random.normal(size = k) # k = 10000
    s = 0
    for a in A:
        if a > 0:
            s += 1
        else:
            s -= 1
    return s
## -- End pasted text --

In [5]: %time _ = s.apply(slow_func)
CPU times: user 345 ms, sys: 3.28 ms, total: 348 ms
Wall time: 347 ms

In [6]: import numba
In [7]: fast_func = numba.jit(slow_func)

In [8]: %time _ = s.apply(fast_func)  # First time incurs compilation overhead
CPU times: user 179 ms, sys: 0 ns, total: 179 ms
Wall time: 175 ms

In [9]: %time _ = s.apply(fast_func)  # Subsequent times are all gain
CPU times: user 68.8 ms, sys: 27 μs, total: 68.8 ms
Wall time: 68.7 ms

Disclaimer: I work for the company that makes both numba and dask and employs many of the pandas developers.


Answered by Shubham Chaudhary

As of v dask.dataframe.apply delegates responsibility to map_partitions:


@insert_meta_param_description(pad=12)
def apply(self, func, convert_dtype=True, meta=no_default, args=(), **kwds):
    """ Parallel version of pandas.Series.apply
    ...
    """
    if meta is no_default:
        msg = ("`meta` is not specified, inferred from partial data. "
               "Please provide `meta` if the result is unexpected.\n"
               "  Before: .apply(func)\n"
               "  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result\n"
               "  or:     .apply(func, meta=('x', 'f8'))            for series result")
        warnings.warn(msg)

        meta = _emulate(M.apply, self._meta_nonempty, func,
                        convert_dtype=convert_dtype,
                        args=args, **kwds)

    return map_partitions(M.apply, self, func,
                          convert_dtype, args, meta=meta, **kwds)