Joining Multiple DataFrames using Multiple Conditions Spark Scala

Disclaimer: this page is a Chinese-English translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute it to the original authors (not me), linking the original: http://stackoverflow.com/questions/41999385/



scala apache-spark

Asked by Anji

I'm converting existing SQL queries to DataFrames using Spark Scala. I have a query with multiple inner joins to perform. I could implement it with SqlContext.sql(""), but my team is not interested in sqlContext and wants to perform the operations on top of DataFrames.


si s inner join 
ac a on s.cid = a.cid and s.sid =a.sid
inner join De d on s.cid = d.cid AND d.aid = a.aid 
inner join SGrM sgm on s.cid = sgm.cid and s.sid =sgm.sid and sgm.status=1
inner join SiGo sg on sgm.cid =sg.cid and sgm.gid =sg.gid 
inner join bg bu on s.cid = bu.cid and s.sid =bu.sid
inner join ls al on a.AtLId = al.lid
inner join ls rl on a.RtLId = rl.lid
inner join ls vl on a.VLId = vl.lid

From my searching I learned that we can join DataFrames recursively using:


List(df1,df2,df3,dfN).reduce((a, b) => a.join(b, joinCondition))

But I can't satisfy the pattern above, since each of my joins involves multiple conditions. How can I perform this?


Answer by Gowtham

You can join multiple DataFrames with multiple conditions as shown below:


val result = df1.as("df1").join(df2.as("df2"),
              $"df1.col1" === $"df2.col1" && $"df1.col2" === $"df2.col2")
            .join(df3.as("df3"),
              $"df3.col1" === $"df2.col1" && $"df3.col2" === $"df2.col2", "left_outer")

Answer by Karol Sudol

First of all, replace DataFrames with Datasets and use Spark 2.x to get better performance by avoiding generic JVM objects (see Project Tungsten).


Now, to your question: let's say you have 4 Datasets:


First, create the schema for your tables:


case class DS (id: Int, colA: String)


Then read files with optimisation enabled:


 val ds1 = spark.read.parquet("X1").as[DS]

 val ds2 = spark.read.parquet("X2").as[DS]

 val ds3 = spark.read.parquet("X3").as[DS]

 val ds4 = spark.read.parquet("X4").as[DS]
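
Note that the .as[DS] conversion relies on an implicit Encoder for the case class being in scope; assuming the SparkSession is named spark (as in the reads above), that means:

     import spark.implicits._  // brings the Encoder[DS] needed by .as[DS] into scope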

Now, you can join them one by one so that you can follow the data flow (only use broadcast when you have a small table):


case class JoinedDS (colB: String)

import org.apache.spark.sql.functions.{broadcast, col}

val joinedDS = ds1.join(broadcast(ds2), Seq("id"), "inner")
  .join(ds3, Seq("id", "colB"), "inner")
  .join(ds4, Seq("id"), "inner")
  .select(col("colB"))
  .as[JoinedDS]
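
As a side note on broadcast(ds2): besides the explicit hint, Spark also broadcasts tables automatically when their estimated size is below spark.sql.autoBroadcastJoinThreshold. A minimal sketch of tuning it (the 10 MB value is just illustrative):

    // Tables below this size (in bytes) are broadcast automatically; set -1 to disable auto-broadcast.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024)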

Answer by user3123372

Below is an example of joining six tables/dataframes (not using SQL)


retail_db is a well-known sample database; anyone can find it via Google.


Problem: get all customers from TX who bought fitness items.


 val df_customers = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/retail_db?useSSL=false").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "customers").option("user", "root").option("password", "root").load()
  val df_products = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/retail_db?useSSL=false").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "products").option("user", "root").option("password", "root").load() 
  val df_orders = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/retail_db?useSSL=false").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "orders").option("user", "root").option("password", "root").load()
  val df_order_items = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/retail_db?useSSL=false").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "order_items").option("user", "root").option("password", "root").load()
  val df_categories = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/retail_db?useSSL=false").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "categories").option("user", "root").option("password", "root").load()
  val df_departments = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/retail_db?useSSL=false").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "departments").option("user", "root").option("password", "root").load()
  val df_order_items_all = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/retail_db?useSSL=false").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "order_all").option("user", "root").option("password", "root").load()
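
The seven reads above differ only in the table name, so the shared JDBC options can be factored into a small helper. This is just a refactoring sketch (the helper name readTable is hypothetical), assuming the same local MySQL connection settings as above:

    // Hypothetical helper: read one retail_db table over JDBC with the shared connection options.
    def readTable(table: String): org.apache.spark.sql.DataFrame =
      spark.read.format("jdbc")
        .option("url", "jdbc:mysql://localhost/retail_db?useSSL=false")
        .option("driver", "com.mysql.jdbc.Driver")
        .option("dbtable", table)
        .option("user", "root")
        .option("password", "root")
        .load()

    val df_customers = readTable("customers")
    val df_products = readTable("products")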



  val jeCustOrd=df_customers.col("customer_id")===df_orders.col("order_customer_id")
  val jeOrdItem=df_orders.col("order_id")===df_order_items.col("order_item_order_id")
  val jeProdOrdItem=df_products.col("product_id")===df_order_items.col("order_item_product_id")
  val jeProdCat=df_products.col("product_category_id")===df_categories.col("category_id")
  val jeCatDept=df_categories.col("category_department_id")===df_departments.col("department_id")
  // val jeOrdItem=df_orders.col("")===df_order_items.col("")



  //Get all customers from TX who bought fitness items
  df_customers.where("customer_state = 'TX'").join(df_orders,jeCustOrd).join(df_order_items,jeOrdItem).join(df_products,jeProdOrdItem).join(df_categories,jeProdCat).join(df_departments,jeCatDept).filter("department_name='Fitness'")
  .select("customer_id","customer_fname","customer_lname", "customer_street","customer_city","customer_state","customer_zipcode","order_id","category_name","department_name").show(5)