Iterate through a Java RDD by row

Disclaimer: this page is based on a popular StackOverflow question and answer thread and is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same CC BY-SA license and attribute it to the original authors (not me), citing the original source: http://stackoverflow.com/questions/31834825/

Time: 2020-11-02 19:13:10  Source: igfitidea

Iterate through a Java RDD by row

java apache-spark rdd

Asked by Katya Handler

I would like to iterate through an RDD of strings and "do something" to each string. The output should be double[][]. Here is an example with a for loop. I understand I need to use (I think) the foreach function for Java RDDs. However, I have no idea how to work out the syntax, and the documentation is not particularly helpful. I do not have Java 8.

Here is an example of what I would like to do if I could use a regular for loop.

public class PCA {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("PCA Example");
        SparkContext sc = new SparkContext(conf);

        RDD<String> data = sc.textFile("my/directory/my/dataset.txt", 0);

        // here is the "type" of code I would like to execute
        // 30 because I have 30 variables
        double[][] vals = new double[data.count()][30];

        double[] temp;
        for (int i = 0; i < data.count(); i++) {
            temp = splitStringtoDoubles(data[i]);
            vals[i] = temp;
        }
    }

    private static double[] splitStringtoDoubles(String s) {
        String[] splitVals = s.split("\t");
        double[] vals = new double[splitVals.length];
        for (int i = 0; i < splitVals.length; i++) {
            vals[i] = Double.parseDouble(splitVals[i]);
        }
        return vals;
    }

}

I understand that foreach seems to require a function that has a void return type. I'm not sure how to work with that. Here is what I have attempted so far (obviously the syntax is wrong):

    double[][] matrix = new double[data.count()][30];
    foreach(String s : data) {
        String[] splitvals = s.split("\t");
        double[] vals = Double.parseDouble(splitvals);
        matrix[s] = vals; 
    }
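
For context: in the Java API, foreach takes an org.apache.spark.api.java.function.VoidFunction, so it can only perform side effects per row and never hands a value back to the driver, which is why it cannot fill the matrix above. A minimal sketch of what a pre-Java-8 foreach call looks like, assuming data is the RDD<String> from the first snippet and is converted with toJavaRDD():

import org.apache.spark.api.java.function.VoidFunction;

...

data.toJavaRDD().foreach(new VoidFunction<String>() {
    @Override
    public void call(String row) throws Exception {
        // only side effects are possible here; no value is returned per row
        System.out.println(row);
    }
});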

Accepted answer by Balduz

As mattinbits said in the comments, you want a map instead of a foreach, since you want to return values. What a map basically does is transform your data: for each row of your RDD you perform an operation and return one value per row. What you need can be achieved like this:

import org.apache.spark.api.java.function.Function;

...

SparkConf conf = new SparkConf().setAppName("PCA Example");
SparkContext sc = new SparkContext(conf);

JavaRDD<String> data = sc.textFile("clean-sl-mix-with-labels.txt",0).toJavaRDD();
JavaRDD<double[]> whatYouWantRdd = data.map(new Function<String, double[]>() {
    @Override
    public double[] call(String row) throws Exception {
        return splitStringtoDoubles(row);
    }

    private double[] splitStringtoDoubles(String s) {
        String[] splitVals = s.split("\t");
        double[] vals = new double[splitVals.length];
        for(int i=0; i < splitVals.length; i++) {
            vals[i] = Double.parseDouble(splitVals[i]);
        }
        return vals;
    }
});

List<double[]> whatYouWant = whatYouWantRdd.collect();

Just so you know how Spark works: you perform actions or transformations on your RDD. For instance, here we are transforming our RDD using a map function. You need to create this function yourself, this time with an anonymous org.apache.spark.api.java.function.Function, which forces you to override the call method, where you receive a row of your RDD and return a value.

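Since the original goal was a double[][], the collected List<double[]> can simply be copied into one. A minimal sketch, reusing the whatYouWant list from the snippet above:

// copy the collected rows into the double[][] shape the question asked for
double[][] matrix = whatYouWant.toArray(new double[whatYouWant.size()][]);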

Answered by mattinbits

Just because it's interesting to compare the verbosity of the Java and Scala APIs for Spark, here's a Scala version:

import org.apache.spark.{SparkContext, SparkConf}

object Example extends App {
  val conf = new SparkConf().setMaster("local").setAppName("Spark example")
  val sc = new SparkContext(conf)

  val inputData = List(
    "1.2\t2.7\t3.8",
    "4.3\t5.1\t6.3"
  )

  val inputRDD = sc.parallelize(inputData)
  val arrayOfDoubleRDD = inputRDD.map(_.split("\t").map(_.toDouble))
}