How do I iterate RDDs in Apache Spark (Scala)?
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original source and authors (not me): StackOverflow.
Original question: http://stackoverflow.com/questions/25914789/
How do I iterate RDDs in Apache Spark (Scala)?
Asked by Havnar
I use the following command to fill an RDD with a bunch of arrays containing two strings: ["filename", "content"].
Now I want to iterate over each of those entries and do something with every filename and content.
val someRDD = sc.wholeTextFiles("hdfs://localhost:8020/user/cloudera/*")
However, I can't seem to find any documentation on how to do this.
So what I want is this:
foreach occurrence-in-the-rdd {
    // do stuff with the array found at location n of the RDD
}
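For context, a minimal sketch of that loop over the (filename, content) pairs produced by wholeTextFiles might look like this (the body is only a placeholder):
// Runs once per (filename, content) pair; on a cluster, println output goes to the executor logs.
someRDD.foreach { case (filename, content) =>
  println(filename + " -> " + content.length + " characters")
}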
Accepted answer by David
The fundamental operations in Spark are map and filter.
val txtRDD = someRDD filter { case(id, content) => id.endsWith(".txt") }
txtRDD will now only contain the files that have the extension ".txt".
And if you want to word count those files, you can say:
// split the documents into words in one long list
val words = txtRDD flatMap { case (id, text) => text.split("\\s+") }
// give each word a count of 1
val wordsT = words map (x => (x, 1))
// sum up the counts for each word
val wordCount = wordsT reduceByKey ((a, b) => a + b)
You want to use mapPartitions when you have some expensive initialization you need to perform -- for example, if you want to do Named Entity Recognition with a library like the Stanford CoreNLP tools.
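As a hedged illustration of that pattern, a sketch could look like the following; ExpensiveModel and its annotate method are made up here purely to stand in for something costly such as an NLP pipeline:
// Hypothetical stand-in for an expensive resource; assume construction is costly.
class ExpensiveModel {
  def annotate(text: String): String = text.toUpperCase // placeholder "analysis"
}

// Build the model once per partition instead of once per element.
val annotated = txtRDD.mapPartitions { iter =>
  val model = new ExpensiveModel()
  iter.map { case (id, text) => (id, model.annotate(text)) }
}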
Master map, filter, flatMap, and reduce, and you are well on your way to mastering Spark.
Answered by Spiro Michaylov
You call various methods on the RDD that accept functions as parameters.
import org.apache.spark.{SparkConf, SparkContext}

// set up an example -- an RDD of arrays
val sparkConf = new SparkConf().setMaster("local").setAppName("Example")
val sc = new SparkContext(sparkConf)
val testData = Array(Array(1,2,3), Array(4,5,6,7,8))
val testRDD = sc.parallelize(testData, 2)
// Print the size of each array in the RDD.
testRDD.collect().foreach(a => println(a.size))
// Use map() to create an RDD with the array sizes.
val countRDD = testRDD.map(a => a.size)
// Print the elements of this new RDD.
countRDD.collect().foreach(a => println(a))
// Use filter() to create an RDD with just the longer arrays.
val bigRDD = testRDD.filter(a => a.size > 3)
// Print each remaining array.
bigRDD.collect().foreach(a => {
a.foreach(e => print(e + " "))
println()
})
Notice that the functions you write accept a single RDD element as input and return data of some uniform type, so you create an RDD of the latter type. For example, countRDD is an RDD[Int], while bigRDD is still an RDD[Array[Int]].
It will probably be tempting at some point to write a foreach that modifies some other data, but you should resist for the reasons described in this question and answer.
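To make the trap concrete, here is a small hedged sketch using countRDD from above (sc.accumulator is the Spark 1.x API of this answer's era):
// Anti-pattern: each executor mutates its own copy of `total`; the driver's variable is not reliably updated.
var total = 0
countRDD.foreach(n => total += n)

// Safer alternatives: an action such as reduce, or an accumulator.
val sum = countRDD.reduce(_ + _)
val acc = sc.accumulator(0)
countRDD.foreach(n => acc += n)
println(acc.value)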
Edit: Don't try to print large RDDs
Several readers have asked about using collect() and println() to see their results, as in the example above. Of course, this only works if you're running in an interactive mode like the Spark REPL (read-eval-print loop). It's best to call collect() on the RDD to get a sequential array for orderly printing. But collect() may bring back too much data, and in any case too much may be printed. Here are some alternative ways to get insight into your RDDs if they're large:
RDD.take(): This gives you fine control on the number of elements you get, but not where they came from -- defined as the "first" ones, which is a concept dealt with by various other questions and answers here.
// take() returns an Array so no need to collect()
myHugeRDD.take(20).foreach(a => println(a))
RDD.sample(): This lets you (roughly) control the fraction of results you get, whether sampling uses replacement, and even, optionally, the random number seed.
// sample() does return an RDD so you may still want to collect()
myHugeRDD.sample(true, 0.01).collect().foreach(a => println(a))
RDD.takeSample(): This is a hybrid: using random sampling that you can control, but both letting you specify the exact number of results and returning an Array.
// takeSample() returns an Array so no need to collect()
myHugeRDD.takeSample(true, 20).foreach(a => println(a))
RDD.count(): Sometimes the best insight comes from how many elements you ended up with -- I often do this first.
println(myHugeRDD.count())
Answered by Mikel Urkia
I would try making use of a partition mapping function. The code below shows how an entire RDD dataset can be processed in a loop so that each input goes through the very same function. I am afraid I have no knowledge of Scala, so everything I have to offer is Java code. However, it should not be very difficult to translate it into Scala.
JavaRDD<String> res = file.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
    @Override
    public Iterable<String> call(Iterator<String> t) throws Exception {
        List<String> tmpRes = new ArrayList<>();
        while (t.hasNext()) {
            String input = t.next(); // consume the element, otherwise the loop never terminates
            // placeholder output, mirroring the original "filename"/"content" idea;
            // real code would build the result from `input`
            tmpRes.add("filename");
            tmpRes.add("content");
        }
        return tmpRes;
    }
}).cache();
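Since the answer mentions translating this to Scala, a rough, hedged Scala sketch of the same per-partition loop might look like the following (assuming file is an RDD[String]; the placeholder strings mirror the Java snippet):
// Process each partition with one function call; emit placeholder strings per input element.
val res = file.mapPartitions { it =>
  it.flatMap { input =>
    // real code would build output from `input`
    Seq("filename", "content")
  }
}.cache()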
Answered by leon
What wholeTextFiles returns is a pair RDD:
def wholeTextFiles(path: String, minPartitions: Int): RDD[(String, String)]
Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file and the value is the content of each file.
Here is an example of reading the files at a local path and then printing every filename and content.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("scala-test").setMaster("local")
val sc = new SparkContext(conf)
sc.wholeTextFiles("file:///Users/leon/Documents/test/")
.collect
.foreach(t => println(t._1 + ":" + t._2));
The result:
file:/Users/leon/Documents/test/1.txt:{"name":"tom","age":12}
file:/Users/leon/Documents/test/2.txt:{"name":"john","age":22}
file:/Users/leon/Documents/test/3.txt:{"name":"leon","age":18}
Or, converting the pair RDD to a plain RDD first:
sc.wholeTextFiles("file:///Users/leon/Documents/test/")
.map(t => t._2)
.collect
.foreach { x => println(x)}
The result:
{"name":"tom","age":12}
{"name":"john","age":22}
{"name":"leon","age":18}
And I think wholeTextFiles is better suited to small files.
Answered by AliSafari186
for (element <- YourRDD) {
  // do what you want with element in each iteration; if you want the index of the element,
  // simply keep a counter variable in this loop, starting from 0
  println(element._1) // this will print all filenames
}
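One caveat, tying back to the earlier note about printing: a for comprehension over an RDD desugars to RDD.foreach, so the println above runs on the executors, not necessarily in your console. If the goal is just to see the filenames on the driver, a hedged variant is to collect (or take) first:
// Bring the (filename, content) pairs to the driver, then iterate locally.
for (element <- YourRDD.collect())
  println(element._1)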

