How do you parallelize apply() on Pandas DataFrames, making use of all cores on one machine?
Disclaimer: This page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must likewise follow the CC BY-SA license and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/45545110/
Asked by Roko Mijic
As of August 2017, Pandas DataFrame.apply() is unfortunately still limited to working with a single core, meaning that a multi-core machine will waste the majority of its compute time when you run df.apply(myfunc, axis=1).
How can you use all your cores to run apply on a dataframe in parallel?
Accepted answer by slhck
You may use the swifter package:
pip install swifter
It works as a plugin for pandas, allowing you to reuse the apply function:
import swifter
def some_function(data):
return data * 10
data['out'] = data['in'].swifter.apply(some_function)
It will automatically figure out the most efficient way to parallelize the function, no matter if it's vectorized (as in the above example) or not.
More examples and a performance comparison are available on GitHub. Note that the package is under active development, so the API may change.
Also note that this will not work automatically for string columns. When using strings, Swifter will fall back to a "simple" Pandas apply, which will not be parallel. In this case, even forcing it to use dask will not create performance improvements, and you would be better off just splitting your dataset manually and parallelizing using multiprocessing.
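A minimal sketch of that manual-splitting approach, assuming a DataFrame with a string column named "text" and a hypothetical per-element function clean_text (both names are illustrative, not from the answer):

import multiprocessing as mp

import numpy as np
import pandas as pd

def clean_text(s):
    # hypothetical per-element string transformation
    return s.strip().lower()

def apply_serial(chunk):
    # plain (serial) pandas apply on one chunk
    return chunk.apply(clean_text)

if __name__ == "__main__":
    df = pd.DataFrame({"text": [" Foo ", " Bar "] * 100000})
    # one chunk per core; np.array_split preserves row order
    chunks = np.array_split(df["text"], mp.cpu_count())
    with mp.Pool(mp.cpu_count()) as pool:
        df["clean"] = pd.concat(pool.map(apply_serial, chunks))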
Answered by Roko Mijic
The simplest way is to use Dask's map_partitions. You need these imports (you will need to pip install dask):
import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import get
and the syntax is
data = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)
def myfunc(x,y,z, ...): return <whatever>
res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(get=get)
(I believe that 30 is a suitable number of partitions if you have 16 cores). Just for completeness, I timed the difference on my machine (16 cores):
import numpy as np
import timeit

data = pd.DataFrame()
data['col1'] = np.random.normal(size = 1500000)
data['col2'] = np.random.normal(size = 1500000)
ddata = dd.from_pandas(data, npartitions=30)
def myfunc(x,y): return y*(x**2+1)
def apply_myfunc_to_DF(df): return df.apply((lambda row: myfunc(*row)), axis=1)
def pandas_apply(): return apply_myfunc_to_DF(data)
def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(get=get)
def vectorized(): return myfunc(data['col1'], data['col2'] )
t_pds = timeit.Timer(lambda: pandas_apply())
print(t_pds.timeit(number=1))
28.16970546543598
t_dsk = timeit.Timer(lambda: dask_apply())
print(t_dsk.timeit(number=1))
2.708152851089835
t_vec = timeit.Timer(lambda: vectorized())
print(t_vec.timeit(number=1))
0.010668013244867325
Giving a factor of 10 speedup going from pandas apply to dask apply on partitions. Of course, if you have a function you can vectorize, you should - in this case the function (y*(x**2+1)) is trivially vectorized, but there are plenty of things that are impossible to vectorize.
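Note that dask.multiprocessing.get and the get= keyword of compute() are the legacy Dask scheduler API; later Dask releases removed get= in favor of a scheduler= argument. Assuming a reasonably recent Dask version, the equivalent call would be roughly:

# same computation, current-style scheduler selection (assumes a modern Dask)
res = ddata.map_partitions(
    lambda df: df.apply(lambda row: myfunc(*row), axis=1)
).compute(scheduler="processes")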
Answered by G_KOBELIEF
You can try pandarallel instead: a simple and efficient tool to parallelize your pandas operations on all your CPUs (on Linux & macOS).
- Parallelization has a cost (instantiating new processes, sending data via shared memory, etc.), so parallelization is efficient only if the amount of computation to parallelize is high enough. For very small amounts of data, parallelization is not always worth it.
- Functions applied should NOT be lambda functions.
from pandarallel import pandarallel
from math import sin
pandarallel.initialize()
# FORBIDDEN
df.parallel_apply(lambda x: sin(x**2), axis=1)
# ALLOWED
def func(x):
return sin(x**2)
df.parallel_apply(func, axis=1)
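As a usage note, pandarallel.initialize() also accepts tuning options; recent versions expose progress_bar and nb_workers, among others (verify against the version you have installed):

from pandarallel import pandarallel

# assumed options; check pandarallel's docs for your installed version
pandarallel.initialize(progress_bar=True, nb_workers=4)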
Answered by Olivier_Cruchant
If you want to stay in native python:
import multiprocessing as mp
with mp.Pool(mp.cpu_count()) as pool:
df['newcol'] = pool.map(f, df['col'])
will apply the function f in a parallel fashion to column col of dataframe df. A self-contained version is sketched below.
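A minimal self-contained sketch of the same pattern (the function f, the DataFrame, and the column names are placeholders; the __main__ guard matters because multiprocessing re-imports the module on platforms that spawn new processes, such as Windows and recent macOS):

import multiprocessing as mp

import pandas as pd

def f(x):
    # placeholder per-element function
    return x * 10

if __name__ == "__main__":
    df = pd.DataFrame({"col": range(1_000_000)})
    with mp.Pool(mp.cpu_count()) as pool:
        # Pool.map returns a plain list; pandas assigns it positionally
        df["newcol"] = pool.map(f, df["col"])
    print(df.head())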
Answered by Maxim Balatsko
Here is an example of sklearn base transformer, in which pandas apply is parallelized
import multiprocessing as mp

import numpy as np
import pandas as pd
from sklearn.base import TransformerMixin, BaseEstimator

class ParallelTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, n_jobs=1):
        """
        n_jobs - parallel jobs to run
        """
        self.n_jobs = n_jobs

    def fit(self, X, y=None):
        return self

    def transform(self, X, *_):
        X_copy = X.copy()
        cores = mp.cpu_count()
        if self.n_jobs <= -1:
            partitions = cores
        elif self.n_jobs == 0:
            partitions = 1
        else:
            partitions = min(self.n_jobs, cores)
        if partitions == 1:
            # transform sequentially
            return X_copy.apply(self._transform_one)
        # split the data into batches, one per worker
        data_split = np.array_split(X_copy, partitions)
        pool = mp.Pool(cores)
        # reduce step: concatenation of the transformed batches
        data = pd.concat(
            pool.map(self._transform_part, data_split)
        )
        pool.close()
        pool.join()
        return data

    def _transform_part(self, df_part):
        return df_part.apply(self._transform_one)

    def _transform_one(self, line):
        # some kind of transformation here
        return line
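A quick usage sketch for the transformer above (the DataFrame is illustrative):

import pandas as pd

df = pd.DataFrame({"a": range(100000), "b": range(100000)})
transformer = ParallelTransformer(n_jobs=4)
# fit() is a no-op; transform() fans the apply out over 4 workers
result = transformer.fit_transform(df)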
For more info, see https://towardsdatascience.com/4-easy-steps-to-improve-your-machine-learning-code-performance-88a0b0eeffa8