scala 任务不可序列化:java.io.NotSerializableException 仅在类而非对象上调用闭包外的函数时
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/22592811/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects
提问by Nimrod007
Getting strange behavior when calling function outside of a closure:
在闭包外调用函数时出现奇怪的行为:
- when function is in a object everything is working
- when function is in a class get :
- 当函数在一个对象中时,一切正常
- 当函数在一个类中时 get :
Task not serializable: java.io.NotSerializableException: testing
任务不可序列化:java.io.NotSerializableException:测试
The problem is I need my code in a class and not an object. Any idea why this is happening? Is a Scala object serialized (default?)?
问题是我需要在类中而不是对象中的代码。知道为什么会这样吗?Scala 对象是否序列化(默认?)?
This is a working code example:
这是一个工作代码示例:
object working extends App {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
//calling function outside closure
val after = rddList.map(someFunc(_))
def someFunc(a:Int) = a+1
after.collect().map(println(_))
}
This is the non-working example :
这是非工作示例:
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
回答by Grega Ke?pret
RDDs extend the Serialisable interface, so this is not what's causing your task to fail. Now this doesn't mean that you can serialise an RDDwith Spark and avoid NotSerializableException
RDD 扩展了 Serialisable 接口,因此这不是导致您的任务失败的原因。现在这并不意味着您可以RDD使用 Spark序列化并避免NotSerializableException
Spark is a distributed computing engine and its main abstraction is a resilient distributed dataset (RDD), which can be viewed as a distributed collection. Basically, RDD's elements are partitioned across the nodes of the cluster, but Spark abstracts this away from the user, letting the user interact with the RDD (collection) as if it were a local one.
Spark 是一个分布式计算引擎,它的主要抽象是一个弹性分布式数据集(RDD),它可以被看作是一个分布式集合。基本上,RDD 的元素跨集群的节点进行分区,但 Spark 将这一点从用户那里抽象出来,让用户与 RDD(集合)交互,就好像它是本地的一样。
Not to get into too many details, but when you run different transformations on a RDD (map, flatMap, filterand others), your transformation code (closure) is:
不要让太多细节,但是当你在一个RDD(运行不同的变换map,flatMap,filter等),您的转换代码包(closure)是:
- serialized on the driver node,
- shipped to the appropriate nodes in the cluster,
- deserialized,
- and finally executed on the nodes
- 在驱动程序节点上序列化,
- 运送到集群中的适当节点,
- 反序列化,
- 最后在节点上执行
You can of course run this locally (as in your example), but all those phases (apart from shipping over network) still occur. [This lets you catch any bugs even before deploying to production]
您当然可以在本地运行它(如您的示例中所示),但所有这些阶段(除了通过网络运输)仍然会发生。[这使您甚至可以在部署到生产之前捕获任何错误]
What happens in your second case is that you are calling a method, defined in class testingfrom inside the map function. Spark sees that and since methods cannot be serialized on their own, Spark tries to serialize the wholetestingclass, so that the code will still work when executed in another JVM. You have two possibilities:
在您的第二种情况下发生的事情是您正在调用一个方法,该方法testing在 map 函数内部的类中定义。Spark 看到了这一点,并且由于方法不能自己序列化,Spark 尝试序列化整个testing类,以便代码在另一个 JVM 中执行时仍然可以工作。你有两种可能:
Either you make class testing serializable, so the whole class can be serialized by Spark:
要么让类测试可序列化,这样整个类就可以被 Spark 序列化:
import org.apache.spark.{SparkContext,SparkConf}
object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}
object NOTworking extends App {
new Test().doIT
}
class Test extends java.io.Serializable {
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}
def someFunc(a: Int) = a + 1
}
or you make someFuncfunction instead of a method (functions are objects in Scala), so that Spark will be able to serialize it:
或者您创建someFunc函数而不是方法(函数是 Scala 中的对象),以便 Spark 能够序列化它:
import org.apache.spark.{SparkContext,SparkConf}
object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}
object NOTworking extends App {
new Test().doIT
}
class Test {
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}
val someFunc = (a: Int) => a + 1
}
Similar, but not the same problem with class serialization can be of interest to you and you can read on it in this Spark Summit 2013 presentation.
您可能会对类序列化的类似但不相同的问题感兴趣,您可以在 Spark Summit 2013 演示文稿中阅读相关内容。
As a side note, you can rewrite rddList.map(someFunc(_))to rddList.map(someFunc), they are exactly the same. Usually, the second is preferred as it's less verbose and cleaner to read.
作为旁注,您可以重写rddList.map(someFunc(_))为rddList.map(someFunc),它们完全相同。通常,第二个是首选,因为它不那么冗长,阅读起来更清晰。
EDIT (2015-03-15): SPARK-5307introduced SerializationDebuggerand Spark 1.3.0 is the first version to use it. It adds serialization path to a NotSerializableException. When a NotSerializableException is encountered, the debugger visits the object graph to find the path towards the object that cannot be serialized, and constructs information to help user to find the object.
编辑 (2015-03-15):SPARK-5307引入了SerializationDebugger,Spark 1.3.0 是第一个使用它的版本。它将序列化路径添加到NotSerializableException。当遇到 NotSerializableException 时,调试器会访问对象图来寻找无法序列化的对象的路径,并构造信息来帮助用户找到该对象。
In OP's case, this is what gets printed to stdout:
在 OP 的情况下,这就是打印到标准输出的内容:
Serialization stack:
- object not serializable (class: testing, value: testing@2dfe2f00)
- field (class: testing$$anonfun, name: $outer, type: class testing)
- object (class testing$$anonfun, <function1>)
回答by Ben Sidhom
Grega's answeris great in explaining why the original code does not work and two ways to fix the issue. However, this solution is not very flexible; consider the case where your closure includes a method call on a non-Serializableclass that you have no control over. You can neither add the Serializabletag to this class nor change the underlying implementation to change the method into a function.
Grega 的回答很好地解释了为什么原始代码不起作用以及解决问题的两种方法。但是,这种解决方案不是很灵活;考虑一下您的闭包包含对Serializable您无法控制的非类的方法调用的情况。您既不能将Serializable标记添加到此类,也不能更改底层实现以将方法更改为函数。
Nileshpresents a great workaround for this, but the solution can be made both more concise and general:
Nilesh为此提出了一个很好的解决方法,但该解决方案可以变得更加简洁和通用:
def genMapper[A, B](f: A => B): A => B = {
val locker = com.twitter.chill.MeatLocker(f)
x => locker.get.apply(x)
}
This function-serializer can then be used to automatically wrap closures and method calls:
然后可以使用此函数序列化器自动包装闭包和方法调用:
rdd map genMapper(someFunc)
This technique also has the benefit of not requiring the additional Shark dependencies in order to access KryoSerializationWrapper, since Twitter's Chill is already pulled in by core Spark
这种技术还有一个好处是不需要额外的 Shark 依赖来访问KryoSerializationWrapper,因为 Twitter 的 Chill 已经被核心 Spark 拉进来了
回答by samthebest
Complete talk fully explaining the problem, which proposes a great paradigm shifting way to avoid these serialization problems: https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory-leaks-no-ws.md
完整的演讲充分解释了这个问题,它提出了一种很好的范式转换方式来避免这些序列化问题:https: //github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory-无泄漏.md
The top voted answer is basically suggesting throwing away an entire language feature - that is no longer using methods and only using functions. Indeed in functional programming methods in classes should be avoided, but turning them into functions isn't solving the design issue here (see above link).
最高投票的答案基本上是建议丢弃整个语言功能——不再使用方法而只使用函数。确实应该避免使用类中的函数式编程方法,但是将它们转换为函数并不能解决这里的设计问题(参见上面的链接)。
As a quick fix in this particular situation you could just use the @transientannotation to tell it not to try to serialise the offending value (here, Spark.ctxis a custom class not Spark's one following OP's naming):
作为在这种特定情况下的快速修复,您可以使用@transient注释告诉它不要尝试序列化有问题的值(这里Spark.ctx是一个自定义类,而不是遵循 OP 命名的 Spark 类):
@transient
val rddList = Spark.ctx.parallelize(list)
You can also restructure code so that rddList lives somewhere else, but that is also nasty.
您还可以重组代码,以便 rddList 位于其他地方,但这也很糟糕。
The Future is Probably Spores
未来可能是孢子
In future Scala will include these things called "spores" that should allow us to fine grain control what does and does not exactly get pulled in by a closure. Furthermore this should turn all mistakes of accidentally pulling in non-serializable types (or any unwanted values) into compile errors rather than now which is horrible runtime exceptions / memory leaks.
将来 Scala 将包含这些称为“孢子”的东西,它们应该允许我们细粒度地控制哪些内容会被闭包拉入,哪些不会被闭包拉入。此外,这应该将所有意外引入不可序列化类型(或任何不需要的值)的错误转化为编译错误,而不是现在这是可怕的运行时异常/内存泄漏。
http://docs.scala-lang.org/sips/pending/spores.html
http://docs.scala-lang.org/sips/pending/spores.html
A tip on Kryo serialization
关于 Kryo 序列化的提示
When using kyro, make it so that registration is necessary, this will mean you get errors instead of memory leaks:
使用 kyro 时,请务必注册,这意味着您会收到错误而不是内存泄漏:
"Finally, I know that kryo has kryo.setRegistrationOptional(true) but I am having a very difficult time trying to figure out how to use it. When this option is turned on, kryo still seems to throw exceptions if I haven't registered classes."
“最后,我知道 kryo 有 kryo.setRegistrationOptional(true) 但我很难弄清楚如何使用它。当这个选项打开时,如果我没有注册,kryo 似乎仍然会抛出异常课。”
Strategy for registering classes with kryo
Of course this only gives you type-level control not value-level control.
当然,这只给你类型级别的控制,而不是值级别的控制。
... more ideas to come.
...更多的想法来。
回答by Nilesh
I solved this problem using a different approach. You simply need to serialize the objects before passing through the closure, and de-serialize afterwards. This approach just works, even if your classes aren't Serializable, because it uses Kryo behind the scenes. All you need is some curry. ;)
我使用不同的方法解决了这个问题。您只需要在通过闭包之前序列化对象,然后反序列化。即使您的类不是可序列化的,这种方法也很有效,因为它在幕后使用了 Kryo。你只需要一些咖喱。;)
Here's an example of how I did it:
这是我如何做到的一个例子:
def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
(foo: Foo) : Bar = {
kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()
object Blah(abc: ABC) extends (Foo => Bar) {
def apply(foo: Foo) : Bar = { //This is the real function }
}
Feel free to make Blah as complicated as you want, class, companion object, nested classes, references to multiple 3rd party libs.
随意使 Blah 变得复杂,如类、伴生对象、嵌套类、对多个 3rd 方库的引用。
KryoSerializationWrapper refers to: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
KryoSerializationWrapper 指的是:https: //github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
回答by Tarang Bhalodia
I faced similar issue, and what I understand from Grega's answeris
我面临着类似的问题,我从了解Grega的回答是
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
your doITmethod is trying to serialize someFunc(_)method, but as method are not serializable, it tries to serialize class testingwhich is again not serializable.
您的doIT方法正在尝试序列化someFunc(_)方法,但由于方法不可序列化,它尝试序列化类测试,这又是不可序列化的。
So make your code work, you should define someFuncinside doITmethod. For example:
所以让你的代码工作,你应该在doIT方法中定义someFunc。例如:
def doIT = {
def someFunc(a:Int) = a+1
//function definition
}
val after = rddList.map(someFunc(_))
after.collect().map(println(_))
}
And if there are multiple functions coming into picture, then all those functions should be available to the parent context.
如果有多个函数出现,那么所有这些函数都应该对父上下文可用。
回答by Trebor Rude
I'm not entirely certain that this applies to Scala but, in Java, I solved the NotSerializableExceptionby refactoring my code so that the closure did not access a non-serializable finalfield.
我不完全确定这是否适用于 Scala,但在 Java 中,我NotSerializableException通过重构我的代码解决了这个问题,这样闭包就不会访问不可序列化的final字段。
回答by Gabe Church
FYI in Spark 2.4 a lot of you will probably encounter this issue. Kryo serialization has gotten better but in many cases you cannot use spark.kryo.unsafe=true or the naive kryo serializer.
在 Spark 2.4 中仅供参考,很多人可能会遇到这个问题。Kryo 序列化已经变得更好,但在许多情况下,您不能使用 spark.kryo.unsafe=true 或朴素的 kryo 序列化器。
For a quick fix try changing the following in your Spark configuration
要快速修复,请尝试在 Spark 配置中更改以下内容
spark.kryo.unsafe="false"
OR
或者
spark.serializer="org.apache.spark.serializer.JavaSerializer"
I modify custom RDD transformations that I encounter or personally write by using explicit broadcast variables and utilizing the new inbuilt twitter-chill api, converting them from rdd.map(row =>to rdd.mapPartitions(partition => {functions.
我通过使用显式广播变量并利用新的内置 twitter-chill api 修改我遇到或亲自编写的自定义 RDD 转换,将它们从函数转换rdd.map(row =>为rdd.mapPartitions(partition => {函数。
Example
例子
Old (not-great) Way
旧(不太好)的方式
val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val outputRDD = rdd.map(row => {
val value = sampleMap.get(row._1)
value
})
Alternative (better) Way
替代(更好)方式
import com.twitter.chill.MeatLocker
val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val brdSerSampleMap = spark.sparkContext.broadcast(MeatLocker(sampleMap))
rdd.mapPartitions(partition => {
val deSerSampleMap = brdSerSampleMap.value.get
partition.map(row => {
val value = sampleMap.get(row._1)
value
}).toIterator
})
This new way will only call the broadcast variable once per partition which is better. You will still need to use Java Serialization if you do not register classes.
这种新方法每个分区只调用一次广播变量,这是更好的。如果您不注册类,您仍然需要使用 Java 序列化。

