Original question: http://stackoverflow.com/questions/25914057/
Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): Stack Overflow.
Task not serializable exception while running Apache Spark job
Asked by Siva
The following Java program was written to experiment with Apache Spark.
The program tries to read lists of positive and negative words from their respective files, compare them against a master file, and filter the results accordingly.
import java.io.Serializable;
import java.io.FileNotFoundException;
import java.io.File;
import java.util.*;

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;

public class SimpleApp implements Serializable {

    public static void main(String[] args) {
        String logFile = "/tmp/master.txt";     // Should be some file on your system
        String positive = "/tmp/positive.txt";  // Should be some file on your system
        String negative = "/tmp/negative.txt";  // Should be some file on your system

        JavaSparkContext sc = new JavaSparkContext("local[4]", "Twitter Analyzer",
                "/home/welcome/Downloads/spark-1.1.0/",
                new String[]{"target/scala-2.10/Simple-assembly-0.1.0.jar"});

        JavaRDD<String> positiveComments = sc.textFile(logFile).cache();

        List<String> positiveList = GetSentiments(positive);
        List<String> negativeList = GetSentiments(negative);

        final Iterator<String> iterator = positiveList.iterator();
        int i = 0;
        while (iterator.hasNext()) {
            JavaRDD<String> numAs = positiveComments.filter(new Function<String, Boolean>() {
                public Boolean call(String s) {
                    return s.contains(iterator.next());
                }
            });
            numAs.saveAsTextFile("/tmp/output/" + i);
            i++;
        }
    }

    public static List<String> GetSentiments(String fileName) {
        List<String> input = new ArrayList<String>();
        try {
            Scanner sc = new Scanner(new File(fileName));
            while (sc.hasNextLine()) {
                input.add(sc.nextLine());
            }
        } catch (FileNotFoundException e) {
            // do stuff here..
        }
        return input;
    }
}
The following error is thrown while executing the Spark job:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.filter(RDD.scala:282)
at org.apache.spark.api.java.JavaRDD.filter(JavaRDD.scala:78)
at SimpleApp.main(SimpleApp.java:37)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: java.util.ArrayList$Itr
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 12 more
Any pointers??
Accepted answer by NoDataFound
When you create an anonymous class, the compiler does some stuff:
JavaRDD<String> numAs = positiveComments.filter(new Function<String, Boolean>() {
    public Boolean call(String s) {
        return s.contains(iterator.next());
    }
});
It will be rewritten as:
JavaRDD<String> numAs = positiveComments.filter(new Function<String, Boolean>() {
    private Iterator<...> $iterator;

    public Boolean call(String s) {
        return s.contains($iterator.next());
    }
});
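Incidentally, you can observe that captured field with plain reflection (a hypothetical snippet, not part of the original answer; the synthetic field name val$iterator is a javac convention and may vary by compiler):

Function<String, Boolean> f = new Function<String, Boolean>() {
    public Boolean call(String s) {
        return s.contains(iterator.next());
    }
};
// Prints something like: final java.util.Iterator SimpleApp$1.val$iterator
for (java.lang.reflect.Field field : f.getClass().getDeclaredFields()) {
    System.out.println(field);
}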
This is why you get a NotSerializableException: the captured Iterator is not serializable.
To avoid that, simply extract the result of next() beforehand:
final String value = iterator.next();
JavaRDD<String> numAs = positiveComments.filter(new Function<String, Boolean>() {
    public Boolean call(String s) {
        return s.contains(value);
    }
});
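For completeness, here is a minimal sketch of that fix applied to the question's whole loop (assuming Java 7, where a captured local must be declared final; under Java 8+ it only needs to be effectively final):

int i = 0;
while (iterator.hasNext()) {
    // Capture the current String, not the Iterator: String is serializable,
    // so the synthetic field the compiler adds to the closure is now serializable too.
    final String value = iterator.next();
    JavaRDD<String> numAs = positiveComments.filter(new Function<String, Boolean>() {
        public Boolean call(String s) {
            return s.contains(value);
        }
    });
    numAs.saveAsTextFile("/tmp/output/" + i);
    i++;
}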
Answered by KrazyGautam
Some Java Facts
- Any anonymous class defined inside an outer class holds a reference to that outer class (see the sketch after this list).
- If the anonymous class needs to be serialized, it compels you to make the outer class serializable as well.
- If a lambda uses a method of the enclosing class, that class needs to be serialized whenever the lambda is serialized.
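A minimal, self-contained sketch of the first two facts (all names here are hypothetical, not from the original post): the anonymous class calls a method of the enclosing instance, so the compiler adds a hidden this$0 field, and serialization fails because the outer class is not Serializable.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;

public class OuterCaptureDemo { // deliberately does NOT implement Serializable

    interface SerializableRunnable extends Runnable, Serializable {}

    private String label() {
        return "demo";
    }

    SerializableRunnable task() {
        // Calling label() forces the anonymous class to capture OuterCaptureDemo.this.
        return new SerializableRunnable() {
            public void run() {
                System.out.println(label());
            }
        };
    }

    public static void main(String[] args) throws IOException {
        SerializableRunnable r = new OuterCaptureDemo().task();
        // Prints the synthetic this$0 field the compiler added to the anonymous class.
        System.out.println(Arrays.toString(r.getClass().getDeclaredFields()));
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(r); // fails: the captured outer instance is not serializable
        } catch (NotSerializableException e) {
            System.out.println("Not serializable: " + e.getMessage()); // OuterCaptureDemo
        }
    }
}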
Some Facts about Spark.
- On the same executor, multiple tasks can run at the same time in the same JVM, because Spark spawns tasks as threads.
- Any lambda or anonymous class used with a Spark transformation (map, mapPartitions, keyBy, reduceByKey, ...) is instantiated on the driver, serialized, and sent to the executors.
- To serialize an object means to convert its state to a byte stream so that the byte stream can be reverted back into a copy of the object (see the round-trip sketch after this list).
- A Java object is serializable if its class or any of its superclasses implements either the java.io.Serializable interface or its subinterface, java.io.Externalizable, and all of its non-transient, non-static fields are serializable.
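A hedged sketch of that byte-stream round trip, using in-memory streams (the class name RoundTripDemo is mine): an ArrayList survives the trip because it implements Serializable, while its Iterator, java.util.ArrayList$Itr, does not, which is exactly the exception in the question.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RoundTripDemo {
    public static void main(String[] args) throws Exception {
        List<String> original = new ArrayList<String>(Arrays.asList("good", "great"));

        // Serialize: convert the object's state to a byte stream.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original); // works: ArrayList implements Serializable
        }

        // Deserialize: revert the byte stream into a copy of the object.
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            List<String> copy = (List<String>) in.readObject();
            System.out.println(copy);             // [good, great]
            System.out.println(copy == original); // false: a copy, not the same object
        }

        // out.writeObject(original.iterator()) would throw NotSerializableException:
        // java.util.ArrayList$Itr does not implement Serializable.
    }
}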
Rules of thumb to avoid serialization problems:
- Avoid anonymous classes; use static nested classes instead, because an anonymous class forces the outer class to be serialized (see the sketch after this list).
- Avoid static variables as a workaround for serialization issues: multiple tasks can run inside the same JVM, and a static instance might not be thread safe.
- Use transient variables to avoid serialization issues, but initialize them inside the function call, not in the constructor. The constructor runs on the driver; on the executor the object is deserialized, so the only place such fields can be initialized is inside the function call.
- Use a static class in place of an anonymous class.
- Religiously attach "implements Serializable" only to the classes that actually need to be serialized.
- Inside a lambda, never refer to a method of the outer class directly, as this leads to serialization of the outer class.
- Make a method static if it needs to be used within a lambda directly; otherwise use the Class::func() notation rather than calling func() directly.
- java.util.Map does not implement Serializable, but HashMap does.
- Be wise when deciding between broadcast variables and raw data structures; use a broadcast only if you see a real benefit.
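As an illustration only (the class name ContainsWord is mine, not from the original post), here is how the first and fourth rules could be applied to the question's program. Spark's Java Function interface already extends Serializable, and a top-level or static nested class has no hidden this$0 field, so only its own String field has to be serializable:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

// No hidden reference to any outer class: only `word` is serialized.
public class ContainsWord implements Function<String, Boolean> {
    private final String word;

    public ContainsWord(String word) {
        this.word = word;
    }

    public Boolean call(String s) {
        return s.contains(word);
    }
}

The question's loop then shrinks to:

int i = 0;
for (String word : positiveList) {
    JavaRDD<String> numAs = positiveComments.filter(new ContainsWord(word));
    numAs.saveAsTextFile("/tmp/output/" + i);
    i++;
}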
For an in-depth understanding, see http://bytepadding.com/big-data/spark/understanding-spark-serialization/