scala spark上的序列化异常

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/27579474/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-22 06:47:55  来源:igfitidea点击:

Serialization Exception on spark

scalaapache-sparkserializable

提问by superhan

I meet a very strange problem on Spark about serialization. The code is as below:

我在 Spark 上遇到了一个关于序列化的非常奇怪的问题。代码如下:

class PLSA(val sc : SparkContext, val numOfTopics : Int) extends Serializable
{
    def infer(document: RDD[Document]): RDD[DocumentParameter] = {
      val docs = documents.map(doc => DocumentParameter(doc, numOfTopics))
      docs
    }
}

where Document is defined as:

其中文档定义为:

class Document(val tokens: SparseVector[Int]) extends Serializable

and DocumentParameter is:

和 DocumentParameter 是:

class DocumentParameter(val document: Document, val theta: Array[Float]) extends Serializable

object DocumentParameter extends Serializable
{
  def apply(document: Document, numOfTopics: Int) = new DocumentParameter(document, 
    Array.ofDim[Float](numOfTopics))
}

SparseVectoris a serializable class in breeze.linalg.SparseVector.

SparseVector 是breeze.linalg.SparseVector.

This is a simple map procedure, and all the classes are serializable, but I get this exception:

这是一个简单的映射过程,所有的类都是可序列化的,但我得到了这个异常:

org.apache.spark.SparkException: Task not serializable

But when I remove the numOfTopicsparameter, that is:

但是当我删除numOfTopics参数时,即:

object DocumentParameter extends Serializable
{
  def apply(document: Document) = new DocumentParameter(document, 
    Array.ofDim[Float](10))
}

and call it like this:

并这样称呼它:

val docs = documents.map(DocumentParameter.apply)

and it seems OK.

看起来还可以。

Is type Int not serializable? But I do see that some code is written like that.

类型 Int 不可序列化吗?但我确实看到有些代码是这样写的。

I am not sure how to fix this bug.

我不知道如何修复这个错误。

#UPDATED#:

#更新#

Thank you @samthebest. I will add more details about it.

谢谢@samthebest。我会添加更多关于它的细节。

