Scala: Spark runs out of memory when grouping by key

Note: this content is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you use or share it, you must follow the same license and attribute the original authors (not this site). Original question: http://stackoverflow.com/questions/22637518/

Spark runs out of memory when grouping by key

scala, amazon-ec2, apache-spark

Asked by John McCrae

I am attempting to perform a simple transformation of Common Crawl data using Spark hosted on EC2, set up by following this guide. My code looks like this:

package ccminer

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object ccminer {
  val english = "english|en|eng"
  val spanish = "es|esp|spa|spanish|espanol"
  val turkish = "turkish|tr|tur|turc"
  val greek = "greek|el|ell"
  val italian = "italian|it|ita|italien"
  val all = (english :: spanish :: turkish :: greek :: italian :: Nil).mkString("|")

  def langIndep(s: String) = s.toLowerCase().replaceAll(all, "*")

  def main(args: Array[String]): Unit = {
    if (args.length != 3) {
      System.err.println("Bad command line")
      System.exit(-1)
    }

    val cluster = "spark://???"
    val sc = new SparkContext(cluster, "Common Crawl Miner",
      System.getenv("SPARK_HOME"), Seq("/root/spark/ccminer/target/scala-2.10/cc-miner_2.10-1.0.jar"))

    sc.sequenceFile[String, String](args(0)).map {
      case (k, v) => (langIndep(k), v)
    }
    .groupByKey(args(2).toInt)
    .filter {
      case (_, vs) => vs.size > 1
    }
    .saveAsTextFile(args(1))
  }
}

And I am running it with the following command:

sbt/sbt "run-main ccminer.ccminer s3n://aws-publicdatasets/common-crawl/parse-output/segment/1341690165636/textData-* s3n://parallelcorpus/out/ 2000"

But very quickly it fails with the following error:

java.lang.OutOfMemoryError: Java heap space
at com.ning.compress.BufferRecycler.allocEncodingBuffer(BufferRecycler.java:59)
at com.ning.compress.lzf.ChunkEncoder.<init>(ChunkEncoder.java:93)
at com.ning.compress.lzf.impl.UnsafeChunkEncoder.<init>(UnsafeChunkEncoder.java:40)
at com.ning.compress.lzf.impl.UnsafeChunkEncoderLE.<init>(UnsafeChunkEncoderLE.java:13)
at com.ning.compress.lzf.impl.UnsafeChunkEncoders.createEncoder(UnsafeChunkEncoders.java:31)
at com.ning.compress.lzf.util.ChunkEncoderFactory.optimalInstance(ChunkEncoderFactory.java:44)
at com.ning.compress.lzf.LZFOutputStream.<init>(LZFOutputStream.java:61)
at org.apache.spark.io.LZFCompressionCodec.compressedOutputStream(CompressionCodec.scala:60)
at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:803)
at org.apache.spark.storage.BlockManager$$anonfun.apply(BlockManager.scala:471)
at org.apache.spark.storage.BlockManager$$anonfun.apply(BlockManager.scala:471)
at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:117)
at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:174)
at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask.apply(ShuffleMapTask.scala:164)
at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask.apply(ShuffleMapTask.scala:161)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run.apply$mcV$sp(Executor.scala:213)
at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

So my basic question is, what is necessary to write a Spark task that can group by key with an almost unlimited amount of input without running out of memory?

Answer by homutov

The most common cause of java.lang.OutOfMemoryError exceptions in shuffle tasks (such as groupByKey, reduceByKey, etc.) is a low level of parallelism.

You can increase the default value by setting the spark.default.parallelism property in the configuration.

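For illustration, a minimal sketch using SparkConf (a hedged example, not the asker's exact setup; the value 2000 simply mirrors what the question passes on the command line and would be tuned to your cluster):

import org.apache.spark.{SparkConf, SparkContext}

// Create the context with a higher default parallelism; this controls how many
// partitions a shuffle uses when no count is given explicitly.
val conf = new SparkConf()
  .setAppName("Common Crawl Miner")
  .set("spark.default.parallelism", "2000")
val sc = new SparkContext(conf)

// Shuffle operations also accept an explicit partition count, as the question's
// code already does with groupByKey(args(2).toInt).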

Answer by Kavindu Dodanduwa

So this says that you have run out of the JVM's allocated heap space. You can increase the heap size, but it is still limited by the system's capabilities (it cannot exceed the amount of physical RAM).

On the other hand, as explained by homutov, this happens in large collecting operations, for example groupByKey, reduceByKey, or cartesian + mapToPair. These operations collect the RDD data into one place, making the JVM run out of heap space.

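As an aside (not something this answer prescribes), the contrast is easy to see in a short sketch, assuming sc is the SparkContext from the question: groupByKey must materialize every value for a key on a single executor, whereas reduceByKey pre-combines values on the map side, so far less data sits in the heap at once.

val pairs = sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1)))

// Heavy: all values for each key are gathered into one collection, then summed.
val viaGroup = pairs.groupByKey().mapValues(_.sum)

// Lighter: values are combined before and after the shuffle, key by key.
val viaReduce = pairs.reduceByKey(_ + _)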

What can you do?

In my experience, when a cluster/system has limited resources, you can use the Spark tuning guide. spark.default.parallelism can be increased until the tasks fit into your cluster/system [I once ran a KNN implementation on a 14000-instance, 1024-feature dataset on my laptop's virtual machine by tweaking the parallelism].

Command line flag:   --conf spark.default.parallelism=4   (here 4 is the parallelism value)
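If the job is launched with spark-submit (available in newer Spark releases) rather than sbt run-main as in the question, the same flag is passed at submission time; a hedged sketch reusing the jar and paths from the question:

spark-submit \
  --class ccminer.ccminer \
  --conf spark.default.parallelism=2000 \
  /root/spark/ccminer/target/scala-2.10/cc-miner_2.10-1.0.jar \
  "s3n://aws-publicdatasets/common-crawl/parse-output/segment/1341690165636/textData-*" \
  s3n://parallelcorpus/out/ 2000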

Remember, you need to TUNE these settings to the most effective, failure-avoiding (no running out of heap) values to get the best results out of Spark.

Additionally

Remember to use primitive data types instead of wrappers, and use arrays instead of collections.

 e.g.: List<Integer> vs int[]; int[] is better than List<Integer>

In Spark, arrays can save a lot of valuable space and improve performance.

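A small Scala illustration of the same point (indicative only; no measurements are made here):

// Backed by a primitive int[]: no per-element object allocation.
val compact: Array[Int] = Array(1, 2, 3)

// A generic collection boxes each Int into a heap object, adding header and
// pointer overhead per element.
val boxed: List[Int] = List(1, 2, 3)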

Also, use broadcast variables instead of a Cartesian product or any other large combination task.

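For example, a hedged sketch of the broadcast idea (the lookup table, the RDD contents, and sc are assumptions for illustration): instead of crossing a large RDD with a small dataset, ship the small side to every executor once and look values up locally.

// Small table that comfortably fits in memory on every executor.
val langIds: Map[String, Int] = Map("en" -> 0, "es" -> 1, "tr" -> 2)
val langIdsBc = sc.broadcast(langIds)

val bigRdd = sc.parallelize(Seq("en", "tr", "el"))

// Cartesian-free lookup: each task reads the broadcast copy directly.
val joined = bigRdd.flatMap(lang => langIdsBc.value.get(lang).map(id => (lang, id)))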