Spark Scala: Task not serializable error

Notice: this page is a translated copy of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same license, cite the original URL and author information, and attribute it to the original authors (not me). Original StackOverflow URL: http://stackoverflow.com/questions/43592742/

Date: 2020-10-22 09:12:19  Source: igfitidea

Spark Scala: Task Not serializable error

Tags: scala, apache-spark, pyspark

Asked by SarahB

I am using IntelliJ Community Edition with the Scala plugin and the Spark libraries. I am still learning Spark and am using a Scala Worksheet.

I have written the below code which removes punctuation marks in a String:

def removePunctuation(text: String): String = {
  // The backslash must be doubled in a regular Scala string literal ("\s" is an invalid escape)
  val punctPattern = "[^a-zA-Z0-9\\s]".r
  punctPattern.replaceAllIn(text, "").toLowerCase
}

Then I read a text file and try to remove punctuation:

val myfile = sc.textFile("/home/ubuntu/data.txt",4).map(removePunctuation)

This gives the error below; any help would be appreciated:

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(/home/ubuntu/src/main/scala/Test.sc:294)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(/home/ubuntu/src/main/scala/Test.sc:284)
  at org.apache.spark.util.ClosureCleaner$.clean(/home/ubuntu/src/main/scala/Test.sc:104)
  at org.apache.spark.SparkContext.clean(/home/ubuntu/src/main/scala/Test.sc:2090)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(/home/ubuntu/src/main/scala/Test.sc:366)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(/home/ubuntu/src/main/scala/Test.sc:365)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(/home/ubuntu/src/main/scala/Test.sc:147)
  at #worksheet#.#worksheet#(/home/ubuntu/src/main/scala/Test.sc:108)
Caused by: java.io.NotSerializableException: A$A21$A$A21
Serialization stack:
  - object not serializable (class: A$A21$A$A21, value: A$A21$A$A21@62db3891)
  - field (class: A$A21$A$A21$$anonfun$words$1, name: $outer, type: class A$A21$A$A21)
  - object (class A$A21$A$A21$$anonfun$words$1, )
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.map(RDD.scala:369)
  at A$A21$A$A21.words$lzycompute(Test.sc:27)
  at A$A21$A$A21.words(Test.sc:27)
  at A$A21$A$A21.get$$instance$$words(Test.sc:27)
  at A$A21$.main(Test.sc:73)
  at A$A21.main(Test.sc)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.jetbrains.plugins.scala.worksheet.MyWorksheetRunner.main(MyWorksheetRunner.java:22)

Accepted answer by Vidya

As @TGaweda suggests, Spark's SerializationDebugger is very helpful for identifying "the serialization path leading from the given object to the problematic object." All the dollar signs before the "Serialization stack" in the stack trace indicate that the container object for your method is the problem.

While it is easiest to just slap Serializable on your container class, I prefer to take advantage of the fact that Scala is a functional language and use your function as a first-class citizen:

sc.textFile("/home/ubuntu/data.txt",4).map { text =>
  val punctPattern = "[^a-zA-Z0-9\\s]".r
  punctPattern.replaceAllIn(text, "").toLowerCase
}

Or if you really want to keep things separate:

val removePunctuation: String => String = (text: String) => {
  val punctPattern = "[^a-zA-Z0-9\\s]".r
  punctPattern.replaceAllIn(text, "").toLowerCase
}
sc.textFile("/home/ubuntu/data.txt",4).map(removePunctuation)

These options work, of course, because Regex is serializable, as you should confirm.
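
One way to confirm that claim without Spark is a plain Java serialization round trip. This is only a sketch: `SerializationCheck` and `roundTrip` are hypothetical names introduced here, and Spark's default closure serializer is assumed to behave like standard Java serialization, as the stack trace in the question suggests.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical helper, not part of Spark or of the original answer.
object SerializationCheck {
  // Serializes a value to bytes and reads it back.
  // Throws java.io.NotSerializableException if the value cannot be serialized.
  def roundTrip[A <: AnyRef](value: A): A = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }
}
```

Calling `SerializationCheck.roundTrip("[^a-zA-Z0-9\\s]".r)` should return a working copy of the pattern instead of throwing.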

On a secondary but very important note, constructing a Regex is expensive, so factor it out of your transformations for the sake of performance, possibly with a broadcast.
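
As a rough illustration of factoring the pattern out, the sketch below swaps the broadcast mentioned above for a simpler technique: holding the compiled pattern in a field of a singleton object, so it is built once per JVM rather than once per record. The name `Punctuation` is hypothetical, and this assumes a compiled application; worksheets and REPLs wrap definitions in generated classes, which can change what a closure captures.

```scala
// Hypothetical helper; the name `Punctuation` is an assumption, not from the answer.
object Punctuation {
  // Compiled once, when the object is first initialized in a JVM,
  // instead of on every call.
  private val pattern = "[^a-zA-Z0-9\\s]".r

  // Strips everything except letters, digits, and whitespace, then lower-cases.
  def remove(text: String): String =
    pattern.replaceAllIn(text, "").toLowerCase
}
```

With the `sc` from the question, this could then be applied as `sc.textFile("/home/ubuntu/data.txt", 4).map(Punctuation.remove)`.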

Answer by Zoltán

As T. Gaweda already pointed out, you're most likely defining your function in a class that's not serializable. Because it is a pure function, i.e. it doesn't depend on any context of the enclosing class, I suggest you put it into a companion object which should extend Serializable. This would be Scala's equivalent of a Java static method:

object Helper extends Serializable {
  def removePunctuation(text: String): String = {
    val punctPattern = "[^a-zA-Z0-9\\s]".r
    punctPattern.replaceAllIn(text, "").toLowerCase
  }
}

Answer by T. Gawęda

Read the stack trace; it contains:

$outer, type: class A$A21$A$A21

It is a very good hint. Your lambda is serializable, but your class is not serializable.

When you create a lambda expression inside a class, the expression holds a reference to the outer class. In your case the outer class is not serializable: either it does not implement Serializable, or one of its fields is not an instance of Serializable.
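
The capture described above can be reproduced without Spark. In this sketch, `Container` is a hypothetical stand-in for the worksheet's generated class (the real `A$A21$A$A21` is not available), `Helper` holds the same logic in a standalone object, and `ClosureCheck.isSerializable` attempts plain Java serialization, which is assumed to approximate what Spark's ClosureCleaner checks.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.util.Try

// Hypothetical stand-in for the worksheet wrapper: it does NOT extend Serializable.
class Container {
  def removePunctuation(text: String): String =
    "[^a-zA-Z0-9\\s]".r.replaceAllIn(text, "").toLowerCase

  // Turning the instance method into a function keeps a reference to `this`.
  val viaMethod: String => String = removePunctuation
}

// The same logic in a standalone object: no enclosing instance is captured.
object Helper extends Serializable {
  val removePunctuation: String => String =
    text => "[^a-zA-Z0-9\\s]".r.replaceAllIn(text, "").toLowerCase
}

// Hypothetical checker, not a Spark API.
object ClosureCheck {
  // Returns true if plain Java serialization of the value succeeds.
  def isSerializable(value: AnyRef): Boolean =
    Try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(value) finally out.close()
    }.isSuccess
}
```

Here `ClosureCheck.isSerializable(new Container().viaMethod)` is expected to be false because the function drags the non-serializable `Container` along, while `ClosureCheck.isSerializable(Helper.removePunctuation)` is expected to be true.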
