Spark and Java: Exception thrown in awaitResult

Note: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use or share it, but you must do so under the same CC BY-SA license and attribute it to the original authors (not me). Original StackOverflow URL: http://stackoverflow.com/questions/40439652/

Time: 2020-08-11 22:42:21  Source: igfitidea

java scala apache-spark hdfs protocol-buffers

Asked by Michael Lihs

I am trying to connect to a Spark cluster running within a virtual machine with IP 10.20.30.50 and port 7077 from within a Java application and run the word count example:

SparkConf conf = new SparkConf().setMaster("spark://10.20.30.50:7077").setAppName("wordCount");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> textFile = sc.textFile("hdfs://localhost:8020/README.md");
String result = Long.toString(textFile.count());
JavaRDD<String> words = textFile.flatMap((FlatMapFunction<String, String>) s -> Arrays.asList(s.split(" ")).iterator());
JavaPairRDD<String, Integer> pairs = words.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((Function2<Integer, Integer, Integer>) (a, b) -> a + b);
counts.saveAsTextFile("hdfs://localhost:8020/tmp/output");
sc.stop();
return result;

The Java application shows the following stack trace:

Running Spark version 2.0.1
Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Changing view acls to: lii5ka
Changing modify acls to: lii5ka
Changing view acls groups to:
Changing modify acls groups to:
SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(lii5ka); groups with view permissions: Set(); users  with modify permissions: Set(lii5ka); groups with modify permissions: Set()
Successfully started service 'sparkDriver' on port 61267.
Registering MapOutputTracker
Registering BlockManagerMaster
Created local directory at /private/var/folders/4k/h0sl02993_99bzt0dzv759000000gn/T/blockmgr-51de868d-3ba7-40be-8c53-f881f97ced63
MemoryStore started with capacity 2004.6 MB
Registering OutputCommitCoordinator
Logging initialized @48403ms
jetty-9.2.z-SNAPSHOT
Started o.s.j.s.ServletContextHandler@1316e7ec{/jobs,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@782de006{/jobs/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@2d0353{/jobs/job,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@381e24a0{/jobs/job/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@1c138dc8{/stages,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@b29739c{/stages/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@63f6de31{/stages/stage,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@2a04ddcb{/stages/stage/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@2af9688e{/stages/pool,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@6a0c5bde{/stages/pool/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@3f5e17f8{/storage,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@33b86f5d{/storage/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@5264dcbc{/storage/rdd,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@5a3ebf85{/storage/rdd/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@159082ed{/environment,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@6522c585{/environment/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@115774a1{/executors,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@3e3a3399{/executors/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@2f2c5959{/executors/threadDump,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@5c51afd4{/executors/threadDump/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@76893a83{/static,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@19c07930{/,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@54eb0dc0{/api,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@5953786{/stages/stage/kill,null,AVAILABLE}
Started ServerConnector@2eeb8bd6{HTTP/1.1}{0.0.0.0:4040}
Started @48698ms
Successfully started service 'SparkUI' on port 4040.
Bound SparkUI to 0.0.0.0, and started at http://192.168.0.104:4040
Connecting to master spark://10.20.30.50:7077...
Successfully created connection to /10.20.30.50:7077 after 25 ms (0 ms spent in bootstraps)
Connecting to master spark://10.20.30.50:7077...
Still have 2 requests outstanding when connection from /10.20.30.50:7077 is closed
Failed to connect to master 10.20.30.50:7077

org.apache.spark.SparkException: Exception thrown in awaitResult
        at org.apache.spark.rpc.RpcTimeout$$anonfun.applyOrElse(RpcTimeout.scala:77) ~[spark-core_2.11-2.0.1.jar:2.0.1]
        at org.apache.spark.rpc.RpcTimeout$$anonfun.applyOrElse(RpcTimeout.scala:75) ~[spark-core_2.11-2.0.1.jar:2.0.1]
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) ~[scala-library-2.11.8.jar:na]
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout.applyOrElse(RpcTimeout.scala:59) ~[spark-core_2.11-2.0.1.jar:2.0.1]
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout.applyOrElse(RpcTimeout.scala:59) ~[spark-core_2.11-2.0.1.jar:2.0.1]
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167) ~[scala-library-2.11.8.jar:na]
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83) ~[spark-core_2.11-2.0.1.jar:2.0.1]
        at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:88) ~[spark-core_2.11-2.0.1.jar:2.0.1]
        at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:96) ~[spark-core_2.11-2.0.1.jar:2.0.1]
        at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$$anon.run(StandaloneAppClient.scala:106) ~[spark-core_2.11-2.0.1.jar:2.0.1]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_102]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_102]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_102]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_102]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_102]
Caused by: java.io.IOException: Connection from /10.20.30.50:7077 closed
        at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:128) ~[spark-network-common_2.11-2.0.1.jar:2.0.1]
        at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:109) ~[spark-network-common_2.11-2.0.1.jar:2.0.1]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:257) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:182) ~[spark-network-common_2.11-2.0.1.jar:2.0.1]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:828) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.AbstractChannel$AbstractUnsafe.run(AbstractChannel.java:621) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:111) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
        ... 1 common frames omitted

In the Spark Master log on 10.20.30.50, I get the following error message:

16/11/05 14:47:20 ERROR OneForOneStrategy: Error while decoding incoming Akka PDU of length: 1298
akka.remote.transport.AkkaProtocolException: Error while decoding incoming Akka PDU of length: 1298
Caused by: akka.remote.transport.PduCodecException: Decoding PDU failed.
    at akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:167)
    at akka.remote.transport.ProtocolStateActor.akka$remote$transport$ProtocolStateActor$$decodePdu(AkkaProtocolTransport.scala:580)
    at akka.remote.transport.ProtocolStateActor$$anonfun.applyOrElse(AkkaProtocolTransport.scala:375)
    at akka.remote.transport.ProtocolStateActor$$anonfun.applyOrElse(AkkaProtocolTransport.scala:343)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
    at akka.actor.FSM$class.processEvent(FSM.scala:604)
    at akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:269)
    at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:598)
    at akka.actor.FSM$$anonfun$receive.applyOrElse(FSM.scala:592)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:269)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
    at com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:89)
    at com.google.protobuf.CodedInputStream.readTag(CodedInputStream.java:108)
    at akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:6643)
    at akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:6607)
    at akka.remote.WireFormats$AkkaProtocolMessage.parsePartialFrom(WireFormats.java:6703)
    at akka.remote.WireFormats$AkkaProtocolMessage.parsePartialFrom(WireFormats.java:6698)
    at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:141)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:176)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:188)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:193)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
    at akka.remote.WireFormats$AkkaProtocolMessage.parseFrom(WireFormats.java:6821)
    at akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:168)
    ... 19 more

Additional Information

  • The example works fine when I use new SparkConf().setMaster("local") instead
  • I can connect to the Spark Master with spark-shell --master spark://10.20.30.50:7077 on the very same machine

Accepted answer by Ram Ghadiyaram

At first glance this looks like a network error, but it actually is not: it is a Spark version mismatch in disguise. Point your application at the correct version of the Spark jars (mostly the assembly jars).
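
The client log in the question already reports that a connection to 10.20.30.50:7077 was created before it was closed, which points away from a plain network problem. A minimal sketch of the same check from Java, using only the JDK, might look like this. The class name MasterPortCheck and the 2-second timeout are assumptions made for illustration, and a successful connect says nothing about whether the two Spark versions can talk to each other.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of Spark): checks TCP reachability only.
final class MasterPortCheck {

    // Returns true if a TCP connection to host:port can be opened within timeoutMillis.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are the values from the question; override with: java MasterPortCheck <host> <port>
        String host = args.length > 0 ? args[0] : "10.20.30.50";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 7077;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 2000));
    }
}
```

If this reports the port as reachable and the application still fails with "Connection ... closed", the versions on both sides are the next thing to compare.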

This issue may happen due to a version mismatch in a Hadoop RPC call that uses Protocol Buffers.

InvalidProtocolBufferException is thrown when a protocol message being parsed is invalid in some way, e.g. it contains a malformed varint or a negative byte length.
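
To illustrate the "invalid tag (zero)" message from the master log: in the protobuf wire format every field starts with a varint tag that encodes (field number << 3) | wire type, and field number 0 is not allowed. The sketch below is a simplified stand-in for that check, not the real CodedInputStream, and the class and method names are made up for this example. It suggests why bytes written by a peer that speaks a different protocol can be rejected at the very first byte.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Simplified illustration of protobuf tag decoding; not the real CodedInputStream.
final class TagReaderSketch {

    // Reads a base-128 varint of at most 5 bytes (enough for a 32-bit tag).
    static int readVarint32(InputStream in) throws IOException {
        int result = 0;
        for (int shift = 0; shift < 35; shift += 7) {
            int b = in.read();
            if (b < 0) {
                throw new IOException("truncated varint");
            }
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result; // high bit clear: this was the last byte
            }
        }
        throw new IOException("malformed varint");
    }

    // A tag is (fieldNumber << 3) | wireType; field number 0 is never valid.
    static int readTag(InputStream in) throws IOException {
        int tag = readVarint32(in);
        if ((tag >>> 3) == 0) {
            throw new IOException("invalid tag (zero)");
        }
        return tag;
    }

    public static void main(String[] args) throws IOException {
        // 0x0A encodes field 1 with wire type 2 (length-delimited).
        int tag = readTag(new ByteArrayInputStream(new byte[] {0x0A}));
        System.out.println("field number: " + (tag >>> 3));
        try {
            // Bytes from an unrelated protocol may well start with 0x00.
            readTag(new ByteArrayInputStream(new byte[] {0x00, 0x00, 0x05, 0x12}));
        } catch (IOException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```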

  • In my experience with protobuf, InvalidProtocolBufferException happens only when the message cannot be parsed (if you are parsing a protobuf message programmatically, the message length may be zero or the message may be corrupted...).

  • Spark uses Akka actors for message passing between the Master/Driver and the Workers (at least in the Spark version running on the master here, as its Akka stack trace shows), and internally Akka uses Google's protobuf to communicate. See the method below from AkkaPduCodec.scala:

    override def decodePdu(raw: ByteString): AkkaPdu = {
      try {
        val pdu = AkkaProtocolMessage.parseFrom(raw.toArray)
        if (pdu.hasPayload) Payload(ByteString(pdu.getPayload.asReadOnlyByteBuffer()))
        else if (pdu.hasInstruction) decodeControlPdu(pdu.getInstruction)
        else throw new PduCodecException("Error decoding Akka PDU: Neither message nor control message were contained", null)
      } catch {
        case e: InvalidProtocolBufferException => throw new PduCodecException("Decoding PDU failed.", e)
      }
    }
    
But in your case, since it is a version mismatch, a message in the new protobuf version cannot be parsed by the old version of the parser, or something along those lines.

If you are using Maven, please review your other dependencies as well.

Answer by Michael Lihs

It turned out that I had Spark version 1.5.2 running in the virtual machine and used version 2.0.1 of the Spark library in Java. I fixed the issue by using the appropriate Spark library version in my pom.xml, which is:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.5.2</version>
</dependency>

Another problem (that occurred later) was that I also had to pin the Scala version with which the library was built. This is the _2.10 suffix in the artifactId.
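
Both versions are visible in the jar file names, for example spark-core_2.11-2.0.1.jar in the stack trace of the question. As a rough sketch, a small helper could split such a name and compare it with the versions the cluster runs. The class SparkJarName and its methods are hypothetical and written only for this illustration, and the regular expression assumes the usual artifactId-version.jar naming.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: splits a Spark jar file name into its version parts.
final class SparkJarName {

    // Matches e.g. "spark-core_2.11-2.0.1.jar": module, Scala binary version, Spark version.
    private static final Pattern JAR =
            Pattern.compile("(spark-[a-z-]+)_(\\d+\\.\\d+)-(\\d+(?:\\.\\d+)*)\\.jar");

    final String module;
    final String scalaVersion;
    final String sparkVersion;

    private SparkJarName(String module, String scalaVersion, String sparkVersion) {
        this.module = module;
        this.scalaVersion = scalaVersion;
        this.sparkVersion = sparkVersion;
    }

    static SparkJarName parse(String fileName) {
        Matcher m = JAR.matcher(fileName);
        if (!m.matches()) {
            throw new IllegalArgumentException("not a Spark jar name: " + fileName);
        }
        return new SparkJarName(m.group(1), m.group(2), m.group(3));
    }

    // True when the client jar was built for the cluster's Spark and Scala versions.
    boolean matchesCluster(String clusterSparkVersion, String clusterScalaVersion) {
        return sparkVersion.equals(clusterSparkVersion)
                && scalaVersion.equals(clusterScalaVersion);
    }

    public static void main(String[] args) {
        // Cluster versions as described in this answer: Spark 1.5.2 built for Scala 2.10.
        SparkJarName before = parse("spark-core_2.11-2.0.1.jar");
        SparkJarName after = parse("spark-core_2.10-1.5.2.jar");
        System.out.println(before.matchesCluster("1.5.2", "2.10"));
        System.out.println(after.matchesCluster("1.5.2", "2.10"));
    }
}
```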

Basically @RamPrassad's answer pointed me in the right direction but didn't give clear advice on what I needed to do to fix my problem.

By the way: I couldn't update Spark in the virtual machine, since it was brought to me by the HortonWorks distribution...
