
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): StackOverflow. Original question: http://stackoverflow.com/questions/40892800/


Spark: RDD to List

Tags: scala, list, apache-spark, rdd

Asked by bill

I have an RDD with the structure

RDD[(String, String)]

and I want to create two Lists (one for each dimension of the RDD).

I tried to use rdd.foreach() to fill two ListBuffers and then convert them to Lists, but I guess each node creates its own copy of the ListBuffer, because after the iteration the ListBuffers are empty. How can I do it?

EDIT: my approach

import scala.collection.mutable.ListBuffer

val labeled = data_labeled.map { line =>
  val parts = line.split(',')
  (parts(5), parts(7))
}.cache()

var testList: ListBuffer[String] = new ListBuffer()

// foreach runs on the executors, so it mutates serialized copies of
// testList, not the driver-side instance
labeled.foreach(line =>
  testList += line._1
)
val labeledList = testList.toList
println("rdd: " + labeled.count)
println("bufferList: " + testList.size)
println("list: " + labeledList.size)

and the result is:

rdd: 31990654
bufferList: 0
list: 0

Answered by Tzach Zohar

If you really want to create two Lists (meaning you want all the distributed data collected into the driver application, risking slowness or an OutOfMemoryError), you can use collect and then simple map operations on the result:

val list: List[(String, String)] = rdd.collect().toList
val col1: List[String] = list.map(_._1)
val col2: List[String] = list.map(_._2)

Alternatively, if you want to "split" your RDD into two RDDs, it's pretty similar, just without collecting the data:

rdd.cache() // make sure the computation of rdd is not repeated for each map
val rdd1: RDD[String] = rdd.map(_._1)
val rdd2: RDD[String] = rdd.map(_._2)

A third alternative is to first map into these two RDDs and then collect each one of them, but it's not much different from the first option and suffers from the same risks and limitations.
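
For completeness, a rough sketch of that third option (assuming the same rdd: RDD[(String, String)] as above) could look like this:

// map first, then collect each projection separately;
// without rdd.cache(), the RDD is recomputed for each collect
val col1: List[String] = rdd.map(_._1).collect().toList
val col2: List[String] = rdd.map(_._2).collect().toList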

Answered by evan.oman

As an alternative to Tzach Zohar's answer, you can use unzip on the collected list:

scala> val myRDD = sc.parallelize(Seq(("a", "b"), ("c", "d")))
myRDD: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> val (l1, l2) = myRDD.collect.toList.unzip
l1: List[String] = List(a, c)
l2: List[String] = List(b, d)

Or use keys and values on the RDD:

scala> val (rdd1, rdd2) = (myRDD.keys, myRDD.values)
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at keys at <console>:33
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at values at <console>:33

scala> rdd1.foreach{println}
a
c

scala> rdd2.foreach{println}
d
b
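
Note that foreach prints in whatever order the partitions happen to be processed (hence d before b above). If you need the elements back in a deterministic local List, collect first; a small sketch continuing the session above (the res numbering is illustrative):

scala> rdd2.collect().toList
res0: List[String] = List(b, d)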