How to iterate records spark scala?
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must follow the same CC BY-SA license, cite the original URL, and attribute it to the original authors (not me) on StackOverflow.
Original URL: http://stackoverflow.com/questions/33030726/
Asked by Rolando
I have a variable "myrdd" holding 10 records from an Avro file, loaded through hadoopFile.
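For context, a minimal sketch of how such an RDD might be created in spark-shell (where sc is the predefined SparkContext), using the old hadoopFile API with the Avro input format. The path and the use of GenericRecord are assumptions, not taken from the question:

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable

// Hypothetical path; the question does not show how the file was loaded.
// Each element of the resulting RDD is a pair (AvroWrapper[GenericRecord], NullWritable),
// which is why the record is reached through x._1.datum below.
// (getName() in the question suggests a generated specific record class;
// with GenericRecord you would read fields via datum.get("name") instead.)
val myrdd = sc.hadoopFile(
  "/path/to/records.avro",
  classOf[AvroInputFormat[GenericRecord]],
  classOf[AvroWrapper[GenericRecord]],
  classOf[NullWritable])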
When I do
myrdd.first._1.datum.getName()
I can get the name. Problem is, I have 10 records in "myrdd". When I do:
myrdd.map(x => {println(x._1.datum.getName())})
it does not work and prints out a weird object a single time. How can I iterate over all records?
Answered by Beryllium
Here is a log from a spark-shell session with a similar scenario.
Given
scala> persons
res8: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> persons.first
res7: org.apache.spark.sql.Row = [Justin,19]
Your issue looks like
scala> persons.map(t => println(t))
res4: org.apache.spark.rdd.RDD[Unit] = MapPartitionsRDD[10]
so map just returns another RDD (the function is not applied immediately; it is applied "lazily" when you actually iterate over the result).
So when you materialize (using collect()) you get a "normal" collection:
scala> persons.collect()
res11: Array[org.apache.spark.sql.Row] = Array([Justin,19])
over which you can map. Note that in this case there is a side effect in the closure passed to map (the println), and the result of println is Unit:
scala> persons.collect().map(t => println(t))
[Justin,19]
res5: Array[Unit] = Array(())
Same result if collect is applied at the end:
scala> persons.map(t => println(t)).collect()
[Justin,19]
res19: Array[Unit] = Array(())
But if you just want to print the rows, you can simplify it by using foreach:
scala> persons.foreach(t => println(t))
[Justin,19]
As @RohanAletty has pointed out in a comment, this works for a local Spark job. If the job runs in a cluster, collect is required as well:
persons.collect().foreach(t => println(t))
Notes
- The same behaviour can be observed in the Iterator class (see the sketch below).
- The output of the session above has been reordered.
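As a minimal illustration of the first note (plain Scala, no Spark required), Iterator.map is lazy in the same way and only runs the closure once the iterator is consumed:

// Nothing is printed by this line; map only wraps the iterator
val it = Iterator(1, 2, 3).map(n => println(n))
// Consuming the iterator runs the closure and prints 1, 2 and 3
it.foreach(identity)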
Update
As for filtering: the location of collect is "bad" if you apply filters after collect that could have been applied before it.
For example, these expressions give the same result:
scala> persons.filter("age > 20").collect().foreach(println)
[Michael,29]
[Andy,30]
scala> persons.collect().filter(r => r.getInt(1) >= 20).foreach(println)
[Michael,29]
[Andy,30]
but the second case is worse, because that filter could have been applied before collect.
The same applies to any type of aggregation as well.
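For example, a sketch assuming the same persons DataFrame as above: counting matching rows before collect() keeps the work in the cluster, while collecting first ships every row to the driver:

// Aggregation pushed to the cluster: only the final count reaches the driver
val countOnCluster = persons.filter("age > 20").count()
// Aggregation after collect(): all rows are transferred to the driver first
val countOnDriver = persons.collect().count(r => r.getInt(1) > 20)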

