scala 修改 Spark RDD foreach 中的集合

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/23394286/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-22 06:14:14  来源:igfitidea点击:

Modify collection inside a Spark RDD foreach

scalaapache-sparkrdd

提问by palako

I'm trying to add elements to a map while iterating the elements of an RDD. I'm not getting any errors, but the modifications are not happening.

我正在尝试在迭代 RDD 的元素时向地图添加元素。我没有收到任何错误,但没有发生修改。

It all works fine adding directly or iterating other collections:

直接添加或迭代其他集合都可以正常工作:

scala> val myMap = new collection.mutable.HashMap[String,String]
myMap: scala.collection.mutable.HashMap[String,String] = Map()

scala> myMap("test1")="test1"

scala> myMap
res44: scala.collection.mutable.HashMap[String,String] = Map(test1 -> test1)

scala> List("test2", "test3").foreach(w => myMap(w) = w)

scala> myMap
res46: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)

But when I try to do the same from an RDD:

但是当我尝试从 RDD 做同样的事情时:

scala> val fromFile = sc.textFile("tests.txt")
...
scala> fromFile.take(3)
...
res48: Array[String] = Array(test4, test5, test6)

scala> fromFile.foreach(w => myMap(w) = w)
scala> myMap
res50: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)

I've tried printing the contents of the map as it was before the foreach to make sure the variable is the same, and it prints correctly:

我已经尝试像在 foreach 之前一样打印地图的内容,以确保变量相同,并且打印正确:

fromFile.foreach(w => println(myMap("test1")))
...
test1
test1
test1
...

I've also printed the modified element of the map inside the foreach code and it prints as modified, but when the operation is completed, the map seems unmodified.

我还在 foreach 代码中打印了地图的修改元素,并打印为已修改,但是当操作完成时,地图似乎未修改。

scala> fromFile.foreach({w => myMap(w) = w; println(myMap(w))})
...
test4
test5
test6
...
scala> myMap
res55: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)

Converting the RDD to an array (collect) also works fine:

将 RDD 转换为数组(收集)也可以正常工作:

fromFile.collect.foreach(w => myMap(w) = w)
scala> myMap
res89: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test5 -> test5, test1 -> test1, test4 -> test4, test6 -> test6, test3 -> test3)

Is this a context problem? Am I accessing a copy of the data that is being modified somewhere else?

这是上下文问题吗?我是否正在访问正在其他地方修改的数据副本?

回答by Daniel Darabos

It becomes clearer when running on a Spark cluster (not a single machine). The RDD is now spread over several machines. When you call foreach, you tell each machine what to do with the piece of the RDD that it has. If you refer to any local variables (like myMap), they get serialized and sent to the machines, so they can use it. But nothing comes back. So your original copy of myMapis unaffected.

在 Spark 集群(不是单台机器)上运行时会更清晰。RDD 现在分布在多台机器上。当你调用 时foreach,你告诉每台机器如何处理它拥有的 RDD 部分。如果您引用任何局部变量(如myMap),它们将被序列化并发送到机器,以便它们可以使用它。但什么都没有回来。因此,您的原始副本myMap不受影响。

I think this answers your question, but obviously you are trying to accomplish something and you will not be able to get there this way. Feel free to explain here or in a separate question what you are trying to do, and I will try to help.

我认为这回答了您的问题,但显然您正在尝试完成某些事情,而您将无法通过这种方式到达那里。请随意在此处或在单独的问题中解释您要做什么,我会尽力提供帮助。