Apache Spark Joins example with Java
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license, link the original source, and attribute it to the original authors (not me): StackOverflow
Original source: http://stackoverflow.com/questions/28338694/
Apache Spark Joins example with Java
Asked by Shekar Patel
I am very new to Apache Spark. I would like to focus on the basic Spark API specification, and I want to understand and write some programs using the Spark API. I have written a Java program using Apache Spark to implement the joins concept.
When I use a left outer join -- leftOuterJoin() -- or a right outer join -- rightOuterJoin(), both methods return a JavaPairRDD that contains a special type, Guava's Optional. But I do not know how to extract the original values from the Optional type.
Anyway, I would like to know whether I can use the same join methods but have them return the data in my own format. I did not find any way to do that. What I mean is that when I am using Apache Spark, I am not able to customize the code in my own style, since everything is already pre-defined.
Please find the code below.
My two sample input data sets:
customers_data.txt:
4000001,Kristina,Chung,55,Pilot
4000002,Paige,Chen,74,Teacher
4000003,Sherri,Melton,34,Firefighter
and
transactions_data.txt:
00000551,12-30-2011,4000001,092.88,Games,Dice & Dice Sets,Buffalo,New York,credit
00004811,11-10-2011,4000001,180.35,Outdoor Play Equipment,Water Tables,Brownsville,Texas,credit
00034388,09-11-2011,4000002,020.55,Team Sports,Beach Volleyball,Orange,California,cash
00008996,11-21-2011,4000003,121.04,Outdoor Recreation,Fishing,Colorado Springs,Colorado,credit
00009167,05-24-2011,4000003,194.94,Exercise & Fitness,Foam Rollers,El Paso,Texas,credit
Here is my Java code:
**SparkJoins.java:**
import java.io.FileNotFoundException;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

// In Spark 1.x the Java join API wraps missing values in Guava's Optional;
// in Spark 2.x it is org.apache.spark.api.java.Optional instead.
import com.google.common.base.Optional;

public class SparkJoins {

    @SuppressWarnings("serial")
    public static void main(String[] args) throws FileNotFoundException {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count").setMaster("local"));

        JavaRDD<String> customerInputFile = sc.textFile("C:/path/customers_data.txt");
        // (customer id, first name) pairs
        JavaPairRDD<String, String> customerPairs = customerInputFile.mapToPair(new PairFunction<String, String, String>() {
            public Tuple2<String, String> call(String s) {
                String[] customerSplit = s.split(",");
                return new Tuple2<String, String>(customerSplit[0], customerSplit[1]);
            }
        }).distinct();

        JavaRDD<String> transactionInputFile = sc.textFile("C:/path/transactions_data.txt");
        // (customer id, "amount,date") pairs
        JavaPairRDD<String, String> transactionPairs = transactionInputFile.mapToPair(new PairFunction<String, String, String>() {
            public Tuple2<String, String> call(String s) {
                String[] transactionSplit = s.split(",");
                return new Tuple2<String, String>(transactionSplit[2], transactionSplit[3] + "," + transactionSplit[1]);
            }
        });

        // Default join operation (inner join)
        JavaPairRDD<String, Tuple2<String, String>> joinsOutput = customerPairs.join(transactionPairs);
        System.out.println("Joins function Output: " + joinsOutput.collect());

        // Left outer join operation
        JavaPairRDD<String, Iterable<Tuple2<String, Optional<String>>>> leftJoinOutput = customerPairs.leftOuterJoin(transactionPairs).groupByKey().sortByKey();
        System.out.println("LeftOuterJoins function Output: " + leftJoinOutput.collect());

        // Right outer join operation
        JavaPairRDD<String, Iterable<Tuple2<Optional<String>, String>>> rightJoinOutput = customerPairs.rightOuterJoin(transactionPairs).groupByKey().sortByKey();
        System.out.println("RightOuterJoins function Output: " + rightJoinOutput.collect());

        sc.close();
    }
}
And here is the output which I am getting:
Joins function Output: [(4000001,(Kristina,092.88,12-30-2011)), (4000001,(Kristina,180.35,11-10-2011)), (4000003,(Sherri,121.04,11-21-2011)), (4000003,(Sherri,194.94,05-24-2011)), (4000002,(Paige,020.55,09-11-2011))]
LeftOuterJoins function Output: [(4000001,[(Kristina,Optional.of(092.88,12-30-2011)), (Kristina,Optional.of(180.35,11-10-2011))]), (4000002,[(Paige,Optional.of(020.55,09-11-2011))]), (4000003,[(Sherri,Optional.of(121.04,11-21-2011)), (Sherri,Optional.of(194.94,05-24-2011))])]
RightOuterJoins function Output: [(4000001,[(Optional.of(Kristina),092.88,12-30-2011), (Optional.of(Kristina),180.35,11-10-2011)]), (4000002,[(Optional.of(Paige),020.55,09-11-2011)]), (4000003,[(Optional.of(Sherri),121.04,11-21-2011), (Optional.of(Sherri),194.94,05-24-2011)])]
I am running this program on the Windows platform.
Please look at the output above and help me extract the values from the Optional type.
Thanks in advance.
Accepted answer by sms_1190
When you do a left outer join or a right outer join, some keys may have no matching value on the other side. That is why Spark wraps the value in an Optional. After getting the result, you can map it to your own format, using Optional's isPresent() method to check whether a value exists before extracting it.
Here is an example:
JavaPairRDD<String, String> firstRDD = ...
JavaPairRDD<String, String> secondRDD = ...

// join both RDDs using a left outer join
// (the Optional is Guava's in Spark 1.x, org.apache.spark.api.java.Optional in 2.x)
JavaPairRDD<String, Tuple2<String, Optional<String>>> rddWithJoin = firstRDD.leftOuterJoin(secondRDD);

// map the join result, unwrapping the Optional
JavaPairRDD<String, String> mappedRDD = rddWithJoin
        .mapToPair(tuple -> {
            if (tuple._2()._2().isPresent()) {
                // a matching value exists: extract it with get()
                return new Tuple2<String, String>(tuple._1(), tuple._2()._1() + "," + tuple._2()._2().get());
            } else {
                return new Tuple2<String, String>(tuple._1(), "not present");
            }
        });
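As a side note: if you just need a default for the missing side, Guava's Optional also offers or(defaultValue) (and Spark 2.x's org.apache.spark.api.java.Optional offers orElse(defaultValue)), so the else branch above can often be collapsed into a single call such as tuple._2()._2().or("not present").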
Answered by RPaul
In Java, we can also implement JOINs using DataFrames, as follows:
1) Create a Spark session:
SparkSession spark = SparkSession.builder().appName("JoinsInSpark").master("local").getOrCreate();
2) I've taken the Employee input as:
101,Alan,Franklyn Street,Melbourne,QLD
104,Stuart,Lonsdale Street,Sydney,NSW
and create a DataFrame as:
Dataset<Employee> e_data = spark
        .read()
        .textFile("C:/XX/XX/test.txt")
        .map(line -> {
            Employee e = new Employee();
            String[] parts = line.split(",");
            e.setE_id(Integer.valueOf(parts[0].trim()));
            e.setE_name(parts[1].trim());
            e.setAddress(parts[2].trim());
            e.setCity(parts[3].trim());
            e.setState(parts[4].trim());
            return e;
        }, Encoders.bean(Employee.class));
where Employee is the POJO class containing the setters and getters along with a constructor; a minimal sketch is shown below.
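For reference, here is a minimal sketch of such a bean, assuming the field names implied by the setters above (the answer does not show the original class):

public class Employee implements java.io.Serializable {
    private int e_id;
    private String e_name;
    private String address;
    private String city;
    private String state;

    // a public no-arg constructor is required by Encoders.bean
    public Employee() {}

    public int getE_id() { return e_id; }
    public void setE_id(int e_id) { this.e_id = e_id; }
    public String getE_name() { return e_name; }
    public void setE_name(String e_name) { this.e_name = e_name; }
    public String getAddress() { return address; }
    public void setAddress(String address) { this.address = address; }
    public String getCity() { return city; }
    public void setCity(String city) { this.city = city; }
    public String getState() { return state; }
    public void setState(String state) { this.state = state; }
}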
3) Similarly, create another DataFrame for the second table (say, salary); see the sketch below.
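This is a sketch following the same pattern, assuming a hypothetical Salary bean with e_id and salary fields (those column names are implied by the join and orderBy calls in the next step) and an input file salary.txt:

Dataset<Salary> s_data = spark
        .read()
        .textFile("C:/XX/XX/salary.txt")  // hypothetical input file
        .map(line -> {
            Salary s = new Salary();  // hypothetical bean with e_id and salary fields
            String[] parts = line.split(",");
            s.setE_id(Integer.valueOf(parts[0].trim()));
            s.setSalary(Double.valueOf(parts[1].trim()));
            return s;
        }, Encoders.bean(Salary.class));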
4) Apply an INNER join on the distinct elements of both Datasets:
Dataset<Row> d1 = e_data.distinct().join(s_data.distinct(), "e_id").orderBy("salary");
d1.show();
5) Similarly, a left outer join can be expressed in SQL as:
spark.sql("select * from global_temp.employee e LEFT OUTER JOIN global_temp.salary s on e.e_id = s.e_id").show();