Apache Spark Joins example with Java

Disclaimer: this page is a translated copy of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): StackOverflow. Original question: http://stackoverflow.com/questions/28338694/

Apache Spark Joins example with Java

Tags: java, join, apache-spark, optional

Asked by Shekar Patel

I am very new to Apache Spark. I would actually like to focus on the basic Spark API specification, and want to understand and write some programs using the Spark API. I have written a Java program using Apache Spark to implement the joins concept.

When I use a left outer join -- leftOuterJoin() -- or a right outer join -- rightOuterJoin(), both methods return a JavaPairRDD which contains a special type, Guava's Optional. But I do not know how to extract the original values from the Optional type.

Anyway, I would like to know whether I can use the same join methods but get the data back in my own format. I did not find any way to do that. What I mean is that when I am using Apache Spark, I am not able to customize the code in my own style, since everything is already pre-defined.

Please find the code below

My two sample input datasets:

customers_data.txt:
4000001,Kristina,Chung,55,Pilot
4000002,Paige,Chen,74,Teacher
4000003,Sherri,Melton,34,Firefighter

and

transactions_data.txt:
00000551,12-30-2011,4000001,092.88,Games,Dice & Dice Sets,Buffalo,New York,credit
00004811,11-10-2011,4000001,180.35,Outdoor Play Equipment,Water Tables,Brownsville,Texas,credit
00034388,09-11-2011,4000002,020.55,Team Sports,Beach Volleyball,Orange,California,cash
00008996,11-21-2011,4000003,121.04,Outdoor Recreation,Fishing,Colorado Springs,Colorado,credit
00009167,05-24-2011,4000003,194.94,Exercise & Fitness,Foam Rollers,El Paso,Texas,credit

Here is my Java code

SparkJoins.java:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

// On Spark 1.x the outer joins return Guava's Optional;
// on Spark 2.0+ import org.apache.spark.api.java.Optional instead.
import com.google.common.base.Optional;

public class SparkJoins {

    @SuppressWarnings("serial")
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count").setMaster("local"));

        // (customerId, firstName) pairs
        JavaRDD<String> customerInputFile = sc.textFile("C:/path/customers_data.txt");
        JavaPairRDD<String, String> customerPairs = customerInputFile.mapToPair(new PairFunction<String, String, String>() {
            public Tuple2<String, String> call(String s) {
                String[] customerSplit = s.split(",");
                return new Tuple2<String, String>(customerSplit[0], customerSplit[1]);
            }
        }).distinct();

        // (customerId, "amount,date") pairs
        JavaRDD<String> transactionInputFile = sc.textFile("C:/path/transactions_data.txt");
        JavaPairRDD<String, String> transactionPairs = transactionInputFile.mapToPair(new PairFunction<String, String, String>() {
            public Tuple2<String, String> call(String s) {
                String[] transactionSplit = s.split(",");
                return new Tuple2<String, String>(transactionSplit[2], transactionSplit[3] + "," + transactionSplit[1]);
            }
        });

        // Default join operation (inner join)
        JavaPairRDD<String, Tuple2<String, String>> joinsOutput = customerPairs.join(transactionPairs);
        System.out.println("Joins function Output: " + joinsOutput.collect());

        // Left outer join: the right-hand value becomes Optional<String>
        JavaPairRDD<String, Iterable<Tuple2<String, Optional<String>>>> leftJoinOutput = customerPairs.leftOuterJoin(transactionPairs).groupByKey().sortByKey();
        System.out.println("LeftOuterJoins function Output: " + leftJoinOutput.collect());

        // Right outer join: the left-hand value becomes Optional<String>
        JavaPairRDD<String, Iterable<Tuple2<Optional<String>, String>>> rightJoinOutput = customerPairs.rightOuterJoin(transactionPairs).groupByKey().sortByKey();
        System.out.println("RightOuterJoins function Output: " + rightJoinOutput.collect());

        sc.close();
    }
}

And here is the output I am getting:

Joins function Output: [(4000001,(Kristina,092.88,12-30-2011)), (4000001,(Kristina,180.35,11-10-2011)), (4000003,(Sherri,121.04,11-21-2011)), (4000003,(Sherri,194.94,05-24-2011)), (4000002,(Paige,020.55,09-11-2011))]

LeftOuterJoins function Output: [(4000001,[(Kristina,Optional.of(092.88,12-30-2011)), (Kristina,Optional.of(180.35,11-10-2011))]), (4000002,[(Paige,Optional.of(020.55,09-11-2011))]), (4000003,[(Sherri,Optional.of(121.04,11-21-2011)), (Sherri,Optional.of(194.94,05-24-2011))])]

RightOuterJoins function Output: [(4000001,[(Optional.of(Kristina),092.88,12-30-2011), (Optional.of(Kristina),180.35,11-10-2011)]), (4000002,[(Optional.of(Paige),020.55,09-11-2011)]), (4000003,[(Optional.of(Sherri),121.04,11-21-2011), (Optional.of(Sherri),194.94,05-24-2011)])]

I am running this program on the Windows platform.

Please look at the above output and help me extract the values from the Optional type.

Thanks in advance

Accepted answer by sms_1190

When you do a left outer join or a right outer join, you might have missing values, right?

So Spark returns an Optional object. After getting that result, you can map it to your own format.

You can use the isPresent() method of Optional to map your data.

Here is the example:

JavaPairRDD<String, String> firstRDD = ....
JavaPairRDD<String, String> secondRDD = ....

// join both RDDs using leftOuterJoin; the right-hand value becomes an Optional
// (Optional<String> here, since secondRDD's values are Strings)
JavaPairRDD<String, Tuple2<String, Optional<String>>> rddWithJoin = firstRDD.leftOuterJoin(secondRDD);

// mapping of the join result back to a plain (key, value) pair
JavaPairRDD<String, String> mappedRDD = rddWithJoin
        .mapToPair(tuple -> {
            if (tuple._2()._2().isPresent()) {
                // a match exists; call tuple._2()._2().get() to unwrap it,
                // do your operation, and return
                return new Tuple2<String, String>(tuple._1(), tuple._2()._1());
            } else {
                return new Tuple2<String, String>(tuple._1(), "not present");
            }
        });
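
If you only need the wrapped value (or a fallback when it is absent), Guava's Optional also provides or(), which avoids the explicit isPresent() branch. Here is a minimal sketch against the same rddWithJoin, this time extracting the right-hand side (Spark 2.0+ returns its own org.apache.spark.api.java.Optional, which offers similar accessors):

// unwrap the right-hand value, falling back to a default when absent
JavaPairRDD<String, String> extractedRDD = rddWithJoin
        .mapToPair(tuple -> new Tuple2<String, String>(
                tuple._1(),
                tuple._2()._2().or("not present")));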

Answer by RPaul

In Java, we can also implement JOINs using DataFrames as follows:

1) Create a Spark session:

SparkSession spark = SparkSession.builder().appName("JoinsInSpark").master("local").getOrCreate();

2) I've taken the Employee input as:

101,Alan,Franklyn Street,Melbourne,QLD
104,Stuart,Lonsdale Street,Sydney,NSW

Create the DataFrame as:

Dataset<Employee> e_data = spark
                        .read()
                        .textFile("C:/XX/XX/test.txt")
                        .map(line -> {
                            Employee e = new Employee();
                            String[] parts = line.split(",");
                            e.setE_id(Integer.valueOf(parts[0].trim()));
                            e.setE_name(parts[1].trim());
                            e.setAddress(parts[2].trim());
                            e.setCity(parts[3].trim());
                            e.setState(parts[4].trim());
                            return e;
                        }, Encoders.bean(Employee.class));

where Employee is a POJO class containing setters, getters, and a constructor (a minimal sketch is shown below).

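For reference, here is a minimal sketch of such a bean. The field names are assumptions inferred from the setters used above; Encoders.bean() requires a public no-argument constructor plus matching getters and setters:

public class Employee {
    private int e_id;
    private String e_name;
    private String address;
    private String city;
    private String state;

    // no-arg constructor required by Encoders.bean()
    public Employee() {}

    public int getE_id() { return e_id; }
    public void setE_id(int e_id) { this.e_id = e_id; }
    public String getE_name() { return e_name; }
    public void setE_name(String e_name) { this.e_name = e_name; }
    public String getAddress() { return address; }
    public void setAddress(String address) { this.address = address; }
    public String getCity() { return city; }
    public void setCity(String city) { this.city = city; }
    public String getState() { return state; }
    public void setState(String state) { this.state = state; }
}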

3) Similarly, create another DataFrame for the second table (say, salary); a sketch is shown below.

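A sketch of that step, assuming a hypothetical Salary bean (with e_id and salary properties, mirroring the Employee bean above) and a hypothetical comma-separated salary_data.txt input:

Dataset<Salary> s_data = spark
        .read()
        .textFile("C:/XX/XX/salary_data.txt")   // hypothetical input path
        .map(line -> {
            // hypothetical bean with e_id and salary properties
            Salary s = new Salary();
            String[] parts = line.split(",");
            s.setE_id(Integer.valueOf(parts[0].trim()));
            s.setSalary(Double.valueOf(parts[1].trim()));
            return s;
        }, Encoders.bean(Salary.class));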

4) Apply an INNER join on the distinct elements of both DataFrames:

Dataset<Row> d1 = e_data.distinct().join(s_data.distinct(), "e_id").orderBy("salary");
d1.show();

5) Similarly, a left outer join via Spark SQL (note that this requires registering the Datasets as global temporary views first; see the sketch after the query):

spark.sql("select * from global_temp.employee e LEFT OUTER JOIN global_temp.salary s on e.e_id = s.e_id").show();
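
The global_temp.employee and global_temp.salary views referenced in that query are not created anywhere in the answer; assuming they are meant to come from the e_data and s_data Datasets built earlier, the registration step would look like this:

// Assumed setup (not shown in the original answer): register both Datasets
// as global temporary views so Spark SQL can resolve them under global_temp
e_data.createGlobalTempView("employee");
s_data.createGlobalTempView("salary");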