Python: How does the pyspark mapPartitions function work?
Note: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/26741714/
How does the pyspark mapPartitions function work?
Asked by MetallicPriest
So I am trying to learn Spark using Python (PySpark). I want to know how the function mapPartitions works, that is, what input it takes and what output it gives. I couldn't find a proper example on the internet. Let's say I have an RDD object containing lists, such as the one below.
[ [1, 2, 3], [3, 2, 4], [5, 2, 7] ]
I want to remove the element 2 from all the lists. How would I achieve that using mapPartitions?
Accepted answer by bearrito
mapPartitions should be thought of as a map operation over partitions, not over the elements of a partition. Its input is the set of current partitions; its output is another set of partitions.
The function you pass to map must take an individual element of your RDD.
The function you pass to mapPartitions must take an iterable of your RDD type and return an iterable of some other or the same type.
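To make the difference between the two contracts concrete, here is a minimal pure-Python sketch that needs no Spark installation. The helpers `simulate_map` and `simulate_map_partitions` are hypothetical stand-ins, not PySpark APIs, and the two-partition split of the data is an assumption chosen for illustration; Spark decides the real partitioning.

```python
from itertools import chain

# Assumed layout: the question's data split into two partitions.
partitions = [[[1, 2, 3], [3, 2, 4]], [[5, 2, 7]]]

def simulate_map(func, partitions):
    """Hypothetical stand-in for rdd.map: func is called once per element."""
    return [[func(element) for element in part] for part in partitions]

def simulate_map_partitions(func, partitions):
    """Hypothetical stand-in for rdd.mapPartitions: func is called once per
    partition with an iterator and must return an iterable."""
    return [list(func(iter(part))) for part in partitions]

def drop_twos(sub_list):
    # Receives one element of the RDD (here, one list).
    return [x for x in sub_list if x != 2]

def drop_twos_in_partition(iterator):
    # Receives an iterator over all elements of one partition.
    for sub_list in iterator:
        yield [x for x in sub_list if x != 2]

via_map = simulate_map(drop_twos, partitions)
via_map_partitions = simulate_map_partitions(drop_twos_in_partition, partitions)

# Flatten the partitions, roughly what collect() would hand back.
print(list(chain.from_iterable(via_map)))
print(list(chain.from_iterable(via_map_partitions)))
```

Both calls produce the same filtered lists; the only difference is how often the user function is invoked and what it receives.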
In your case you probably just want to do something like:
def filter_out_2(line):
    return [x for x in line if x != 2]

filtered_lists = data.map(filter_out_2)
If you wanted to use mapPartitions, it would be:
def filter_out_2_from_partition(list_of_lists):
    final_iterator = []
    for sub_list in list_of_lists:
        final_iterator.append([x for x in sub_list if x != 2])
    return iter(final_iterator)

filtered_lists = data.mapPartitions(filter_out_2_from_partition)
Answered by Narek
It's easier to use mapPartitions with a generator function using the yield syntax:
def filter_out_2(partition):
    for element in partition:
        if element != 2:
            yield element

filtered_lists = data.mapPartitions(filter_out_2)
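One caveat worth checking: the generator above compares each element of the partition with 2, so it suits an RDD of plain numbers. The small pure-Python sketch below (no Spark required; the iterators merely stand in for what Spark would pass to the function) shows the behaviour on flat data and on nested lists like those in the question.

```python
def filter_out_2(partition):
    for element in partition:
        if element != 2:
            yield element

# Stand-in for one partition of a flat RDD of numbers.
flat_result = list(filter_out_2(iter([1, 2, 3, 2, 4])))
print(flat_result)  # [1, 3, 4]

# Stand-in for one partition of an RDD of lists.
# A list never equals the integer 2, so every list passes through unchanged.
nested_result = list(filter_out_2(iter([[1, 2, 3], [3, 2, 4]])))
print(nested_result)  # [[1, 2, 3], [3, 2, 4]]
```

For an RDD of lists, the filtering has to happen inside each list rather than on the list as a whole.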
Answered by Ravinder Karra
You need a final inner iteration over each list:
def filter_out_2(partition):
    for element in partition:
        sec_iterator = []
        for i in element:
            if i != 2:
                sec_iterator.append(i)
        yield sec_iterator

filtered_lists = data.mapPartitions(filter_out_2)
for i in filtered_lists.collect():
    print(i)
Answered by madhu chilukuri
def func(l):
    for i in l:
        yield i + "ajbf"

mylist = ['madhu', 'sdgs', 'sjhf', 'mad']
rdd = sc.parallelize(mylist)
t = rdd.mapPartitions(func)
for i in t.collect():
    print(i)
for i in t.collect():
    print(i)
In the above code I am able to get data from the second for..in loop. Since func is a generator, I expected it to yield no more values once the first loop had iterated over it.
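A likely explanation, offered as a sketch and not a definitive account: `rdd.mapPartitions(func)` records the function `func`, not a generator object. Unless the RDD is cached, each action such as `collect()` recomputes it, which calls `func` again and obtains a fresh generator. The pure-Python code below imitates that behaviour; `run_action` is a hypothetical helper, not a PySpark API.

```python
def func(l):
    for i in l:
        yield i + "ajbf"

mylist = ['madhu', 'sdgs', 'sjhf', 'mad']

# A single generator object really is exhausted after one pass.
gen = func(iter(mylist))
first_pass = list(gen)
second_pass = list(gen)
print(first_pass)
print(second_pass)  # []

def run_action(f, data):
    """Hypothetical stand-in for an action: calls f anew on every run."""
    return list(f(iter(data)))

# Each simulated action calls func again, so both runs see the full result.
print(run_action(func, mylist))
print(run_action(func, mylist))
```

The generator is consumed only within a single computation; nothing holds on to it between actions.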

