Pandas df.iterrows() parallelization

Disclaimer: this page is a translation of a popular Stack Overflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license, link to the original, and attribute it to the original authors (not me) on Stack Overflow.

Original question: http://stackoverflow.com/questions/40357434/
Asked by alec_djinn
I would like to parallelize the following code:
for row in df.iterrows():
    idx = row[0]
    k = row[1]['Chromosome']
    start, end = row[1]['Bin'].split('-')
    sequence = sequence_from_coordinates(k, 1, start, end)  # slow download from http
    df.set_value(idx, 'GC%', gc_content(sequence, percent=False, verbose=False))
    df.set_value(idx, 'G4 repeats', sum([len(list(i)) for i in g4_scanner(sequence)]))
    df.set_value(idx, 'max flexibility', max([item[1] for item in dna_flex(sequence, verbose=False)]))
I have tried to use multiprocessing.Pool() since each row can be processed independently, but I can't figure out how to share the DataFrame. I am also not sure that this is the best approach to parallelization with pandas. Any help?
Answered by TheF1rstPancake
As @Khris said in his comment, you should split up your dataframe into a few large chunks and iterate over each chunk in parallel. You could arbitrarily split the dataframe into randomly sized chunks, but it makes more sense to divide the dataframe into equally sized chunks based on the number of processes you plan on using. Luckily someone else has already figured out how to do that part for us:
# don't forget to import
import pandas as pd
import multiprocessing

# create as many processes as there are CPUs on your machine
num_processes = multiprocessing.cpu_count()

# calculate the chunk size as an integer
chunk_size = int(df.shape[0] / num_processes)

# this solution was reworked from the above link;
# it will work even if the length of the dataframe is not evenly divisible by num_processes
# (the original used .ix, which was removed in pandas 1.0; .iloc slices by position instead)
chunks = [df.iloc[i:i + chunk_size] for i in range(0, df.shape[0], chunk_size)]
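A quick worked example of that arithmetic, to see the non-divisible case: with df.shape[0] == 10 and num_processes == 4, chunk_size == int(10 / 4) == 2, so the comprehension yields five chunks of two rows each, which is more chunks than processes. That is fine: pool.map simply queues the extra chunks and hands them to workers as they become free.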
This creates a list that contains our dataframe in chunks. Now we need to pass it into our pool along with a function that will manipulate the data.
def func(d):
    # let's create a function that squares every value in the dataframe
    return d * d

# create our pool with `num_processes` processes
pool = multiprocessing.Pool(processes=num_processes)

# apply our function to each chunk in the list
result = pool.map(func, chunks)
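One caveat worth adding: on platforms where multiprocessing spawns rather than forks workers (Windows, and macOS on Python 3.8+), the main module is re-imported in each worker, so the pool setup should live behind a __main__ guard. A minimal sketch:

if __name__ == '__main__':
    # prevents worker processes from re-executing the pool setup on import
    pool = multiprocessing.Pool(processes=num_processes)
    result = pool.map(func, chunks)
    pool.close()
    pool.join()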
At this point, result will be a list holding each chunk after it has been manipulated. In this case, all values have been squared. The issue now is that the original dataframe has not been modified, so we have to replace all of its existing values with the results from our pool.
for i in range(len(result)):
    # since result[i] is just a dataframe,
    # we can reassign the original dataframe based on the index of each chunk
    # (.ix was removed in pandas 1.0; .loc is the label-based equivalent here)
    df.loc[result[i].index] = result[i]
Now, my function to manipulate my dataframe is vectorized and would likely have been faster if I had simply applied it to the entirety of my dataframe instead of splitting it into chunks. However, in your case, your function would iterate over each row of each chunk and then return the chunk. This allows you to process num_processes rows at a time.
def func(d):
    for row in d.iterrows():  # note: iterrows(), not iterrow()
        idx = row[0]
        k = row[1]['Chromosome']
        start, end = row[1]['Bin'].split('-')
        sequence = sequence_from_coordinates(k, 1, start, end)  # slow download from http
        d.set_value(idx, 'GC%', gc_content(sequence, percent=False, verbose=False))
        d.set_value(idx, 'G4 repeats', sum([len(list(i)) for i in g4_scanner(sequence)]))
        d.set_value(idx, 'max flexibility', max([item[1] for item in dna_flex(sequence, verbose=False)]))
    # return the chunk!
    return d
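Note that DataFrame.set_value was deprecated in pandas 0.21 and removed in pandas 1.0; on a current install the same per-cell updates can be written with .at. A sketch reusing the question's own helpers (sequence_from_coordinates, gc_content, g4_scanner, and dna_flex are the asker's code, not a library API):

def func(d):
    for idx, row in d.iterrows():
        start, end = row['Bin'].split('-')
        sequence = sequence_from_coordinates(row['Chromosome'], 1, start, end)
        # .at is the modern scalar accessor that replaces set_value
        d.at[idx, 'GC%'] = gc_content(sequence, percent=False, verbose=False)
        d.at[idx, 'G4 repeats'] = sum(len(list(i)) for i in g4_scanner(sequence))
        d.at[idx, 'max flexibility'] = max(item[1] for item in dna_flex(sequence, verbose=False))
    return d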
Then you reassign the values in the original dataframe, and you have successfully parallelized this process.
How Many Processes Should I Use?
Your optimal performance is going to depend on the answer to this question. While "ALL OF THE PROCESSES!!!!" is one answer, a better answer is much more nuanced. After a certain point, throwing more processes at a problem actually creates more overhead than it's worth. This is known as Amdahl's Law.
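For reference, Amdahl's Law bounds the speedup S(n) you can get from n processes when only a fraction p of the work can be parallelized:

S(n) = 1 / ((1 - p) + p / n)

As n grows, S(n) approaches 1 / (1 - p). If, say, 80% of the runtime is parallelizable (p = 0.8), no number of processes can ever exceed a 5x speedup, while each extra process still adds coordination overhead.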
Again, we are fortunate that others have already tackled this question for us: a good default is to use multiprocessing.cpu_count(), which is the default behavior of multiprocessing.Pool. According to the documentation, "If processes is None then the number returned by cpu_count() is used." That's why I set num_processes at the beginning to multiprocessing.cpu_count(). This way, if you move to a beefier machine, you get the benefits from it without having to change the num_processes variable directly.
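In other words, these two calls are equivalent (a minimal illustration):

pool = multiprocessing.Pool()  # processes defaults to cpu_count()
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())  # the same, spelled out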
Answered by ic_fl2
A faster way (about 10% in my case):
Main differences to the accepted answer: use pd.concat and np.array_split to split and join the dataframe.
import multiprocessing

import numpy as np
import pandas as pd  # needed for pd.concat

def parallelize_dataframe(df, func):
    num_cores = multiprocessing.cpu_count() - 1  # leave one core free to not freeze the machine
    num_partitions = num_cores  # number of partitions to split the dataframe into
    df_split = np.array_split(df, num_partitions)
    pool = multiprocessing.Pool(num_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df
where func is the function you want to apply to df. Use partial(func, arg=arg_val) for more than one argument.
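A usage sketch with one extra argument bound via functools.partial (fill_bin_length and scale are made-up names for illustration; only the 'Bin' column format comes from the question):

from functools import partial

def fill_bin_length(d, scale=1):
    # hypothetical helper: length of each 'start-end' bin, times `scale`
    start_end = d['Bin'].str.split('-', expand=True).astype(int)
    d['bin length'] = (start_end[1] - start_end[0]) * scale
    return d

df = parallelize_dataframe(df, partial(fill_bin_length, scale=2))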
Answered by Robert
Consider using dask.dataframe, as shown for example in this answer to a similar question: https://stackoverflow.com/a/53923034/4340584
import dask.dataframe as ddf

# npartitions is the number of cores you want to use
df_dask = ddf.from_pandas(df, npartitions=4)

# dask's DataFrame.apply is row-wise (axis=1); meta declares the result's name and dtype
df_dask['output'] = df_dask.apply(
    lambda x: your_function(x), axis=1, meta=('output', 'str')
).compute(scheduler='multiprocessing')
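One version note: current Dask documentation refers to the local schedulers as 'threads', 'processes', and 'synchronous', so on a recent install you would more likely write the same pipeline with only the scheduler name changed:

df_dask['output'] = df_dask.apply(
    lambda x: your_function(x), axis=1, meta=('output', 'str')
).compute(scheduler='processes')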