stack trace:
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
    at org.apache.spark.rdd.RDD.map(RDD.scala:270)
    at com.topicmodel.PLSA.infer(PLSA.scala:13)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
    at $iwC$$iwC$$iwC.<init>(<console>:39)
    at $iwC$$iwC.<init>(<console>:41)
    at $iwC.<init>(<console>:43)
    at <init>(<console>:45)
    at .<init>(<console>:49)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq(SparkIMain.scala:615)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
    at org.apache.spark.repl.SparkILoop.reallyInterpret(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
    at org.apache.spark.repl.SparkILoop.processLine(SparkILoop.scala:616)
    at org.apache.spark.repl.SparkILoop.innerLoop(SparkILoop.scala:624)
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
    at org.apache.spark.repl.SparkILoop$$anonfun$process.apply$mcZ$sp(SparkILoop.scala:954)
    at org.apache.spark.repl.SparkILoop$$anonfun$process.apply(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop$$anonfun$process.apply(SparkILoop.scala:902)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 46 more

As the stack trace gives the general information of exception, I removed it.

由于堆栈跟踪提供了异常的一般信息,我将其删除。

I run the code in the spark-shell.

我在 spark-shell 中运行代码。

// suppose I have get RDD[Document] for docs
val numOfTopics = 100
val plsa = new PLSA(sc, numOfTopics)
val docPara = plsa.infer(docs)

Could you give me some tutorials or tips on serializable?

你能给我一些关于可序列化的教程或技巧吗?

回答by lmm

Anonymous functions serialize their containing class. When you map {doc => DocumentParameter(doc, numOfTopics)}, the only way it can give that function access to numOfTopicsis to serialize the PLSAclass. And that class can't actually be serialized, because (as you can see from the stacktrace) it contains the SparkContextwhich isn't serializable (Bad Things would happen if individual cluster nodes had access to the context and could e.g. create new jobs from within a mapper).

匿名函数序列化它们的包含类。当您 时map {doc => DocumentParameter(doc, numOfTopics)},它可以授予该函数访问权限的唯一方法numOfTopics是序列化PLSA该类。并且该类实际上不能被序列化,因为(正如您从堆栈跟踪中看到的)它包含SparkContext不可序列化的(如果单个集群节点可以访问上下文并且可以例如从内部创建新作业,则会发生坏事映射器)。

In general, try to avoid storing the SparkContextin your classes (edit: or at least, make sure it's very clear what kind of classes contain the SparkContextand what kind don't); it's better to pass it as a (possibly implicit) parameter to individual methods that need it. Alternatively, move the function {doc => DocumentParameter(doc, numOfTopics)}into a different class from PLSA, one that really can be serialized.

一般来说,尽量避免将 the 存储SparkContext在您的类中(编辑:或至少,确保非常清楚哪些类包含 theSparkContext哪些类不包含);最好将它作为(可能implicit)参数传递给需要它的各个方法。或者,将函数{doc => DocumentParameter(doc, numOfTopics)}移到与 不同的类中PLSA,一个真正可以序列化的类。

(As multiple people have suggested, it's possible to keep the SparkContextin the class but marked as @transientso that it won't be serialized. I don't recommend this approach; it means the class will "magically" change state when serialized (losing the SparkContext), and so you might end up with NPEs when you try to access the SparkContextfrom inside a serialized job. It's better to maintain a clear distinction between classes that are only used in the "control" code (and might use the SparkContext) and classes that are serialized to run on the cluster (which must not have the SparkContext)).

(正如许多人所建议的,可以将 保留SparkContext在类中,但标记为@transient不会被序列化。我不推荐这种方法;这意味着该类在序列化时会“神奇地”改变状态(丢失SparkContext),因此当您尝试SparkContext从序列化作业内部访问 时,您可能最终会遇到 NPE 。最好在仅在“控制”代码中使用(并且可能使用SparkContext)的类和那些被序列化以在集群上运行(不能有SparkContext))。

回答by samthebest

This is indeed a weird one, but I think I can guess the problem. But first, you have not provided the bare minimum to solve the problem (I can guess, because I've seen 100s of these before). Here are some problems with your question:

这确实是一个奇怪的问题,但我想我能猜到问题所在。但是首先,您没有提供解决问题的最低要求(我可以猜到,因为我以前见过 100 个这样的问题)。以下是您的问题的一些问题:

def infer(document: RDD[Document], numOfTopics: Int): RDD[DocumentParameter] = {
  val docs = documents.map(doc => DocumentParameter(doc, numOfTopics))
}

This method doesn't return RDD[DocumentParameter]it returns Unit. You must have copied and pasted code incorrectly.

此方法不返回RDD[DocumentParameter]它返回Unit。您一定是错误地复制和粘贴了代码。

Secondly you haven't provided the entire stack trace? Why? There is no reason NOT to provide the full stack trace, and the full stack trace with message is necessary to understand the error - one needs the whole error to understand what the error is. Usually a not serializable exception tells you whatis not serializable.

其次,您还没有提供整个堆栈跟踪?为什么?没有理由不提供完整的堆栈跟踪,并且带有消息的完整堆栈跟踪对于理解错误是必要的——需要整个错误才能理解错误是什么。通常不可序列化的异常会告诉你什么是不可序列化的。

Thirdly you haven't told us where method inferis, are you doing this in a shell? What is the containing object/class/trait etc of infer?

第三,您还没有告诉我们方法在哪里infer,您是在 shell 中执行此操作吗?包含的对象/类/特征等是infer什么?

Anyway, I'm going guess that by passing in the Intyour causing a chain of things to get serialized that you don't expect, I can't give you any more information than that until you provide the bare minimum code so we can fully understand your problem.

无论如何,我猜想通过传入Int您导致一系列您不期望的事情被序列化,在您提供最低限度的代码之前,我无法为您提供更多信息,以便我们可以完全了解您的问题。