
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must follow the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/46032127/


How to count number of rows in a spark dataframe based on a value (primary key) from another dataframe?

Tags: scala, apache-spark, spark-dataframe

Asked by Kirupa

I have two dataframes df1 and df2. Both have a column 'date' as shown below.


Structure of df1


+----------+
|      date|
+----------+
|02-01-2015|
|02-02-2015|
|02-03-2015|
+----------+

Structure of df2


+---+-------+-----+----------+
| ID|feature|value|      date|
+---+-------+-----+----------+
|  1|balance|  100|01-01-2015|
|  1|balance|  100|05-01-2015|
|  1|balance|  100|30-01-2015|
|  1|balance|  100|01-02-2015|
|  1|balance|  100|01-03-2015|
+---+-------+-----+----------+

I have to take each row in 'date' column from df1, compare with df2 'date' and get all rows from df2 that are less than the date in df1.


Say take first row 02-01-2015 from df1 and get all rows that are less than 02-01-2015 from df2 which should produce an output as follows


+---+-------+-----+----------+
| ID|feature|value|      date|
+---+-------+-----+----------+
|  1|balance|  100|01-01-2015|
+---+-------+-----+----------+ 

What is the best way to achieve this in spark-scala? I have hundreds of millions of rows. I thought of using a window function in Spark, but a window is limited to one dataframe.

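To pin down the required semantics before the answers, here is a plain-Scala sketch (no Spark; the dates are rewritten as yyyy-MM-dd so that lexicographic order equals chronological order, which is an illustrative rephrasing, not the questioner's code):

```scala
// Dates from the example above, rewritten as yyyy-MM-dd so that
// string comparison matches chronological comparison.
val df1Dates = Seq("2015-01-02", "2015-02-02", "2015-03-02")
val df2Dates = Seq("2015-01-01", "2015-01-05", "2015-01-30", "2015-02-01", "2015-03-01")

// For each df1 date, count the df2 rows with a strictly earlier date.
val counts = df1Dates.map(d => d -> df2Dates.count(_ < d))
counts.foreach { case (d, n) => println(s"$d -> $n") }
// 2015-01-02 -> 1
// 2015-02-02 -> 4
// 2015-03-02 -> 5
```

The Spark answers below compute exactly these pairings and counts, but distributed.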

Answered by Raphael Roth

This gets you all the results in a new dataframe:


import org.apache.spark.sql.functions.{from_unixtime, unix_timestamp}
import spark.implicits._ // spark is the active SparkSession

val df1 = Seq(
  "02-01-2015",
  "02-02-2015",
  "02-03-2015"
).toDF("date")
  .withColumn("date", from_unixtime(unix_timestamp($"date", "dd-MM-yyyy")))

val df2 = Seq(
  (1, "balance", 100, "01-01-2015"),
  (1, "balance", 100, "05-01-2015"),
  (1, "balance", 100, "30-01-2015"),
  (1, "balance", 100, "01-02-2015"),
  (1, "balance", 100, "01-03-2015")
).toDF("ID", "feature", "value", "date")
  .withColumn("date", from_unixtime(unix_timestamp($"date", "dd-MM-yyyy")))

df1.join(
  df2, df2("date") < df1("date"), "left"
).show()


+-------------------+---+-------+-----+-------------------+
|               date| ID|feature|value|               date|
+-------------------+---+-------+-----+-------------------+
|2015-01-02 00:00:00|  1|balance|  100|2015-01-01 00:00:00|
|2015-02-02 00:00:00|  1|balance|  100|2015-01-01 00:00:00|
|2015-02-02 00:00:00|  1|balance|  100|2015-01-05 00:00:00|
|2015-02-02 00:00:00|  1|balance|  100|2015-01-30 00:00:00|
|2015-02-02 00:00:00|  1|balance|  100|2015-02-01 00:00:00|
|2015-03-02 00:00:00|  1|balance|  100|2015-01-01 00:00:00|
|2015-03-02 00:00:00|  1|balance|  100|2015-01-05 00:00:00|
|2015-03-02 00:00:00|  1|balance|  100|2015-01-30 00:00:00|
|2015-03-02 00:00:00|  1|balance|  100|2015-02-01 00:00:00|
|2015-03-02 00:00:00|  1|balance|  100|2015-03-01 00:00:00|
+-------------------+---+-------+-----+-------------------+

EDIT: to get the number of matching records from df2, do:


df1.join(
  df2, df2("date") < df1("date"), "left"
)
  .groupBy(df1("date"))
  .count
  .orderBy(df1("date"))
  .show

+-------------------+-----+
|               date|count|
+-------------------+-----+
|2015-01-02 00:00:00|    1|
|2015-02-02 00:00:00|    4|
|2015-03-02 00:00:00|    5|
+-------------------+-----+
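At the scale mentioned in the question (hundreds of millions of rows), the non-equi join above pairs every df1 date with every earlier df2 row before counting. If only the counts are needed, one way to shrink the join is to pre-aggregate df2 to one row per distinct date first. This is a sketch reusing the df1/df2 with parsed dates from the snippet above, and it only helps when df2 has many rows per distinct date:

```scala
import org.apache.spark.sql.functions.{coalesce, lit, sum}

// One row per distinct df2 date, carrying how many raw rows share it.
val perDate = df2.groupBy(df2("date").as("d2")).count()

// The non-equi join now pairs dates with distinct dates, not raw rows;
// summing the pre-computed counts recovers the same per-date totals,
// with coalesce turning "no match" into a count of 0.
df1.join(perDate, $"d2" < df1("date"), "left")
  .groupBy(df1("date"))
  .agg(coalesce(sum($"count"), lit(0)).as("count"))
  .orderBy(df1("date"))
  .show()
```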

Answered by Ramesh Maharjan

If you are looking to compare only one row of df1 with the df2 date column, then you should first select the intended row from df1:


// filter before renaming, so that $"date" still resolves
val oneRowDF1 = df1.where($"date" === "02-01-2015").select($"date".as("date2"))

then you should join with the logic you have:


df2.join(oneRowDF1, unix_timestamp(df2("date"), "dd-MM-yyyy") < unix_timestamp(oneRowDF1("date2"), "dd-MM-yyyy"))
    .drop("date2")

which should give you


+---+-------+-----+----------+
|ID |feature|value|date      |
+---+-------+-----+----------+
|1  |balance|100  |01-01-2015|
+---+-------+-----+----------+

Updated


Joins are expensive, as they require shuffling data between executors on different nodes.


You can simply use the filter function, as below:


// filter before the select, so that $"date" still resolves
val oneRowDF1 = df1.where($"date" === "02-01-2015").select(unix_timestamp($"date", "dd-MM-yyyy").as("date2"))

df2.filter(unix_timestamp($"date", "dd-MM-yyyy") < oneRowDF1.take(1)(0)(0))
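As a small robustness tweak (a sketch on top of the answer above, not part of it): `take(1)(0)(0)` yields the threshold as an untyped `Any` and silently assumes the row exists. The same filter can be written against an explicitly typed value:

```scala
// head() throws if the chosen df1 date matched no row, which is
// usually preferable to silently comparing against a missing value.
val threshold: Long = oneRowDF1.head().getLong(0)

// date2 holds unix seconds, so compare df2's parsed dates against it.
df2.filter(unix_timestamp($"date", "dd-MM-yyyy") < lit(threshold)).show(false)
```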

I hope the answer is helpful.
