Scala - How to fix "MetadataFetchFailedException: Missing an output location for shuffle"?

Disclaimer: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you reuse or share it, you must keep the same license and attribute the original authors (not this site). Original question: http://stackoverflow.com/questions/36815506/


How to fix "MetadataFetchFailedException: Missing an output location for shuffle"?

Tags: scala, apache-spark, apache-spark-mllib, word2vec

Asked by displayname

If I increase the model size of my word2vec model, I start to get this kind of exception in my log:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 6
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses.apply(MapOutputTracker.scala:542)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses.apply(MapOutputTracker.scala:538)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach.apply(TraversableLike.scala:772)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:538)
    at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:155)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:47)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute.apply(CoalescedRDD.scala:96)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute.apply(CoalescedRDD.scala:95)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$collect$$anonfun.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDD$$anonfun$collect$$anonfun.apply(RDD.scala:927)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I tried to write my own "save model" version which looks like this:

  // Imports and the vector holder assumed by this snippet (the Data case
  // class mirrors the simple (word, vector) pair Spark MLlib uses when it
  // persists a Word2VecModel; adjust if your project defines it differently).
  import scala.collection.mutable
  import org.apache.hadoop.fs.Path
  import org.apache.spark.SparkContext
  import org.apache.spark.mllib.feature.Word2VecModel

  case class Data(word: String, vector: Array[Float])

  def save(model: Word2VecModel, sc: SparkContext, path: String): Unit = {

    println("Saving model as CSV ..")

    val vectorSize = model.getVectors.values.head.size

    println("vectorSize="+vectorSize)

    val SEPARATOR_TOKEN = " "    
    val dataArray = model.getVectors.toSeq.map { case (w, v) => Data(w, v) }

    println("Got dataArray ..")
    println("parallelize(dataArray, 10)")
    val par = sc.parallelize(dataArray, 10)
          .map(d => {

            val sb = new mutable.StringBuilder()
            sb.append(d.word)
            sb.append(SEPARATOR_TOKEN)

            for(v <- d.vector) {
              sb.append(v)
              sb.append(SEPARATOR_TOKEN)
            }
            sb.setLength(sb.length - 1)
            sb.append("\n")
            sb.toString()
          })
    println("repartition(1)")
    val rep = par.repartition(1)
    println("collect()")
    val vectorsAsString = rep.collect()

    println("Collected serialized vectors ..")    

    val cfile = new mutable.StringBuilder()

    cfile.append(vectorsAsString.length)
    cfile.append(" ")
    cfile.append(vectorSize)
    cfile.append("\n")

    val sb = new StringBuilder
    sb.append("word,")
    for(i <- 0 until vectorSize) {
      sb.append("v")
      sb.append(i.toString)
      sb.append(",")
    }
    sb.setLength(sb.length - 1)
    sb.append("\n")

    for(vectorString <- vectorsAsString) {
      sb.append(vectorString)
      cfile.append(vectorString)
    }

    println("Saving file to " + new Path(path, "data").toUri.toString)
    sc.parallelize(sb.toString().split("\n"), 1).saveAsTextFile(new Path(path+".csv", "data").toUri.toString)
    sc.parallelize(cfile.toString().split("\n"), 1).saveAsTextFile(new Path(path+".cs", "data").toUri.toString)
  }

Apparently it works similarly to their current implementation - but it doesn't.

I'd like to get a word2vec model. It works with small files but not if the model gets larger.

Answered by Jacek Laskowski

MetadataFetchFailedException is thrown when a MapOutputTracker on an executor could not find requested shuffle map outputs for partitions in local cache and tried to fetch them remotely from the driver's MapOutputTracker.

That could lead to a few conclusions (see the configuration sketch after this list):

  1. The driver's memory issues
  2. The executors' memory issues
  3. Executors being lost
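
As a first pass at all three, you usually give the driver and the executors more headroom. Below is a minimal sketch of the kind of configuration that is typically involved; the values are placeholders, and in most deploy modes spark.driver.memory only takes effect when passed to spark-submit (e.g. via --conf or --driver-memory) before the driver JVM starts, so treat the Scala form as illustrative:

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical sizes; tune them to your cluster and your word2vec model.
    val conf = new SparkConf()
      .setAppName("word2vec-save")
      .set("spark.executor.memory", "8g")       // more heap per executor
      .set("spark.driver.memory", "8g")         // usually supplied via spark-submit instead
      .set("spark.driver.maxResultSize", "4g")  // collect()-ed vectors must fit under this limit
      .set("spark.network.timeout", "600s")     // tolerate slower shuffle fetches

    val sc = new SparkContext(conf)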

Please review the logs, looking for issues reported as "Executor lost" INFO messages, and/or review the web UI's Executors page to see how the executors are doing.

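If you would rather check from code than from the web UI, one option (assuming SparkContext.getExecutorMemoryStatus is available in your Spark version) is to log the registered executors and their block-manager memory; executors that disappear from this map between stages are a strong hint of the "Executor lost" case:

    // Sketch: print every registered executor with its max and free storage memory.
    sc.getExecutorMemoryStatus.foreach { case (executor, (maxMem, remainingMem)) =>
      println(f"$executor%-30s max=${maxMem / (1024 * 1024)}%6d MB  free=${remainingMem / (1024 * 1024)}%6d MB")
    }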

The root cause of executors being lost may also be that the cluster manager has decided to kill ill-behaved executors (that may have used up more memory than requested).

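If that is what is happening (for example YARN killing containers for exceeding their memory limit), the usual knob is the executor memory overhead, because off-heap usage is not covered by spark.executor.memory. A hedged sketch of the relevant settings; the property name depends on the Spark version, and the value is a placeholder:

    // Older Spark on YARN (value in MiB):
    //   --conf spark.yarn.executor.memoryOverhead=2048
    // Spark 2.3 and later:
    //   --conf spark.executor.memoryOverhead=2g
    val conf = new org.apache.spark.SparkConf()
      .set("spark.yarn.executor.memoryOverhead", "2048")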

See the other question FetchFailedException or MetadataFetchFailedException when processing big data set for more insights.
