Scala: "SparkContext was shut down" while running Spark on a large dataset

Notice: this page is a side-by-side Chinese-English translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me): StackOverflow original URL: http://stackoverflow.com/questions/32822948/

Date: 2020-10-22 07:39:36  Source: igfitidea

"sparkContext was shut down" while running spark on a large dataset

Tags: scala, apache-spark, yarn, apache-spark-sql

Asked by Aleksander Zendel

When running a Spark job on a cluster with input past a certain size (~2.5 GB), I get either "Job cancelled because SparkContext was shut down" or "executor lost". In the YARN GUI, the job that got killed is shown as successful. There are no problems when running on 500 MB of data. While looking for a solution I found this: "seems yarn kills some of the executors as they request more memory than expected."

Any suggestions on how to debug it?

The command I submit my Spark job with:

/opt/spark-1.5.0-bin-hadoop2.4/bin/spark-submit  --driver-memory 22g --driver-cores 4 --num-executors 15 --executor-memory 6g --executor-cores 6  --class sparkTesting.Runner   --master yarn-client myJar.jar jarArguments

And the SparkContext settings:

val sparkConf = (new SparkConf()
    .set("spark.driver.maxResultSize", "21g")
    .set("spark.akka.frameSize", "2011")
    .set("spark.eventLog.enabled", "true")
    .set("spark.eventLog.dir", configVar.sparkLogDir)
    )

The simplified code that fails looks like this:

val hc = new org.apache.spark.sql.hive.HiveContext(sc)
val broadcastParser = sc.broadcast(new Parser())

val featuresRdd = hc.sql("select "+ configVar.columnName + " from " + configVar.Table +" ORDER BY RAND() LIMIT " + configVar.Articles)
val myRdd : org.apache.spark.rdd.RDD[String] = featuresRdd.map(doSomething(_,broadcastParser))

val allWords= featuresRdd
  .flatMap(line => line.split(" "))
  .count

val wordQuantiles= featuresRdd
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .map(pair => (pair._2 , pair._2))
  .reduceByKey(_+_)
  .sortBy(_._1)
  .collect
  .scanLeft((0,0.0)) ( (res,add) => (add._1, res._2+add._2) )
  .map(entry => (entry._1,entry._2/allWords))

val dictionary = featuresRdd
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _) // here I have Rdd of word,count tuples
  .filter(_._2 >= moreThan)
  .filter(_._2 <= lessThan)
  .filter(_._1.trim!=(""))
  .map(_._1)
  .zipWithIndex
  .collect
  .toMap

And the error stack trace:

Exception in thread "main" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop.apply(DAGScheduler.scala:703)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop.apply(DAGScheduler.scala:702)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:702)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1511)
at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1435)
at org.apache.spark.SparkContext$$anonfun$stop.apply$mcV$sp(SparkContext.scala:1715)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1185)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1714)
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:146)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910)
at org.apache.spark.rdd.RDD.count(RDD.scala:1121)
at sparkTesting.InputGenerationAndDictionaryComputations$.createDictionary(InputGenerationAndDictionaryComputations.scala:50)
at sparkTesting.Runner$.main(Runner.scala:133)
at sparkTesting.Runner.main(Runner.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Accepted answer by Aleksander Zendel

Found the answer.

My table was saved as a single 20 GB Avro file. When the executors tried to open it, each of them had to load the full 20 GB into memory. I solved it by using CSV instead of Avro.
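
As a rough illustration of the sizing problem described above (not the fix the author used, which was switching to CSV), the following sketch estimates how many partitions a table of a given size would need so that no single task has to hold more than a chosen amount of data. The object name `PartitionSizing`, the helper `partitionsFor`, and the 128 MB target are assumptions made for this example only; they do not come from Spark or from the original post.

```scala
object PartitionSizing {
  /** Number of partitions so that each holds at most targetBytes (rounded up, at least 1). */
  def partitionsFor(totalBytes: Long, targetBytes: Long): Int = {
    require(totalBytes >= 0 && targetBytes > 0, "totalBytes must be >= 0 and targetBytes > 0")
    // Ceiling division, clamped so that an empty input still yields one partition.
    math.max(1L, (totalBytes + targetBytes - 1) / targetBytes).toInt
  }

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024
    val gb = 1024L * mb
    // 20 GB is the table size from the answer; 128 MB per partition is an assumed target.
    println(s"partitions needed: ${partitionsFor(20 * gb, 128 * mb)}")
  }
}
```

With a DataFrame such as `featuresRdd` from the question, the result could be passed to `featuresRdd.repartition(n)`. Note that repartitioning only helps downstream stages; if the file itself is read by a single task, as the answer describes, the storage format or file layout has to change as well.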

Answer by JosiahYoder-deactive except..

Another possible cause of the "SparkContext is shutdown" error is that you are importing a jar file after evaluating some other code. (This may only happen in Spark Notebook.)

To fix the problem, move all your :cp myjar.jar statements to the start of your file.

Answer by Robert Chevallier

The symptoms are typical of an OutOfMemory error in one of the executor tasks. Try increasing the executor memory when launching the job. See the --executor-memory parameter of spark-submit, spark-shell, etc. The default value is 1G.
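
To make the memory arithmetic concrete, here is a minimal sketch of how much memory YARN has to grant per executor container. It assumes the Spark 1.x default for spark.yarn.executor.memoryOverhead, which is 10% of the executor memory with a minimum of 384 MB; other versions or an explicit setting will change the numbers. The object and method names are made up for this example, and the result ignores any rounding the YARN scheduler applies.

```scala
object YarnContainerEstimate {
  // Assumed Spark 1.x on YARN defaults: overhead = max(384 MB, 10% of executor memory).
  val MinOverheadMb = 384
  val OverheadFactor = 0.10

  /** Off-heap overhead added on top of --executor-memory, in MB. */
  def overheadMb(executorMemoryMb: Int): Int =
    math.max(MinOverheadMb, (executorMemoryMb * OverheadFactor).toInt)

  /** Memory requested from YARN per executor container, in MB (before scheduler rounding). */
  def containerMb(executorMemoryMb: Int): Int =
    executorMemoryMb + overheadMb(executorMemoryMb)

  def main(args: Array[String]): Unit = {
    val executorMemoryMb = 6 * 1024 // --executor-memory 6g, as in the question
    val numExecutors = 15           // --num-executors 15, as in the question
    println(s"per container: ${containerMb(executorMemoryMb)} MB")
    println(s"all executors: ${containerMb(executorMemoryMb).toLong * numExecutors} MB")
  }
}
```

If an executor's real footprint exceeds its container size, YARN may kill it, which would match the "executor lost" symptom from the question. In that case raising --executor-memory, or setting spark.yarn.executor.memoryOverhead explicitly, are the usual knobs to try.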
