Perform a join on multiple DataFrames in Spark (Scala)

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you use or share it, you must follow the same license and attribute it to the original authors (not this site). Original question: http://stackoverflow.com/questions/39976328/


perform join on multiple DataFrame in spark

Tags: scala, join, apache-spark

Asked by Neha

I have 3 dataframes generated from 3 different processes. Every dataframe has columns with the same names. My dataframes look like this:

id   val1    val2       val3    val4
 1    null   null       null    null
 2    A2      A21       A31      A41

id   val1      val2       val3      val4
 1    B1        B21        B31       B41
 2    null      null       null      null

id   val1     val2       val3    val4
 1    C1       C2        C3       C4
 2    C11      C12       C13      C14

Out of these 3 dataframes, I want to create two dataframes (final and consolidated). For the final one, the order of preference is: dataframe 1 > dataframe 2 > dataframe 3.

If a result is present in dataframe 1 (val1 != null), I will store that row in the final dataframe.

My final result should be:

id  finalVal1    finalVal2   finalVal3   finalVal4 
1     B1           B21         B31         B41
2     A2           A21         A31         A41

The consolidated dataframe will store results from all 3.

How can I do that efficiently?

Answered by cheseaux

If I understood you correctly, for each row you want to find the first non-null value, looking first in the first table, then in the second table, then in the third table.

You simply need to join these three tables on id and then use the coalesce function to get the first non-null element:

import org.apache.spark.sql.functions._

val df1 = sc.parallelize(Seq(
    (1,null,null,null,null),
    (2,"A2","A21","A31", "A41"))
  ).toDF("id", "val1", "val2", "val3", "val4")

val df2 = sc.parallelize(Seq(
    (1,"B1","B21","B31", "B41"),
    (2,null,null,null,null))
  ).toDF("id", "val1", "val2", "val3", "val4")

val df3 = sc.parallelize(Seq(
    (1,"C1","C2","C3","C4"),
    (2,"C11","C12","C13", "C14"))
  ).toDF("id", "val1", "val2", "val3", "val4")

val consolidated = df1.join(df2, "id").join(df3, "id").select(
  df1("id"),
  coalesce(df1("val1"), df2("val1"), df3("val1")).as("finalVal1"),
  coalesce(df1("val2"), df2("val2"), df3("val2")).as("finalVal2"),
  coalesce(df1("val3"), df2("val3"), df3("val3")).as("finalVal3"),
  coalesce(df1("val4"), df2("val4"), df3("val4")).as("finalVal4")
)

Which gives you the expected output:

+---+---------+---------+---------+---------+
| id|finalVal1|finalVal2|finalVal3|finalVal4|
+---+---------+---------+---------+---------+
|  1|       B1|      B21|      B31|      B41|
|  2|       A2|      A21|      A31|      A41|
+---+---------+---------+---------+---------+
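If there are many value columns, writing each coalesce by hand gets tedious. The following sketch (not part of the original answer) builds the same select programmatically; it assumes the three dataframes share the schema above, and the names valueCols and coalescedCols are only for illustration:

import org.apache.spark.sql.functions._

// Build one coalesce expression per value column instead of writing them out by hand.
// Assumes df1, df2 and df3 all have the columns id, val1 ... val4 as above.
val valueCols = df1.columns.filter(_ != "id")
val coalescedCols = valueCols.map(c =>
  coalesce(df1(c), df2(c), df3(c)).as("final" + c.capitalize))

val finalDF = df1.join(df2, "id").join(df3, "id")
  .select((df1("id") +: coalescedCols): _*)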

Answered by user3123372

Below is an example of joining six tables/dataframes (not using SQL)


retail_db is a well-known sample database; anyone can get it from Google.

Problem: get all customers from TX who bought fitness items.

val df_customers = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost/retail_db?useSSL=false")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "customers")
  .option("user", "root").option("password", "root")
  .load()

val df_products = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost/retail_db?useSSL=false")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "products")
  .option("user", "root").option("password", "root")
  .load()

val df_orders = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost/retail_db?useSSL=false")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "orders")
  .option("user", "root").option("password", "root")
  .load()

val df_order_items = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost/retail_db?useSSL=false")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "order_items")
  .option("user", "root").option("password", "root")
  .load()

val df_categories = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost/retail_db?useSSL=false")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "categories")
  .option("user", "root").option("password", "root")
  .load()

val df_departments = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost/retail_db?useSSL=false")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "departments")
  .option("user", "root").option("password", "root")
  .load()

val df_order_items_all = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost/retail_db?useSSL=false")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "order_all")
  .option("user", "root").option("password", "root")
  .load()



  val jeCustOrd=df_customers.col("customer_id")===df_orders.col("order_customer_id")
  val jeOrdItem=df_orders.col("order_id")===df_order_items.col("order_item_order_id")
  val jeProdOrdItem=df_products.col("product_id")===df_order_items.col("order_item_product_id")
  val jeProdCat=df_products.col("product_category_id")===df_categories.col("category_id")
  val jeCatDept=df_categories.col("category_department_id")===df_departments.col("department_id")





  df_customers.where("customer_state = 'TX'").join(df_orders,jeCustOrd).join(df_order_items,jeOrdItem).join(df_products,jeProdOrdItem).join(df_categories,jeProdCat).join(df_departments,jeCatDept).filter("department_name='Fitness'")
  .select("customer_id","customer_fname","customer_lname", "customer_street","customer_city","customer_state","customer_zipcode","order_id","category_name","department_name").show(5)

Answered by Wilmerton

Edit: a new solution that handles partially-null rows. It avoids joins, but uses a window function and a distinct...

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

case class a(id:Int,val1:String,val2:String,val3:String,val4:String)

val df1 = sc.parallelize(List(
a(1,null,null,null,null),
a(2,"A2","A21","A31","A41"),
a(3,null,null,null,null))).toDF()

val df2 = sc.parallelize(List(
a(1,"B1",null,"B31","B41"),
a(2,null,null,null,null),
a(3,null,null,null,null))).toDF()

val df3 = sc.parallelize(List(
a(1,"C1","C2","C3","C4"),
a(2,"C11","C12","C13","C14"),
a(3,"C11","C12","C13","C14"))).toDF()

val anyNotNull = df1.columns.tail.map(c => col(c).isNotNull).reduce(_ || _)

val consolidated = {
  df1
    .filter(anyNotNull)
    .withColumn("foo",lit(1))
    .unionAll(df2.filter(anyNotNull).withColumn("foo",lit(2)))
    .unionAll(df3.filter(anyNotNull).withColumn("foo",lit(3)))
}

val w = Window.partitionBy('id).orderBy('foo)

val coalesced = col("id") +: df1.columns.tail.map(c => first(col(c),true).over(w).as(c))

Without the na.drop and the distinct, the coalesced selection still contains the duplicate and partially-filled rows:

scala> consolidated.select(coalesced:_*).show()
+---+----+----+----+----+
| id|val1|val2|val3|val4|
+---+----+----+----+----+
|  1|  B1|null| B31| B41|
|  1|  B1|  C2| B31| B41|
|  3| C11| C12| C13| C14|
|  2|  A2| A21| A31| A41|
|  2|  A2| A21| A31| A41|
+---+----+----+----+----+

val finalDF = consolidated.select(coalesced:_*).na.drop.distinct

scala> finalDF.show()
+---+----+----+----+----+
| id|val1|val2|val3|val4|
+---+----+----+----+----+
|  1|  B1|  C2| B31| B41|
|  3| C11| C12| C13| C14|
|  2|  A2| A21| A31| A41|
+---+----+----+----+----+


Old solution:


If the rows are either entirely null or contain no nulls at all, you can do this (edit: the advantage over the other solution is that you avoid the distinct):

data:


case class a(id:Int,val1:String,val2:String,val3:String,val4:String)

val df1 = sc.parallelize(List(
a(1,null,null,null,null),
a(2,"A2","A21","A31","A41"),
a(3,null,null,null,null))).toDF()
val df2 = sc.parallelize(List(
a(1,"B1","B21","B31","B41"),
a(2,null,null,null,null),
a(3,null,null,null,null))).toDF()
val df3 = sc.parallelize(List(
a(1,"C1","C2","C3","C4"),
a(2,"C11","C12","C13","C14"),
a(3,"C11","C12","C13","C14"))).toDF()

consolidated:


val consolidated = {
  df1.na.drop.withColumn("foo",lit(1))
  .unionAll(df2.na.drop.withColumn("foo",lit(2)))
  .unionAll(df3.na.drop.withColumn("foo",lit(3)))
}

scala> consolidated.show()
+---+----+----+----+----+---+
| id|val1|val2|val3|val4|foo|
+---+----+----+----+----+---+
|  2|  A2| A21| A31| A41|  1|
|  1|  B1| B21| B31| B41|  2|
|  1|  C1|  C2|  C3|  C4|  3|
|  2| C11| C12| C13| C14|  3|
|  3| C11| C12| C13| C14|  3|
+---+----+----+----+----+---+

Final:

val w = Window.partitionBy('id).orderBy('foo)
val finalDF = consolidated
  .withColumn("foo2",rank().over(w))
  .filter('foo2===1)
  .drop("foo").drop("foo2")

scala> finalDF.show()
+---+----+----+----+----+
| id|val1|val2|val3|val4|
+---+----+----+----+----+
|  1|  B1| B21| B31| B41|
|  3| C11| C12| C13| C14|
|  2|  A2| A21| A31| A41|
+---+----+----+----+----+
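A side note that is not part of the original answer: on Spark 2.x the unionAll used above is deprecated in favour of union (both append rows by column position and neither removes duplicates), so the consolidation step would look like this:

// Spark 2.x sketch: same consolidation as above, with union instead of the deprecated unionAll
val consolidated = df1.na.drop.withColumn("foo", lit(1))
  .union(df2.na.drop.withColumn("foo", lit(2)))
  .union(df3.na.drop.withColumn("foo", lit(3)))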

Answered by ASe

If they come from three different database tables, I would use push-down filters to filter them on the server, and then use the DataFrame join function to join them together.

If they are not from database tables, you can use the filter and map higher-order functions to achieve the same thing in parallel.
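As a rough illustration of the push-down idea (not from the original answer), with a JDBC source such as the retail_db example above you can either read a filtered subquery as the "table", or filter the loaded DataFrame before the join and let Spark push simple predicates down to the server. The connection settings below are the same placeholder values used earlier:

// Option 1: push the filter into the query that the database executes.
val txCustomers = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost/retail_db?useSSL=false")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "(select * from customers where customer_state = 'TX') c")
  .option("user", "root")
  .option("password", "root")
  .load()

// Option 2: filter the already-loaded DataFrame before joining; for simple
// predicates the JDBC source pushes the filter down to the server anyway.
// val txCustomers = df_customers.where("customer_state = 'TX'")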