How do I return multiple key-value pairs in Scala using Spark's map transformation?

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): StackOverflow. Original question: http://stackoverflow.com/questions/29471040/


How do I return multiple key-value pairs in Scala using Spark's map transformation?

Tags: scala, apache-spark, scala-collections

Asked by Jane Wayne

I'm new to Scala and Spark. I'm trying to return multiple key-value pairs during the map transformation. My input data is a simple CSV file.


1, 2, 3
4, 5, 6
7, 8, 9

My Scala script looks like the following.


class Key(_i: Integer, _j: Integer) {
  def i = _i
  def j = _j
}
class Val(_x: Double, _y: Double) {
  def x = _x
  def y = _y
}
val arr = "1,2,3".split(",")
for (i <- 0 until arr.length) {
  val x = arr(i).toDouble
  for (j <- 0 until arr.length) {
    val y = arr(j).toDouble
    val k = new Key(i, j)
    val v = new Val(x, y)
    // note that I want to return the tuples, (k, v)
  }
}

I want to be able to use the for loop and data structures above to return multiple tuples (k, v). Something similar to the code below.


val file = sc.textFile("/path/to/test.csv")
file.map(line => {
  val arr = line.split(",")
  for (i <- 0 until arr.length) {
    val x = arr(i).toDouble
    for (j <- (i + 1) until arr.length) {
      val y = arr(j).toDouble
      val k = new Index(i, j)
      val v = new Val(x, y)
      (k, v)
    }
  }
}).collect // reduceByKey is not there, reduce is there, but not what I want

When I copy/paste the code above into the lambda expression (and run it in the Scala REPL) I get the following error:


error: illegal start of simple expression
val arr = line.split(",")
^

I also realize that I am still stuck in imperative/procedural programming thinking, so please bear with me (a newbie at Scala/Spark).


Answered by Daniel Darabos

Use RDD.flatMap and yield a list from the for loop:


val file = sc.textFile("/path/to/test.csv")
file.flatMap { line =>
  val arr = line.split(",")
  for {
    i <- 0 until arr.length
    j <- (i + 1) until arr.length
  } yield {
    val x = arr(i).toDouble
    val y = arr(j).toDouble
    val k = new Index(i, j)
    val v = new Val(x, y)
    (k, v)
  }
}.collect
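
As a side note on why yield helps: a for comprehension with two generators desugars into flatMap and map calls, so each line produces a collection of pairs, which RDD.flatMap then flattens into a single RDD. A minimal local sketch, with plain Int pairs standing in for (Index, Val):

val arr = Array("1", "2", "3")

// roughly what the two-generator for/yield above desugars into
val pairs = (0 until arr.length).flatMap { i =>
  ((i + 1) until arr.length).map { j =>
    (i, j) // stand-in for (new Index(i, j), new Val(x, y))
  }
}
// pairs: Vector((0,1), (0,2), (1,2))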

Answered by Justin Pihony

You are forgetting the braces after the arrow. You can only omit them when the body is a single simple expression (one expression).


file.map(line => {
    //multiple lines of code here
})
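
For illustration, a minimal sketch against the same hypothetical file RDD from the question:

// fine without braces: the body is a single expression
file.map(line => line.split(","))

// a body that starts with a statement such as `val` needs the braces,
// which is exactly what "illegal start of simple expression" points at
file.map(line => {
  val arr = line.split(",")
  (arr(0), arr(1))
})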

Full answer after edits:


case class Index(i: Integer, j: Integer)
case class Val(x: Double, y: Double)

val data = sc.parallelize(List("1,2,3", "4,5,6", "7,8,9"))
data.flatMap(line => {
  val arr = line.split(",")
  // the nested yields produce a Seq[Seq[(Index, Val)]] ...
  val doubleSeq = for (i <- 0 until arr.length) yield {
    val x = arr(i).toDouble
    for (j <- (i + 1) until arr.length) yield {
      val y = arr(j).toDouble
      val k = Index(i, j)
      val v = Val(x, y)
      (k, v)
    }
  }
  // ... so flatten collapses the inner sequences before flatMap emits them
  doubleSeq.flatten
})

There were actually a multitude of problems:

其实有很多问题:

  • Notice that I changed your classes to case classes, since they are serializable. Otherwise, you would need to implement Serializable.
  • I changed map to flatMap, and also flattened your array, as one flatMap would still leave you with an inner array. The combination of the two yields your RDD[(Index, Val)], which can now be implicitly used with reduceByKey (see the sketch after this list).
  • I turned your for loop into a for comprehension by using yield. You were getting a final type of Unit because the return type of a for loop is Unit.
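
To close the loop on the reduceByKey point above: once the element type is (Index, Val), Spark implicitly wraps the RDD in PairRDDFunctions, which provides reduceByKey. A minimal sketch, using a hypothetical combine rule that simply sums the coordinates:

// pairs: RDD[(Index, Val)], built the same way as in the answer above
val pairs = data.flatMap { line =>
  val arr = line.split(",")
  for {
    i <- 0 until arr.length
    j <- (i + 1) until arr.length
  } yield (Index(i, j), Val(arr(i).toDouble, arr(j).toDouble))
}

// case-class keys get structural equals/hashCode, so matching (i, j)
// keys from different input lines are grouped and combined
val summed = pairs.reduceByKey((a, b) => Val(a.x + b.x, a.y + b.y))
summed.collect().foreach(println)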