Note: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license and attribute it to the original authors (not me). Original: http://stackoverflow.com/questions/24038908/

Spark fails on big shuffle jobs with java.io.IOException: Filesystem closed

Tags: scala, hadoop, hdfs, apache-spark

Asked by samthebest

I often find Spark fails on large jobs with a rather unhelpful, meaningless exception. The worker logs look normal, with no errors, but they end up in state "KILLED". This is extremely common for large shuffles, for example operations like .distinct.

The question is, how do I diagnose what's going wrong, and ideally, how do I fix it?

Given that a lot of these operations are monoidal, I've been working around the problem by splitting the data into, say, 10 chunks, running the app on each chunk, and then running the app on all of the resulting outputs. In other words: meta-map-reduce.

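To make that workaround concrete, here is a minimal sketch (not part of the original question) of the chunked approach, assuming the input is line-oriented text already split into per-chunk HDFS directories; the paths, chunk count and the use of .distinct are illustrative:

    import org.apache.spark.SparkContext

    object ChunkedDistinct {
      // First pass: run .distinct within each chunk and persist the (much smaller)
      // intermediate results. Second pass: run .distinct over all of them combined.
      def run(sc: SparkContext, numChunks: Int): Unit = {
        (0 until numChunks).foreach { i =>
          sc.textFile(s"hdfs:///data/input/chunk-$i")       // illustrative input layout
            .distinct()
            .saveAsTextFile(s"hdfs:///data/intermediate/chunk-$i")
        }
        sc.textFile("hdfs:///data/intermediate/chunk-*")    // glob over the intermediates
          .distinct()
          .saveAsTextFile("hdfs:///data/output")
      }
    }

This only works because distinct is idempotent: applying it per chunk and then again over the combined intermediate output gives the same result as a single pass, while each individual shuffle stays much smaller.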

14/06/04 12:56:09 ERROR client.AppClient$ClientActor: Master removed our application: FAILED; stopping client
14/06/04 12:56:09 WARN cluster.SparkDeploySchedulerBackend: Disconnected from Spark cluster! Waiting for reconnection...
14/06/04 12:56:09 WARN scheduler.TaskSetManager: Loss was due to java.io.IOException
java.io.IOException: Filesystem closed
    at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:703)
    at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:779)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:840)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at org.apache.hadoop.io.compress.DecompressorStream.getCompressedData(DecompressorStream.java:159)
    at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:143)
    at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
    at java.io.InputStream.read(InputStream.java:101)
    at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:209)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
    at org.apache.spark.rdd.HadoopRDD$$anon.getNext(HadoopRDD.scala:164)
    at org.apache.spark.rdd.HadoopRDD$$anon.getNext(HadoopRDD.scala:149)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:176)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:257)
    at scala.collection.AbstractIterator.toList(Iterator.scala:1157)
    at $line5.$read$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:13)
    at $line5.$read$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:13)
    at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:450)
    at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:450)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run.apply$mcV$sp(Executor.scala:213)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon.run(SparkHadoopUtil.scala:42)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon.run(SparkHadoopUtil.scala:41)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

Accepted answer by Niketan

As of September 1st 2014, this is an "open improvement" in Spark; please see https://issues.apache.org/jira/browse/SPARK-3052. As syrza pointed out in the linked issue, the shutdown hooks are likely run in the wrong order when an executor fails, which results in this message. You will have to do a little more investigation to figure out the main cause of the problem (i.e. why your executor failed). If it is a large shuffle, it might be an out-of-memory error that caused the executor failure, which in turn caused the Hadoop FileSystem to be closed in its shutdown hook. The RecordReaders in the running tasks of that executor then throw the "java.io.IOException: Filesystem closed" exception. I guess it will be fixed in a subsequent release and then you will get a more helpful error message :)

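The failure mode this answer describes is easy to reproduce outside Spark. Below is a minimal sketch in plain Hadoop client code (an illustration, not taken from the answer; it assumes fs.defaultFS points at an HDFS cluster and the path is made up): FileSystem.get returns one cached, shared instance per filesystem and user, so whichever thread or shutdown hook closes it first closes it for everything else still reading through it.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object SharedFsDemo {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()     // assumes fs.defaultFS = hdfs://...
        val fsA = FileSystem.get(conf)     // cached, shared instance
        val fsB = FileSystem.get(conf)     // same underlying object as fsA (cache hit)
        fsA.close()                        // e.g. a shutdown hook closing "its" filesystem
        // Any later use of fsB now fails with java.io.IOException: Filesystem closed,
        // which is exactly what the RecordReaders in still-running tasks hit.
        fsB.open(new Path("/some/file"))
      }
    }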

Answer by Daniel Darabos

Something calls DFSClient.close() or DFSClient.abort(), closing the client. The next file operation then results in the above exception.

I would try to figure out what calls close()/abort(). You could use a breakpoint in your debugger, or modify the Hadoop source code to throw an exception in these methods, so you would get a stack trace.

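If patching and rebuilding Hadoop feels too heavyweight, a gentler variant of the same idea is to log the call stack instead of throwing. A hypothetical sketch (the object and method names are made up; in practice the single printStackTrace line would sit at the top of DFSClient.close() and DFSClient.abort() in a patched build):

    object WhoClosedMe {
      // Print the current call stack without disturbing the caller, so the code path
      // that closes the client shows up in the executor logs.
      def logCaller(what: String): Unit =
        new Exception(s"$what called from:").printStackTrace()
    }

    // What you would insert into the patched method:
    // WhoClosedMe.logCaller("DFSClient.close()")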

Answer by N.C

The "Filesystem closed" exception can be worked around when the Spark job is running on a cluster. You can set properties such as spark.executor.cores, spark.driver.cores and spark.akka.threads to the maximum values your resource availability allows. I had the same problem when my dataset was pretty large, with about 20 million JSON records. I fixed it with the above properties and it ran like a charm. In my case, I set those properties to 25, 25 and 20 respectively. Hope it helps!

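For reference, a sketch of setting those properties programmatically (the app name is made up; the values 25, 25 and 20 are the ones this answer reports, not universal recommendations, and spark.akka.threads only exists in the Akka-based Spark 1.x line this question is about):

    import org.apache.spark.{SparkConf, SparkContext}

    object BigShuffleJob {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("big-shuffle-job")        // illustrative name
          .set("spark.executor.cores", "25")
          .set("spark.driver.cores", "25")
          .set("spark.akka.threads", "20")
        val sc = new SparkContext(conf)
        // ... job logic ...
        sc.stop()
      }
    }

The same settings can also be passed on the command line, e.g. spark-submit --conf spark.executor.cores=25, so cluster-specific values do not have to be hard-coded.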

Reference Link:

http://spark.apache.org/docs/latest/configuration.html
