Scala: Join two ordinary RDDs with/without Spark SQL
Disclaimer: This page is a Chinese-English translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same license, link to the original, and attribute it to the original authors (not me): StackOverflow
Original question: http://stackoverflow.com/questions/27437507/
Join two ordinary RDDs with/without Spark SQL
Asked by learning_spark
I need to join two ordinary RDDs on one or more columns. Logically this operation is equivalent to a database join of two tables. I would like to know whether this is possible only through Spark SQL, or whether there are other ways of doing it.
As a concrete example, consider
RDD r1 with primary key ITEM_ID:
(ITEM_ID, ITEM_NAME, ITEM_UNIT, COMPANY_ID)
and RDD r2 with primary key COMPANY_ID:
(COMPANY_ID, COMPANY_NAME, COMPANY_CITY)
I want to join r1 and r2.
How can this be done?
Answered by viirya
Soumya Simanta gave a good answer. However, the values in the joined RDD are Iterables, so the result may not look much like an ordinary table join.
Alternatively, you can:
val mappedItems = items.map(item => (item.companyId, item))
val mappedComp = companies.map(comp => (comp.companyId, comp))
mappedItems.join(mappedComp).take(10).foreach(println)
The output would be:
(c1,(Item(1,first,2,c1),Company(c1,company-1,city-1)))
(c1,(Item(2,second,2,c1),Company(c1,company-1,city-1)))
(c2,(Item(3,third,2,c2),Company(c2,company-2,city-2)))
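For reference, the snippet above assumes items and companies RDDs built from the Item and Company case classes that appear in Soumya Simanta's answer below; a minimal setup sketch would be:

// Assumed setup, matching the case classes and data used in Soumya Simanta's answer
case class Item(id: String, name: String, unit: Int, companyId: String)
case class Company(companyId: String, name: String, city: String)

val items = sc.parallelize(List(
  Item("1", "first", 2, "c1"), Item("2", "second", 2, "c1"), Item("3", "third", 2, "c2")))
val companies = sc.parallelize(List(
  Company("c1", "company-1", "city-1"), Company("c2", "company-2", "city-2")))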
Answered by New Coder
(Using Scala) Let's say you have two RDDs:
emp: (empid, ename, dept)
dept: (dname, dept)
Here is another way. First, re-map each tuple so that the join key (the dept id) comes first:
// emp: (ename, deptId), dept: (dname, deptId)
// (the commented-out lines keep an extra empid field)
//val emp = sc.parallelize(Seq((1,"jordan",10), (2,"ricky",20), (3,"matt",30), (4,"mince",35), (5,"rhonda",30)))
val emp = sc.parallelize(Seq(("jordan",10), ("ricky",20), ("matt",30), ("mince",35), ("rhonda",30)))
val dept = sc.parallelize(Seq(("hadoop",10), ("spark",20), ("hive",30), ("sqoop",40)))
// Re-map each tuple so the join key (deptId) comes first
//val shifted_fields_emp = emp.map(t => (t._3, (t._1, t._2)))
val shifted_fields_emp = emp.map(t => (t._2, t._1))
val shifted_fields_dept = dept.map(t => (t._2, t._1))
shifted_fields_emp.join(shifted_fields_dept)
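Collecting that re-mapped join (ordering across partitions is not guaranteed) would print something like the following sketch:

// Materialize and print the joined pairs: (deptId, (ename, dname))
shifted_fields_emp.join(shifted_fields_dept).collect().foreach(println)
// e.g. (10,(jordan,hadoop)), (20,(ricky,spark)), (30,(matt,hive)), (30,(rhonda,hive))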
A fuller example uses keyBy to make the join key explicit and also demonstrates the outer join variants:
// Create emp RDD
val emp = sc.parallelize(Seq((1,"jordan",10), (2,"ricky",20), (3,"matt",30), (4,"mince",35), (5,"rhonda",30)))
// Create dept RDD
val dept = sc.parallelize(Seq(("hadoop",10), ("spark",20), ("hive",30), ("sqoop",40)))
// Use the third field (the dept id) as the key for the emp RDD
val manipulated_emp = emp.keyBy(t => t._3)
// Use the second field (the dept id) as the key for the dept RDD
val manipulated_dept = dept.keyBy(t => t._2)
// Inner Join
val join_data = manipulated_emp.join(manipulated_dept)
// Left Outer Join
val left_outer_join_data = manipulated_emp.leftOuterJoin(manipulated_dept)
// Right Outer Join
val right_outer_join_data = manipulated_emp.rightOuterJoin(manipulated_dept)
// Full Outer Join
val full_outer_join_data = manipulated_emp.fullOuterJoin(manipulated_dept)
// Format the joined data for readability (using map)
val cleaned_joined_data = join_data.map(t => (t._2._1._1, t._2._1._2, t._1, t._2._2._1))
This gives the following output:
// Print the output cleaned_joined_data on the console
scala> cleaned_joined_data.collect()
res13: Array[(Int, String, Int, String)] = Array((3,matt,30,hive), (5,rhonda,30,hive), (2,ricky,20,spark), (1,jordan,10,hadoop))
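The outer join variants above wrap the side that may be missing in an Option. A minimal sketch (the cleaned_left_outer name and "no-dept" placeholder are only illustrative) of formatting left_outer_join_data could look like:

// Left outer join yields (deptId, (empTuple, Option[deptTuple]));
// getOrElse supplies a placeholder when there is no matching dept.
val cleaned_left_outer = left_outer_join_data.map { case (deptId, (emp, deptOpt)) =>
  (emp._1, emp._2, deptId, deptOpt.map(_._1).getOrElse("no-dept"))
}
cleaned_left_outer.collect().foreach(println)
// e.g. (4,mince,35,no-dept) appears because dept id 35 has no match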
Answered by Soumya Simanta
Something like this should work.
scala> case class Item(id:String, name:String, unit:Int, companyId:String)
scala> case class Company(companyId:String, name:String, city:String)
scala> val i1 = Item("1", "first", 2, "c1")
scala> val i2 = i1.copy(id="2", name="second")
scala> val i3 = i1.copy(id="3", name="third", companyId="c2")
scala> val items = sc.parallelize(List(i1,i2,i3))
items: org.apache.spark.rdd.RDD[Item] = ParallelCollectionRDD[14] at parallelize at <console>:20
scala> val c1 = Company("c1", "company-1", "city-1")
scala> val c2 = Company("c2", "company-2", "city-2")
scala> val companies = sc.parallelize(List(c1,c2))
scala> val groupedItems = items.groupBy( x => x.companyId)
groupedItems: org.apache.spark.rdd.RDD[(String, Iterable[Item])] = ShuffledRDD[16] at groupBy at <console>:22
scala> val groupedComp = companies.groupBy(x => x.companyId)
groupedComp: org.apache.spark.rdd.RDD[(String, Iterable[Company])] = ShuffledRDD[18] at groupBy at <console>:20
scala> groupedItems.join(groupedComp).take(10).foreach(println)
14/12/12 00:52:32 INFO DAGScheduler: Job 5 finished: take at <console>:35, took 0.021870 s
(c1,(CompactBuffer(Item(1,first,2,c1), Item(2,second,2,c1)),CompactBuffer(Company(c1,company-1,city-1))))
(c2,(CompactBuffer(Item(3,third,2,c2)),CompactBuffer(Company(c2,company-2,city-2))))
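Since groupBy wraps both sides in an Iterable, the joined values are CompactBuffers rather than flat rows. If one (Item, Company) pair per row is wanted, a small follow-up step (a sketch, not part of the original answer) can flatten them:

// Flatten the grouped join into one row per (Item, Company) pair
val flatJoined = groupedItems.join(groupedComp).flatMap {
  case (companyId, (items, comps)) =>
    for (i <- items; c <- comps) yield (companyId, i, c)
}
flatJoined.collect().foreach(println)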
Answered by Vijay Innamuri
Spark SQL can perform joins on Spark RDDs.
The code below performs a SQL join on the Company and Item RDDs:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object SparkSQLJoin {

  case class Item(id: String, name: String, unit: Int, companyId: String)
  case class Company(companyId: String, name: String, city: String)

  def main(args: Array[String]) {
    val sparkConf = new SparkConf()
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    // Spark 1.x: implicitly converts RDDs of case classes to SchemaRDDs
    import sqlContext.createSchemaRDD

    val i1 = Item("1", "first", 1, "c1")
    val i2 = Item("2", "second", 2, "c2")
    val i3 = Item("3", "third", 3, "c3")

    val c1 = Company("c1", "company-1", "city-1")
    val c2 = Company("c2", "company-2", "city-2")

    val companies = sc.parallelize(List(c1, c2))
    companies.registerAsTable("companies")

    val items = sc.parallelize(List(i1, i2, i3))
    items.registerAsTable("items")

    val result = sqlContext.sql(
      "SELECT * FROM companies C JOIN items I ON C.companyId = I.companyId").collect

    result.foreach(println)
  }
}
The output is displayed as:
[c1,company-1,city-1,1,first,1,c1]
[c2,company-2,city-2,2,second,2,c2]
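The createSchemaRDD import and registerAsTable calls come from the old Spark 1.x API. On Spark 2.x and later, a roughly equivalent sketch (assuming the same case classes; names are illustrative) would use SparkSession, DataFrames, and temporary views:

// Hedged sketch of the same SQL join with the Spark 2.x+ API
import org.apache.spark.sql.SparkSession

object SparkSQLJoinModern {
  case class Item(id: String, name: String, unit: Int, companyId: String)
  case class Company(companyId: String, name: String, city: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SparkSQLJoin").getOrCreate()
    import spark.implicits._

    val items = Seq(Item("1", "first", 1, "c1"), Item("2", "second", 2, "c2"),
                    Item("3", "third", 3, "c3")).toDF()
    val companies = Seq(Company("c1", "company-1", "city-1"),
                        Company("c2", "company-2", "city-2")).toDF()

    // Register temporary views so the tables can be referenced from SQL
    items.createOrReplaceTempView("items")
    companies.createOrReplaceTempView("companies")

    spark.sql(
      "SELECT * FROM companies C JOIN items I ON C.companyId = I.companyId"
    ).show()

    spark.stop()
  }
}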

