scala — Why does Spark fail with "Failed to get broadcast_0_piece0 of broadcast_0" in local mode?
Disclaimer: this page is a translation of a popular Stack Overflow question and its answers, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me) at StackOverflow.
Original URL: http://stackoverflow.com/questions/34457486/
Why does Spark fail with "Failed to get broadcast_0_piece0 of broadcast_0" in local mode?
Asked by Saulo Ricci
I'm running this snippet to sort an RDD of points by distance and take the K nearest points to a given point:
def getKNN(sparkContext: SparkContext, k: Int, point2: Array[Double], pointsRDD: RDD[Array[Double]]): RDD[Array[Double]] = {
  val tuplePointDistanceRDD: RDD[(Double, Array[Double])] = pointsRDD.map(point =>
    (DistanceUtils.euclidianDistance(point, point2), point))
  sparkContext.parallelize(tuplePointDistanceRDD.sortBy(_._1).map(_._2).take(k))
}
Using just one SparkContext in my application and passing it as a parameter to my function, I'm getting an org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0 error the moment I call sparkContext.parallelize(tuplePointDistanceRDD.sortBy(_._1).map(_._2).take(k)) to get the KNN points of point2.
I'm constructing sparkContext as in the snippet below:
var sparkContext = new SparkContext("local", "<app_name>")
What are the possible causes of this kind of error?
Basically, this is the log from my standalone Spark environment, with the stack trace of this error:
15/12/24 11:55:29 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@localhost:55731]
15/12/24 11:55:29 INFO Utils: Successfully started service 'sparkDriver' on port 55731.
15/12/24 11:55:29 INFO SparkEnv: Registering MapOutputTracker
15/12/24 11:55:29 INFO SparkEnv: Registering BlockManagerMaster
15/12/24 11:55:29 INFO DiskBlockManager: Created local directory at /private/var/folders/0r/3b6d3b6j45774_9616myw4440000gn/T/blockmgr-70e73cfe-683b-4297-aa5d-de38f98d02f1
15/12/24 11:55:29 INFO MemoryStore: MemoryStore started with capacity 491.7 MB
15/12/24 11:55:29 INFO HttpFileServer: HTTP File server directory is /private/var/folders/0r/3b6d3b6j45774_9616myw4440000gn/T/spark-f7bc8b6f-7d4f-4c55-8dff-0fbc4f6c2532/httpd-fb502369-4c28-4585-a37e-f3645d1d55a3
15/12/24 11:55:29 INFO HttpServer: Starting HTTP Server
15/12/24 11:55:29 INFO Utils: Successfully started service 'HTTP file server' on port 55732.
15/12/24 11:55:29 INFO SparkEnv: Registering OutputCommitCoordinator
15/12/24 11:55:29 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/12/24 11:55:29 INFO SparkUI: Started SparkUI at http://localhost:4040
15/12/24 11:55:29 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
15/12/24 11:55:29 INFO Executor: Starting executor ID driver on host localhost
15/12/24 11:55:29 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 55733.
15/12/24 11:55:29 INFO NettyBlockTransferService: Server created on 55733
15/12/24 11:55:29 INFO BlockManagerMaster: Trying to register BlockManager
15/12/24 11:55:29 INFO BlockManagerMasterEndpoint: Registering block manager localhost:55733 with 491.7 MB RAM, BlockManagerId(driver, localhost, 55733)
15/12/24 11:55:29 INFO BlockManagerMaster: Registered BlockManager
15/12/24 11:55:30 INFO TorrentBroadcast: Started reading broadcast variable 0
org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0
java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1178)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:144)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.RDD.sortBy$default(RDD.scala:548)
at LOF$.getKNN(LOF.scala:14)
at LOF$.lof(LOF.scala:25)
at BehaviourActivityScoreJudgeTest$$anonfun.apply$mcV$sp(BehaviourActivityScoreJudgeTest.scala:14)
at BehaviourActivityScoreJudgeTest$$anonfun.apply(BehaviourActivityScoreJudgeTest.scala:11)
at BehaviourActivityScoreJudgeTest$$anonfun.apply(BehaviourActivityScoreJudgeTest.scala:11)
at org.scalatest.Transformer$$anonfun$apply.apply$mcV$sp(Transformer.scala:22)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FlatSpecLike$$anon.apply(FlatSpecLike.scala:1647)
at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
at org.scalatest.FlatSpec.withFixture(FlatSpec.scala:1683)
at org.scalatest.FlatSpecLike$class.invokeWithFixture(FlatSpecLike.scala:1644)
at org.scalatest.FlatSpecLike$$anonfun$runTest.apply(FlatSpecLike.scala:1656)
at org.scalatest.FlatSpecLike$$anonfun$runTest.apply(FlatSpecLike.scala:1656)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
at org.scalatest.FlatSpecLike$class.runTest(FlatSpecLike.scala:1656)
at org.scalatest.FlatSpec.runTest(FlatSpec.scala:1683)
at org.scalatest.FlatSpecLike$$anonfun$runTests.apply(FlatSpecLike.scala:1714)
at org.scalatest.FlatSpecLike$$anonfun$runTests.apply(FlatSpecLike.scala:1714)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes.apply(Engine.scala:413)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes.apply(Engine.scala:401)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.scalatest.SuperEngine.traverseSubNodes(Engine.scala:401)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:390)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes.apply(Engine.scala:427)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes.apply(Engine.scala:401)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.scalatest.SuperEngine.traverseSubNodes(Engine.scala:401)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
at org.scalatest.FlatSpecLike$class.runTests(FlatSpecLike.scala:1714)
at org.scalatest.FlatSpec.runTests(FlatSpec.scala:1683)
at org.scalatest.Suite$class.run(Suite.scala:1424)
at org.scalatest.FlatSpec.org$scalatest$FlatSpecLike$$super$run(FlatSpec.scala:1683)
at org.scalatest.FlatSpecLike$$anonfun$run.apply(FlatSpecLike.scala:1760)
at org.scalatest.FlatSpecLike$$anonfun$run.apply(FlatSpecLike.scala:1760)
at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
at org.scalatest.FlatSpecLike$class.run(FlatSpecLike.scala:1760)
at BehaviourActivityScoreJudgeTest.org$scalatest$BeforeAndAfterAll$$super$run(BehaviourActivityScoreJudgeTest.scala:4)
at org.scalatest.BeforeAndAfterAll$class.liftedTree1(BeforeAndAfterAll.scala:257)
at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
at BehaviourActivityScoreJudgeTest.run(BehaviourActivityScoreJudgeTest.scala:4)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun.apply(Runner.scala:2563)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun.apply(Runner.scala:2557)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter.apply(Runner.scala:1044)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter.apply(Runner.scala:1043)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
at org.scalatest.tools.Runner$.run(Runner.scala:883)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:137)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$$anonfun.apply(TorrentBroadcast.scala:138)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$$anonfun.apply(TorrentBroadcast.scala:138)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks.apply$mcVI$sp(TorrentBroadcast.scala:137)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks.apply(TorrentBroadcast.scala:120)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks.apply(TorrentBroadcast.scala:120)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock.apply(TorrentBroadcast.scala:175)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1175)
... 94 more
15/12/24 11:55:30 INFO SparkUI: Stopped Spark web UI at http://localhost:4040
15/12/24 11:55:30 INFO DAGScheduler: Stopping DAGScheduler
15/12/24 11:55:30 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/12/24 11:55:30 INFO MemoryStore: MemoryStore cleared
15/12/24 11:55:30 INFO BlockManager: BlockManager stopped
15/12/24 11:55:30 INFO BlockManagerMaster: BlockManagerMaster stopped
15/12/24 11:55:30 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
15/12/24 11:55:30 INFO SparkContext: Successfully stopped SparkContext
15/12/24 11:55:30 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/12/24 11:55:30 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/12/24 11:55:30 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
Answered by Saulo Ricci
I just discovered why I was getting this exception: for some reason my SparkContext object was started/stopped several times between ScalaTest methods. Fixing that behaviour led me to get Spark working the way I expected.
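A minimal sketch of that fix, assuming a ScalaTest FlatSpec with BeforeAndAfterAll like the ones visible in the stack trace; the suite name, test body, and point data are illustrative, not the asker's actual code. The idea is to create the SparkContext exactly once per suite and stop it exactly once, so no test ever runs against a stopped context whose broadcast blocks have already been cleaned up:

import org.apache.spark.SparkContext
import org.scalatest.{BeforeAndAfterAll, FlatSpec}

class KNNSpec extends FlatSpec with BeforeAndAfterAll {

  // One SparkContext for the whole suite, shared by every test.
  private var sparkContext: SparkContext = _

  override def beforeAll(): Unit = {
    sparkContext = new SparkContext("local", "knn-test")
  }

  "getKNN" should "return the k nearest points" in {
    val points = sparkContext.parallelize(Seq(Array(0.0, 0.0), Array(1.0, 1.0)))
    // ... exercise getKNN(sparkContext, k, point2, points) here ...
  }

  // Stop the context exactly once, after every test has run.
  override def afterAll(): Unit = {
    sparkContext.stop()
  }
}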
Answered by Kevin Kehoe
I was getting this error as well. I haven't really seen any concrete coding examples, so I will share my solution. This cleared the error for me, but I have a sense that there may be more than one solution to this problem. Still, it is worth a go, as it keeps everything within the code.
It looks as though the SparkContext was shutting down, thus throwing the error. I think the issue is that the SparkContext is created in one class and then extended into other classes; the extension causes it to shut down, which is a bit annoying. Below is the implementation I used to clear this error.
Spark Initialisation Class:
import org.apache.spark.{SparkConf, SparkContext}

class Spark extends Serializable {
  def getContext: SparkContext = {
    @transient lazy val conf: SparkConf =
      new SparkConf()
        .setMaster("local")
        .setAppName("test")
    @transient lazy val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("OFF")
    sc
  }
}
Main Class:
import org.apache.spark.rdd.RDD

object Test extends Spark {
  def main(args: Array[String]): Unit = {
    val sc = getContext
    val irisRDD: RDD[String] = sc.textFile("...")
    ...
  }
}
Then just extend your other classes with the Spark class and it should all work out.
I was getting the error running LogisticRegression models, so I would assume this should fix it for other machine learning libraries as well.
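If this works, it is plausibly down to the @transient lazy val fields: lazy defers construction until getContext is actually called on the driver, and @transient keeps the SparkConf and SparkContext out of any serialized closure that captures the enclosing Spark class, so executors never try to deserialize a context of their own.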
Answered by Jake
Related to the answers above, I encountered this issue when I inadvertently serialized a DataStax connector (i.e. Cassandra connection driver) query to a Spark worker. This then spun off its own SparkContext, and within 4 seconds the entire application had crashed.
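A hedged illustration of that kind of mistake; Session and Connector below are made-up stand-ins for a driver-side client such as a Cassandra session, not the real DataStax API. Anything referenced inside an RDD closure is serialized and shipped to the executors, so capturing a driver-side connection object drags driver state onto the workers:

import org.apache.spark.rdd.RDD

// `Session` and `Connector` are hypothetical stand-ins for a driver-side client.
class Session { def query(s: String): String = s.toUpperCase }
object Connector { def open(): Session = new Session }

// Problematic: `session` is created on the driver and captured by the map
// closure, so Spark tries to serialize it and ship it to every executor.
def bad(rdd: RDD[String]): RDD[String] = {
  val session = Connector.open()
  rdd.map(row => session.query(row))
}

// Safer: open the connection inside mapPartitions, on the executor itself,
// so nothing driver-side has to cross the wire.
def good(rdd: RDD[String]): RDD[String] =
  rdd.mapPartitions { rows =>
    val session = Connector.open()
    rows.map(row => session.query(row))
  }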
Answered by Kamrus
This helped in my case, because a SparkContext had already been created:
val sc = SparkContext.getOrCreate()
Before that, I had tried this:
val conf = new SparkConf().setAppName("Testing").setMaster("local").set("spark.driver.allowMultipleContexts", "true")
val sc = new SparkContext(conf)
But it broke when I ran:
spark.createDataFrame(rdd, schema)
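A minimal end-to-end sketch of the getOrCreate approach, assuming Spark 2.x (the spark.createDataFrame call above suggests a SparkSession is in play); the schema and data are made up. SparkContext.getOrCreate returns the already-running context instead of constructing a second one, which is the situation allowMultipleContexts only papers over:

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}

// Returns the live context if one exists instead of constructing a rival one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("Testing").setMaster("local"))

// SparkSession.builder().getOrCreate() reuses that same underlying context.
val spark = SparkSession.builder().getOrCreate()

val rdd = sc.parallelize(Seq(Row("a"), Row("b")))
val schema = StructType(Seq(StructField("letter", StringType, nullable = false)))

val df = spark.createDataFrame(rdd, schema) // one context, broadcast blocks intact
df.show()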
Answered by Vikas Singh
I was also facing the same issue. After a lot of googling, I found that I had made a singleton class for SparkContext initialization, which is only valid within a single JVM instance. In Spark, however, that singleton class is invoked from each worker node running on a separate JVM instance, and hence it leads to multiple SparkContext objects.
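A hedged sketch of the pattern this answer describes; ContextHolder is illustrative, not the answerer's actual code. A process-wide lazy singleton builds a context in whichever JVM first touches it, so code that is harmless on the driver quietly constructs a fresh SparkContext when a task closure calls it on an executor:

import org.apache.spark.{SparkConf, SparkContext}

// The risky pattern: a process-wide singleton. On the driver it returns
// the intended context; in any other JVM that touches it (an executor,
// or a second driver-side initialization in tests), the lazy val runs
// again and constructs a brand-new SparkContext.
object ContextHolder {
  lazy val sc: SparkContext =
    new SparkContext(new SparkConf().setMaster("local").setAppName("app"))
}

// Anti-pattern: if a task closure ever calls ContextHolder.sc, each
// executor JVM spins up its own context, and the driver's broadcast
// blocks are no longer where the tasks expect them:
//   rdd.map(x => ContextHolder.sc.defaultParallelism)
//
// Safer: build the context once on the driver and pass it explicitly,
// or use SparkContext.getOrCreate() so all driver code shares one.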

