How to traverse/iterate a Dataset in Spark Java?

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not the translator). Original question: http://stackoverflow.com/questions/42757499/


java, apache-spark, iterator, apache-spark-2.0, apache-spark-dataset

Asked by Abhishek Vk

I am trying to traverse a Dataset to do some string similarity calculations such as Jaro-Winkler or cosine similarity. I currently convert my Dataset to a list of rows and then traverse it with a for statement, which is not an efficient Spark way to do it. So I am looking for a better approach in Spark.


import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class Sample {

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Example").setMaster("local[*]"));
        SQLContext sqlContext = new SQLContext(sc);
        SparkSession spark = SparkSession.builder().appName("JavaTokenizerExample").getOrCreate();

        // Pairs of strings whose similarity should be computed.
        List<Row> data = Arrays.asList(RowFactory.create("Mysore", "Mysuru"),
                RowFactory.create("Name", "FirstName"));
        StructType schema = new StructType(
                new StructField[] { new StructField("Word1", DataTypes.StringType, true, Metadata.empty()),
                        new StructField("Word2", DataTypes.StringType, true, Metadata.empty()) });

        Dataset<Row> oldDF = spark.createDataFrame(data, schema);
        oldDF.show();
        // Collecting all rows to the driver is the inefficient step.
        List<Row> rowslist = oldDF.collectAsList();
    }
}

I have found many JavaRDD examples, but they are not clear to me. An example for a Dataset would help me a lot.


Accepted answer by abaghel

You can use org.apache.spark.api.java.function.ForeachFunction like below.


oldDF.foreach((ForeachFunction<Row>) row -> System.out.println(row));
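
If you need the similarity scores back as a new Dataset rather than printing rows as a side effect, a map transformation keeps the computation distributed. Below is a minimal sketch, assuming Apache Commons Text 1.7+ (which provides org.apache.commons.text.similarity.JaroWinklerSimilarity) is on the classpath and the Word1/Word2 schema from the question:

import org.apache.commons.text.similarity.JaroWinklerSimilarity;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

// Compute one Jaro-Winkler score per row on the executors,
// returning a new Dataset instead of collecting rows to the driver.
Dataset<Double> scores = oldDF.map(
        (MapFunction<Row, Double>) row -> new JaroWinklerSimilarity()
                .apply(row.getString(0), row.getString(1)),
        Encoders.DOUBLE());
scores.show();

The similarity object is created inside the lambda so that nothing non-serializable is captured from the driver.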

Answered by DigitalFox

For older Java JDKs that don't support lambda expressions, you can use the following after importing:


import org.apache.spark.api.java.function.VoidFunction;


yourDataSet.toJavaRDD().foreach(new VoidFunction<Row>() {
    @Override
    public void call(Row r) throws Exception {
        // Access a column by name; substitute your actual column name.
        System.out.println(r.getAs("your column name here"));
    }
});
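
If the rows really must be processed one by one on the driver, a gentler alternative to collectAsList() is toLocalIterator(), which pulls roughly one partition at a time instead of materializing the whole Dataset in driver memory. A minimal sketch, reusing oldDF from the question:

import java.util.Iterator;
import org.apache.spark.sql.Row;

// Stream rows to the driver incrementally rather than all at once.
Iterator<Row> it = oldDF.toLocalIterator();
while (it.hasNext()) {
    Row row = it.next();
    System.out.println(row.getString(0) + " / " + row.getString(1));
}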