Spark java.lang.StackOverflowError

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/37909444/

java, apache-spark, mapreduce

Asked by Khal Mei

I'm using Spark to calculate the PageRank of user reviews, but I keep getting java.lang.StackOverflowError when I run my code on a big dataset (40k entries). When running the code on a small number of entries it works fine, though.

Entry example:

product/productId: B00004CK40   review/userId: A39IIHQF18YGZA   review/profileName: C. A. M. Salas  review/helpfulness: 0/0 review/score: 4.0   review/time: 1175817600 review/summary: Reliable comedy review/text: Nice script, well acted comedy, and a young Nicolette Sheridan. Cusak is in top form.

The Code:

public void calculatePageRank() {
    // Assumes the usual Spark Java API imports are in scope:
    // org.apache.spark.api.java.*, org.apache.spark.api.java.function.*,
    // scala.Tuple2, java.util.List and com.google.common.collect.Lists (Guava).
    sc.clearCallSite();
    sc.clearJobGroup();

    JavaRDD<String> rddFileData = sc.textFile(inputFileName).cache();
    sc.setCheckpointDir("pagerankCheckpoint/");

    // Extract "movieId \t userId" from each raw review line.
    JavaRDD<String> rddMovieData = rddFileData.map(new Function<String, String>() {

        @Override
        public String call(String arg0) throws Exception {
            String[] data = arg0.split("\t");
            String movieId = data[0].split(":")[1].trim();
            String userId = data[1].split(":")[1].trim();
            return movieId + "\t" + userId;
        }
    });

    // Group the user ids by movie id.
    JavaPairRDD<String, Iterable<String>> rddPairReviewData = rddMovieData.mapToPair(new PairFunction<String, String, String>() {

        @Override
        public Tuple2<String, String> call(String arg0) throws Exception {
            String[] data = arg0.split("\t");
            return new Tuple2<String, String>(data[0], data[1]);
        }
    }).groupByKey().cache();

    // For every movie, build the cartesian product of its reviewers and union the results.
    JavaRDD<Iterable<String>> cartUsers = rddPairReviewData.map(f -> f._2());
    List<Iterable<String>> cartUsersList = cartUsers.collect();
    JavaPairRDD<String, String> finalCartesian = null;
    int iterCounter = 0;
    for (Iterable<String> out : cartUsersList) {
        JavaRDD<String> currentUsersRDD = sc.parallelize(Lists.newArrayList(out));
        if (finalCartesian == null) {
            finalCartesian = currentUsersRDD.cartesian(currentUsersRDD);
        } else {
            finalCartesian = currentUsersRDD.cartesian(currentUsersRDD).union(finalCartesian);
            if (iterCounter % 20 == 0) {
                finalCartesian.checkpoint();
            }
        }
        iterCounter++; // advance the counter so the checkpoint above triggers every 20 unions
    }
    JavaRDD<Tuple2<String, String>> finalCartesianToTuple = finalCartesian.map(m -> new Tuple2<String, String>(m._1(), m._2()));

    // Drop self-pairs and turn each remaining pair into a "userA userB" edge line.
    finalCartesianToTuple = finalCartesianToTuple.filter(x -> x._1().compareTo(x._2()) != 0);
    JavaPairRDD<String, String> userIdPairs = finalCartesianToTuple.mapToPair(m -> new Tuple2<String, String>(m._1(), m._2()));

    JavaRDD<String> userIdPairsString = userIdPairs.map(new Function<Tuple2<String, String>, String>() {

        @Override
        public String call(Tuple2<String, String> t) throws Exception {
            return t._1 + " " + t._2;
        }
    });

    try {
        // Calculate PageRank using
        // https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
        JavaPageRank.calculatePageRank(userIdPairsString, 100);
    } catch (Exception e) {
        e.printStackTrace();
    }

    sc.close();
}

Answered by Chitral Verma

I have multiple suggestions which will help you greatly improve the performance of the code in your question.

  1. Caching: caching should be used on those data sets which you need to refer to again and again for the same or different operations (iterative algorithms).

An example is RDD.count — to tell you the number of lines in the file, the file needs to be read. So if you write RDD.count, at this point the file will be read, the lines will be counted, and the count will be returned.

What if you call RDD.count again? The same thing: the file will be read and counted again. So what does RDD.cache do? Now, if you run RDD.count the first time, the file will be loaded, cached, and counted. If you call RDD.count a second time, the operation will use the cache. It will just take the data from the cache and count the lines, no recomputing.
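
A minimal sketch of that behaviour, assuming a local Spark setup and a placeholder input file (neither is taken from the question):

// Needs org.apache.spark.SparkConf, org.apache.spark.api.java.JavaSparkContext
// and org.apache.spark.api.java.JavaRDD.
SparkConf conf = new SparkConf().setAppName("cache-example").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);

JavaRDD<String> lines = sc.textFile("reviews.txt").cache(); // "reviews.txt" is a placeholder path

long first = lines.count();   // reads the file, caches the partitions, counts the lines
long second = lines.count();  // served from the cache, no second read of the file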

Read more about caching here.

In your code sample you are not reusing anything that you've cached, so you may remove the .cache from there.

  2. Parallelization: in the code sample, you've parallelized every individual element in your RDD, which is already a distributed collection. I suggest you merge the rddFileData, rddMovieData and rddPairReviewData steps so that everything happens in one go.

Get rid of .collect, since that brings the results back to the driver and may well be the actual reason for your error.

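A hedged sketch of what merging those steps and dropping the collect could look like; the flatMap-based pair generation and the variable names are illustrative, not taken from the original answer (it assumes the same imports as the question's code plus java.util.ArrayList):

// Sketch only: parse, group by movie and expand to user pairs in one distributed pipeline,
// without pulling the grouped data back to the driver.
JavaPairRDD<String, String> movieToUser = sc.textFile(inputFileName)
        .mapToPair(line -> {
            String[] data = line.split("\t");
            String movieId = data[0].split(":")[1].trim();
            String userId = data[1].split(":")[1].trim();
            return new Tuple2<>(movieId, userId);
        });

JavaRDD<String> userIdPairsString = movieToUser
        .groupByKey()
        .flatMap(movieAndUsers -> {
            // Build all ordered pairs of distinct reviewers of the same movie on the executors.
            List<String> users = Lists.newArrayList(movieAndUsers._2());
            List<String> pairs = new ArrayList<>();
            for (String a : users) {
                for (String b : users) {
                    if (!a.equals(b)) {
                        pairs.add(a + " " + b);
                    }
                }
            }
            return pairs.iterator(); // Spark 2.x FlatMapFunction; on Spark 1.x return the list itself
        });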

Answered by Joe Widen

When your for loop grows really large, Spark can no longer keep track of the lineage. Enable checkpointing in your for loop to checkpoint your RDD every 10 iterations or so. Checkpointing will fix the problem. Don't forget to clean up the checkpoint directory afterwards.

http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing

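A hedged sketch of what periodic checkpointing inside the question's loop could look like; the interval of 10 and the count() used to force materialization are illustrative choices, not part of the original answer:

// Sketch only: truncate the lineage every few unions so the DAG stays shallow.
sc.setCheckpointDir("pagerankCheckpoint/");

JavaPairRDD<String, String> finalCartesian = null;
int iterCounter = 0;
for (Iterable<String> out : cartUsersList) {
    JavaRDD<String> currentUsersRDD = sc.parallelize(Lists.newArrayList(out));
    JavaPairRDD<String, String> pairs = currentUsersRDD.cartesian(currentUsersRDD);
    finalCartesian = (finalCartesian == null) ? pairs : pairs.union(finalCartesian);

    if (++iterCounter % 10 == 0) {
        finalCartesian.checkpoint();
        finalCartesian.count(); // checkpoint() is lazy; an action is needed to actually write it out
    }
}
// Clean up pagerankCheckpoint/ once the job has finished.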

Answered by DILIP KUMAR

This problem occurs when your DAG grows big and too many levels of transformation happen in your code. The JVM can no longer hold all the operations it has queued up for lazy execution when an action is finally performed.

Checkpointing is one option. I would suggest implementing Spark SQL for this kind of aggregation. If your data is structured, try loading it into DataFrames and performing grouping and other SQL functions to achieve this.

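A hedged sketch of that approach with the Spark 2.x Java API; the column names, the tab-separated intermediate file and the collect_list aggregation are illustrative assumptions, not from the original answer:

// Sketch only: let Catalyst plan the grouping instead of hand-building a long RDD lineage.
// Needs org.apache.spark.sql.SparkSession, Dataset, Row and the static import
// org.apache.spark.sql.functions.collect_list.
SparkSession spark = SparkSession.builder().appName("review-pairs").getOrCreate();

Dataset<Row> reviews = spark.read()
        .option("sep", "\t")
        .csv("movieUserPairs.tsv")      // placeholder file with two columns: movie id, user id
        .toDF("movieId", "userId");

Dataset<Row> usersPerMovie = reviews
        .groupBy("movieId")
        .agg(collect_list("userId").alias("userIds"));

usersPerMovie.show();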

Answered by Guillaume Chevalier

Unfortunately, the solution that worked easily for me was to call .collect() after every few iterations. Well, at least things work, as a quick fix.

In a hurry, I couldn't make the suggested solution of using checkpoints work (and maybe it wouldn't have worked anyway?).

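One way to read this quick fix is to pull the intermediate result back to the driver every few iterations and re-parallelize it, which starts a fresh lineage. A hedged sketch in the Java API used elsewhere on this page (the original answer talks about pyspark), only viable while the data still fits in driver memory:

// Sketch only: collecting and re-parallelizing discards the accumulated lineage,
// at the cost of shipping all the data through the driver.
if (iterCounter % 10 == 0) {
    List<Tuple2<String, String>> snapshot = finalCartesian.collect();
    finalCartesian = sc.parallelizePairs(snapshot);
}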

Note: it also seems that setting a Spark option might do the trick... but I don't have the time right now, so I didn't check how to set Spark's Java options from pyspark, if that's possible. Related pages for changing the config:

If someone gets that to work by changing the max recursion limit, a comment here would be nice for others.

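For reference, a hedged sketch of one way such options can be set, using the Java API consistent with the rest of this page; the -Xss value is an arbitrary example, and the original answer does not confirm that a larger thread stack fixes this particular error:

// Sketch only. spark.executor.extraJavaOptions is a standard Spark config key; -Xss32m
// (a larger JVM thread stack) is an arbitrary illustration. The driver-side equivalent,
// spark.driver.extraJavaOptions, normally has to be passed via spark-submit --conf or
// spark-defaults.conf, because the driver JVM is already running when this code executes.
SparkConf conf = new SparkConf()
        .setAppName("pagerank")
        .set("spark.executor.extraJavaOptions", "-Xss32m");
JavaSparkContext sc = new JavaSparkContext(conf);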