java.lang.ClassCastException using lambda expressions in spark job on remote server

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must follow the same license and attribute it to the original authors (not me). Original: http://stackoverflow.com/questions/28186607/

Tags: java, lambda, java-8, spark-java

Asked by Mehraban

I'm trying to build a web API for my Apache Spark jobs using the sparkjava.com framework. My code is:

@Override
public void init() {
    get("/hello",
            (req, res) -> {
                String sourcePath = "hdfs://spark:54310/input/*";

                SparkConf conf = new SparkConf().setAppName("LineCount");
                conf.setJars(new String[] { "/home/sam/resin-4.0.42/webapps/test.war" });
                File configFile = new File("config.properties");

                String sparkURI = "spark://hamrah:7077";

                conf.setMaster(sparkURI);
                conf.set("spark.driver.allowMultipleContexts", "true");
                JavaSparkContext sc = new JavaSparkContext(conf);

                @SuppressWarnings("resource")
                JavaRDD<String> log = sc.textFile(sourcePath);

                JavaRDD<String> lines = log.filter(x -> {
                    return true;
                });

                return lines.count();
            });
}

If I remove the lambda expression, or put it inside a simple jar rather than a web service (somehow a servlet), it will run without any error. But using a lambda expression inside a servlet will result in this exception:

15/01/28 10:36:33 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, hamrah): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDD$$anonfun$filter.f of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaRDD$$anonfun$filter
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1999)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

P.S.: I tried combinations of Jersey and javaspark with Jetty, Tomcat and Resin, and all of them led me to the same result.

Answer by Holger

What you have here is a follow-up error which masks the original error.

When lambda instances are serialized, they use writeReplace to dissolve their JRE-specific implementation from the persistent form, which is a SerializedLambda instance. When the SerializedLambda instance has been restored, its readResolve method will be invoked to reconstitute the appropriate lambda instance. As the documentation says, it will do so by invoking a special method of the class which defined the original lambda (see also this answer). The important point is that the original class is needed, and that's what's missing in your case.
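
As a minimal sketch of this mechanism (assuming a Java 8 JRE, as used in the question; the class name LambdaPeek is made up for illustration), you can invoke the synthetic writeReplace method reflectively and inspect the resulting SerializedLambda:

import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;

public class LambdaPeek {
    public static void main(String... args) throws Exception {
        Runnable r = (Runnable & Serializable) () -> {};
        // Serializable lambda classes get a synthetic writeReplace method
        // that returns the SerializedLambda used as the persistent form.
        Method writeReplace = r.getClass().getDeclaredMethod("writeReplace");
        writeReplace.setAccessible(true);
        SerializedLambda sl = (SerializedLambda) writeReplace.invoke(r);
        // The capturing class is the class that defined the lambda; it must
        // be present at deserialization time for readResolve to succeed.
        System.out.println(sl.getCapturingClass());
        System.out.println(sl.getImplMethodName());
    }
}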

But there's a …special… behavior of the ObjectInputStream. When it encounters an exception, it doesn't bail out immediately. It will record the exception and continue the process, marking all objects currently being read, and thus depending on the erroneous object, as being erroneous as well. Only at the end of the process will it throw the original exception it encountered. What makes it so strange is that it will also continue trying to set the fields of these objects. But when you look at the method ObjectInputStream.readOrdinaryObject, line 1806:

…
    if (obj != null &&
        handles.lookupException(passHandle) == null &&
        desc.hasReadResolveMethod())
    {
        Object rep = desc.invokeReadResolve(obj);
        if (unshared && rep.getClass().isArray()) {
            rep = cloneArray(rep);
        }
        if (rep != obj) {
            handles.setObject(passHandle, obj = rep);
        }
    }

    return obj;
}

you see that it doesn't call the readResolve method when lookupException reports a non-null exception. But when the substitution did not happen, it's not a good idea to continue trying to set the field values of the referrer, yet that's exactly what happens here, hence producing a ClassCastException.

You can easily reproduce the problem:

// Note: each public class below goes in its own source file, and each
// file needs: import java.io.*;
public class Holder implements Serializable {
    Runnable r;
}
public class Defining {
    public static Holder get() {
        final Holder holder = new Holder();
        holder.r=(Runnable&Serializable)()->{};
        return holder;
    }
}
public class Writing {
    static final File f=new File(System.getProperty("java.io.tmpdir"), "x.ser");
    public static void main(String... arg) throws IOException {
        try(FileOutputStream os=new FileOutputStream(f);
            ObjectOutputStream   oos=new ObjectOutputStream(os)) {
            oos.writeObject(Defining.get());
        }
        System.out.println("written to "+f);
    }
}
public class Reading {
    static final File f=new File(System.getProperty("java.io.tmpdir"), "x.ser");
    public static void main(String... arg) throws IOException, ClassNotFoundException {
        try(FileInputStream is=new FileInputStream(f);
            ObjectInputStream ois=new ObjectInputStream(is)) {
            Holder h=(Holder)ois.readObject();
            System.out.println(h.r);
            h.r.run();
        }
        System.out.println("read from "+f);
    }
}

Compile these four classes and run Writing. Then delete the class file Defining.class and run Reading. You will then get a

Exception in thread "main" java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field test.Holder.r of type java.lang.Runnable in instance of test.Holder
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)

(Tested with 1.8.0_20)


The bottom line is that you may forget about this serialization issue once you understand what's happening. All you have to do to solve your problem is to make sure that the class which defined the lambda expression is also available in the runtime where the lambda is deserialized.

Example of a Spark job run directly from the IDE (spark-submit distributes the jar by default):

SparkConf sconf = new SparkConf()
  .set("spark.eventLog.dir", "hdfs://nn:8020/user/spark/applicationHistory")
  .set("spark.eventLog.enabled", "true")
  .setJars(new String[]{"/path/to/jar/with/your/class.jar"})
  .setMaster("spark://spark.standalone.uri:7077");
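
If the context is created on the fly, as in the servlet above, JavaSparkContext#addJar can ship the jar after the fact as well; a sketch, where the path is a placeholder for your actual artifact:

import org.apache.spark.api.java.JavaSparkContext;

JavaSparkContext sc = new JavaSparkContext(sconf);
// Distribute the jar containing the lambda-defining class to the executors.
sc.addJar("/path/to/jar/with/your/class.jar");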

Answer by Sergey Fedorov

I suppose your problem is failed auto-boxing. In the code

x -> {
      return true;
}

you pass a (String -> boolean) lambda (it is a Predicate<String>), while the filter method takes a (String -> Boolean) lambda (it is a Function<String, Boolean>). So I suggest you change the code to

x -> {
      return Boolean.TRUE;
}
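
Equivalently, spelling the target type out makes the boxing explicit; a fragment assuming Spark's Java API, where log is the JavaRDD<String> from the question:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

// With an explicit Function<String, Boolean>, the compiler cannot infer a
// differently-shaped functional interface for the lambda.
Function<String, Boolean> keepAll = x -> Boolean.TRUE;
JavaRDD<String> lines = log.filter(keepAll);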

Please include details in your question. Output from uname -a and java -version is appreciated. Provide an SSCCE if possible.

Answer by Adrian Smith

I had the same error; I replaced the lambda with an inner class, and then it worked. I don't really understand why, and reproducing this error was extremely difficult (we had one server which exhibited the behavior, and nowhere else).

Causes serialization problems (uses lambdas, causes a SerializedLambda error)

this.variable = () -> { ..... }

Yields java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field MyObject.val$variable

Works

this.variable = new MyInterface() {
    public void myMethod() {
       .....
    }
};

Answer by Nicolas Zozol

You can perhaps more simply replace your Java 8 lambda with a spark.scala.Function.

replace

output = rdds.map(x -> this.function(x)).collect();

with:

output = rdds.map(new Function<Double,Double>(){

   public Double call(Double x){
       return MyClass.this.function(x);
   }

}).collect();