Original URL: http://stackoverflow.com/questions/40892080/
Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): StackOverFlow
How to use mapPartitions in Spark Scala?
Asked by Spar
I have DocsRDD : RDD[(String, String)]
val DocsRDD = sc.wholeTextFiles("myDirectory/*" , 2)
DocsRDD:
Doc1.txt , bla bla bla .....\n bla bla bla \n bla ... bla
Doc2.txt , bla bla bla .....bla \n bla bla \n bla ... bla
Doc3.txt , bla bla bla .....\n bla bla bla \n bla ... bla
Doc4.txt , bla bla \n .....\n bla bla bla bla \n ... bla
Is there an efficient, elegant way to extract n-grams from these with mapPartitions? So far I have tried everything; I have read everything I could find about mapPartitions at least 5 times over, but I still cannot understand how to use it! It seems way too difficult to manipulate. In short I want:
val NGramsRDD = DocsRDD.map(x => (x._1 , x._2.sliding(n) ) )
but efficiently with mapPartitions. My basic misunderstanding of mapPartitions is:
OneDocRDD : RDD[String]
val OneDocRDD = sc.textFile("myDoc1.txt" , 2)
.mapPartitions(s1 : Iterator[String] => s2 : Iterator[String])
I cannot understand this! Since when is s1 an Iterator[String]? s1 is a String after sc.textFile.
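(For reference, a minimal sketch of what that signature means in valid Scala: sc.textFile yields an RDD[String] of individual lines, and mapPartitions calls your function once per partition, handing it all of that partition's lines bundled into one Iterator[String]; the upper-casing below is just an arbitrary stand-in transformation.)
val OneDocRDD = sc.textFile("myDoc1.txt", 2)
// The function runs once per partition, not once per line:
// s1 iterates over every line that fell into this partition,
// which is why its type is Iterator[String].
val upperCased = OneDocRDD.mapPartitions { s1 =>
  s1.map(_.toUpperCase)  // must itself return an Iterator
}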
Alright, my second question is: will mapPartitions give me a performance improvement over map in this situation?
Last but not least: can f() be:
f(Iterator[String]) : Iterator[Something else?]
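(A minimal sketch suggesting the answer is yes: the function passed to mapPartitions may return an Iterator of any element type; the (String, Int) pairs below are just an arbitrary illustration.)
// f : Iterator[String] => Iterator[(String, Int)]: the output element
// type is free to differ from the input element type.
val lineLengths = OneDocRDD.mapPartitions { lines =>
  lines.map(line => (line, line.length))
}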
Answered by Pascal Soucy
I'm not sure that .mapPartitions will help (at least, not in the given example), but using .mapPartitions would look like:
val DocsRDD = sc.wholeTextFiles("myDirectory/*", 2)  // RDD[(fileName, content)]
val NGramsRDD = DocsRDD.mapPartitions(iter => {
  // here you can initialize objects that you would need,
  // created once per partition and not for each x in the map
  iter.map(x => (x._1, x._2.sliding(n)))
})
Normally you want to use .mapPartitions to create/initialize an object that you don't want to (example: too big) or can't serialize and ship to the worker nodes. Without .mapPartitions you would need to create it in the .map, but that would not be efficient since the object would be created for each x.
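(To make that concrete, a minimal sketch assuming a hypothetical NGramExtractor class that is expensive to build and not serializable; inside .mapPartitions it is constructed once per partition on the worker, instead of once per element as it would be inside .map.)
// Hypothetical expensive, non-serializable helper (illustration only).
class NGramExtractor(n: Int) {
  def extract(text: String): Iterator[String] = text.sliding(n)
}
val nGramsPerDoc = DocsRDD.mapPartitions { iter =>
  val extractor = new NGramExtractor(3)  // built once per partition
  iter.map { case (fileName, content) => (fileName, extractor.extract(content)) }
}
// The naive .map equivalent would build a new extractor for every document:
// DocsRDD.map { case (f, c) => (f, new NGramExtractor(3).extract(c)) }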

