Spark SQL window function with complex condition

Disclaimer: this page is a rendering of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you use or share it, you must follow the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/42448564/

Spark SQL window function with complex condition

sql, apache-spark, pyspark, apache-spark-sql, window-functions

Asked by user4601931

This is probably easiest to explain through example. Suppose I have a DataFrame of user logins to a website, for instance:

scala> df.show(5)
+----------------+----------+
|       user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
|  OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
+----------------+----------+
only showing top 5 rows

I would like to add to this a column indicating when they became an active user on the site. But there is one caveat: there is a time period during which a user is considered active, and after this period, if they log in again, their became_active date resets. Suppose this period is 5 days. Then the desired table derived from the above table would be something like this:

+----------------+----------+-------------+
|       user_name|login_date|became_active|
+----------------+----------+-------------+
|SirChillingtonIV|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-06|   2012-01-04|
|  OprahWinfreyJr|2012-01-10|   2012-01-10|
|SirChillingtonIV|2012-01-11|   2012-01-11|
+----------------+----------+-------------+

So, in particular, SirChillingtonIV's became_active date was reset because their second login came after the active period expired, but Booooooo99900098's became_active date was not reset the second time he/she logged in, because it fell within the active period.

My initial thought was to use window functions with lag, and then use the lagged values to fill the became_active column; for instance, something starting roughly like:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val window = Window.partitionBy("user_name").orderBy("login_date")
val df2 = df.withColumn("tmp", lag("login_date", 1).over(window))

Then, the rule to fill in the became_active date would be: if tmp is null (i.e., if it's the first ever login) or if login_date - tmp >= 5, then became_active = login_date; otherwise, go to the next most recent value in tmp and apply the same rule. This suggests a recursive approach, which I'm having trouble imagining a way to implement.

My questions: Is this a viable approach, and if so, how can I "go back" and look at earlier values of tmp until I find one where I stop? I can't, to my knowledge, iterate through values of a Spark SQL Column. Is there another way to achieve this result?

Answered by zero323

Here is the trick. Import a bunch of functions:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, datediff, lag, lit, min, sum}
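
These snippets assume the spark-shell, where a SparkSession named spark and its implicits are already in scope. In a standalone application you would first need something like the following (a minimal sketch; the appName and master values are only examples) so that .toDF and the $"column" syntax used below work:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("sessionize-logins")  // illustrative name
  .master("local[*]")            // illustrative; use your own cluster settings
  .getOrCreate()

import spark.implicits._         // enables .toDF on local Seqs and the $"col" syntax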

Define windows:

val userWindow = Window.partitionBy("user_name").orderBy("login_date")
val userSessionWindow = Window.partitionBy("user_name", "session")

Find the points where a new session starts; a running sum of these 0/1 flags over userWindow then gives each login a per-user session id:

val newSession =  (coalesce(
  datediff($"login_date", lag($"login_date", 1).over(userWindow)),
  lit(0)
) > 5).cast("bigint")

val sessionized = df.withColumn("session", sum(newSession).over(userWindow))

Find the earliest date per session:

val result = sessionized
  .withColumn("became_active", min($"login_date").over(userSessionWindow))
  .drop("session")

With dataset defined as:

val df = Seq(
  ("SirChillingtonIV", "2012-01-04"), ("Booooooo99900098", "2012-01-04"),
  ("Booooooo99900098", "2012-01-06"), ("OprahWinfreyJr", "2012-01-10"), 
  ("SirChillingtonIV", "2012-01-11"), ("SirChillingtonIV", "2012-01-14"),
  ("SirChillingtonIV", "2012-08-11")
).toDF("user_name", "login_date")

The result is:

+----------------+----------+-------------+
|       user_name|login_date|became_active|
+----------------+----------+-------------+
|  OprahWinfreyJr|2012-01-10|   2012-01-10|
|SirChillingtonIV|2012-01-04|   2012-01-04| <- The first session for user
|SirChillingtonIV|2012-01-11|   2012-01-11| <- The second session for user
|SirChillingtonIV|2012-01-14|   2012-01-11| 
|SirChillingtonIV|2012-08-11|   2012-08-11| <- The third session for user
|Booooooo99900098|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-06|   2012-01-04|
+----------------+----------+-------------+
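
Since the question is also tagged sql, the same sessionization logic can be expressed in raw Spark SQL and run through spark.sql (a sketch under the same assumptions as above; the view name logins and the CTE name sessionized are only illustrative):

df.createOrReplaceTempView("logins")

val resultSql = spark.sql("""
  WITH sessionized AS (
    SELECT
      user_name,
      login_date,
      SUM(CASE
            WHEN DATEDIFF(login_date,
                          LAG(login_date, 1) OVER (PARTITION BY user_name ORDER BY login_date)) > 5
            THEN 1 ELSE 0
          END) OVER (PARTITION BY user_name ORDER BY login_date) AS session
    FROM logins
  )
  SELECT
    user_name,
    login_date,
    MIN(login_date) OVER (PARTITION BY user_name, session) AS became_active
  FROM sessionized
""")

Here the CASE expression plays the role of coalesce above: for a user's first login LAG returns NULL, the comparison is not true, and the flag falls back to 0.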

Answered by User12345

Refactoring the other answer to work with PySpark

In PySpark you can do it as shown below.

Create the data frame:

df = sqlContext.createDataFrame(
    [
        ("SirChillingtonIV", "2012-01-04"),
        ("Booooooo99900098", "2012-01-04"),
        ("Booooooo99900098", "2012-01-06"),
        ("OprahWinfreyJr", "2012-01-10"),
        ("SirChillingtonIV", "2012-01-11"),
        ("SirChillingtonIV", "2012-01-14"),
        ("SirChillingtonIV", "2012-08-11")
    ],
    ("user_name", "login_date"))

The above code creates a data frame like the one below:

+----------------+----------+
|       user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
|  OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
|SirChillingtonIV|2012-01-14|
|SirChillingtonIV|2012-08-11|
+----------------+----------+

Now we first want to find out whether the difference between consecutive login_date values is more than 5 days.

To do this, proceed as below.

Necessary imports

from pyspark.sql import functions as f
from pyspark.sql import Window


# defining window partitions  
login_window = Window.partitionBy("user_name").orderBy("login_date")
session_window = Window.partitionBy("user_name", "session")

session_df = df.withColumn(
    "session",
    f.sum(
        # 1 when the gap from the previous login exceeds 5 days, 0 otherwise
        # (coalesce turns the NULL gap of a user's first login into 0)
        (f.coalesce(
            f.datediff("login_date", f.lag("login_date", 1).over(login_window)),
            f.lit(0)
        ) > 5).cast("int")
    ).over(login_window)  # running sum of the flags -> per-user session id
)

When we run the above line of code, if the datediff is NULL then the coalesce function will replace the NULL with 0.

+----------------+----------+-------+
|       user_name|login_date|session|
+----------------+----------+-------+
|  OprahWinfreyJr|2012-01-10|      0|
|SirChillingtonIV|2012-01-04|      0|
|SirChillingtonIV|2012-01-11|      1|
|SirChillingtonIV|2012-01-14|      1|
|SirChillingtonIV|2012-08-11|      2|
|Booooooo99900098|2012-01-04|      0|
|Booooooo99900098|2012-01-06|      0|
+----------------+----------+-------+


# add the became_active column by finding the min login_date over each
# (user_name, session) window created in the above step
final_df = session_df.withColumn("became_active", f.min("login_date").over(session_window)).drop("session")

+----------------+----------+-------------+
|       user_name|login_date|became_active|
+----------------+----------+-------------+
|  OprahWinfreyJr|2012-01-10|   2012-01-10|
|SirChillingtonIV|2012-01-04|   2012-01-04|
|SirChillingtonIV|2012-01-11|   2012-01-11|
|SirChillingtonIV|2012-01-14|   2012-01-11|
|SirChillingtonIV|2012-08-11|   2012-08-11|
|Booooooo99900098|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-06|   2012-01-04|
+----------------+----------+-------------+