Scala org.apache.spark.SparkException: Task not serializable

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me): StackOverflow. Original URL: http://stackoverflow.com/questions/29295838/

Time: 2020-10-22 07:00:36  Source: igfitidea

org.apache.spark.SparkException: Task not serializable

Tags: scala, apache-spark, apache-kafka

Asked by xiaolong li

This is a working code example:

JavaPairDStream<String, String> messages = KafkaUtils.createStream(javaStreamingContext, zkQuorum, group, topicMap);
messages.print();
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});

I get the following error:

ERROR:
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$class.map(JavaDStreamLike.scala:140)
    at org.apache.spark.streaming.api.java.JavaPairDStream.map(JavaPairDStream.scala:46)

Answered by InPursuit

Since you're defining your map function using an anonymous inner class, the containing class must also be Serializable. Define your map function as a separate class or make it a static inner class. From the Java documentation (http://docs.oracle.com/javase/8/docs/platform/serialization/spec/serial-arch.html):

Note - Serialization of inner classes (i.e., nested classes that are not static member classes), including local and anonymous classes, is strongly discouraged for several reasons. Because inner classes declared in non-static contexts contain implicit non-transient references to enclosing class instances, serializing such an inner class instance will result in serialization of its associated outer class instance as well.
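
The effect described in the note can be reproduced without Spark. The following is a minimal sketch using plain Java serialization: `SerFunction` is a hypothetical stand-in for Spark's `Function` interface (which is also `Serializable`), and `Driver`, `UpperCase`, and `canSerialize` are illustrative names that do not appear in the question. It assumes the enclosing class is not `Serializable`, as seems to be the case in the question.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for Spark's Function<T, R>; like it, this is Serializable.
interface SerFunction<T, R> extends Serializable {
    R call(T input);
}

// Plays the role of the class that builds the stream; deliberately NOT Serializable.
class Driver {
    // Anonymous inner class created in an instance method: it holds an implicit
    // reference to the enclosing Driver, so serializing it drags Driver along.
    SerFunction<String, String> anonymousMapper() {
        return new SerFunction<String, String>() {
            @Override
            public String call(String s) {
                return s.toUpperCase();
            }
        };
    }

    // Static nested class: no enclosing instance, so it serializes on its own.
    static class UpperCase implements SerFunction<String, String> {
        @Override
        public String call(String s) {
            return s.toUpperCase();
        }
    }

    // Returns true if Java serialization accepts the object, false if some
    // object in its graph is not Serializable.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(new Driver().anonymousMapper())); // anonymous inner class
        System.out.println(canSerialize(new UpperCase()));                // static nested class
    }
}
```

Run as written, this should print `false` for the anonymous inner class and `true` for the static nested class. In a Spark job, the static nested class would implement Spark's own `Function` interface rather than the stand-in used here.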

Answered by udyan

Just providing a code sample:

JavaDStream<String> lines = messages.map(mapFunc);

Declare the function as a static variable, so the anonymous class has no enclosing instance to serialize:

static Function<Tuple2<String, String>, String> mapFunc = new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
};