Scala Spark SQL window function lag

Disclaimer: this page is a Chinese-English translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same CC BY-SA license and attribute it to the original authors (not the translator). Original question: http://stackoverflow.com/questions/41158115/

Date: 2020-10-22 08:55:23  Source: igfitidea

spark sql window function lag

Tags: scala, apache-spark, apache-spark-sql, window-functions

Asked by Ramesh

I am looking at the window slide function for a Spark DataFrame in Spark SQL, Scala.


I have a dataframe with columns Col1,Col2,Col3,date,volume and new_col.


Col1    Col2    Col3    date     volume new_col
                        201601  100.5   
                        201602  120.6   100.5
                        201603  450.2   120.6
                        201604  200.7   450.2
                        201605  121.4   200.7`

Now I want to add a new column named new_col containing the volume values shifted down by one row, as shown above.
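The intended shift can be sketched in plain Scala, without Spark: for rows already sorted by date, each row's new_col is simply the previous row's volume, with a default for the first row. (lagByOne is a hypothetical helper for illustration, not a Spark API.)

```scala
// Sketch of lag-by-1 semantics over an ordered column:
// prepend the default and drop the last element.
def lagByOne(volumes: Seq[Double], default: Double = 0.0): Seq[Double] =
  default +: volumes.init

val volumes = Seq(100.5, 120.6, 450.2, 200.7, 121.4)
println(lagByOne(volumes))  // List(0.0, 100.5, 120.6, 450.2, 200.7)
```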

I tried the option below, using the window function.

val windSldBrdrxNrx_df = df.withColumn("Prev_brand_rx", lag("Prev_brand_rx",1))

Can anyone please help me with how to do this?

Answered by mrsrinivas

You are doing it correctly; all you missed is over(window expression) on lag.

val df = sc.parallelize(Seq((201601, 100.5),
  (201602, 120.6),
  (201603, 450.2),
  (201604, 200.7),
  (201605, 121.4))).toDF("date", "volume")

import org.apache.spark.sql.functions.lag

// Order all rows by date; lag("volume", 1, 0) returns the previous row's
// volume, or the default 0 for the first row.
val w = org.apache.spark.sql.expressions.Window.orderBy("date")

val leadDf = df.withColumn("new_col", lag("volume", 1, 0).over(w))

leadDf.show()

+------+------+-------+
|  date|volume|new_col|
+------+------+-------+
|201601| 100.5|    0.0|
|201602| 120.6|  100.5|
|201603| 450.2|  120.6|
|201604| 200.7|  450.2|
|201605| 121.4|  200.7|
+------+------+-------+

This code was run on Spark shell 2.0.2


Answered by Sampat Kumar

You can import the two packages below, which resolve the missing references to lag and Window.

import org.apache.spark.sql.functions.{lead, lag}
import org.apache.spark.sql.expressions.Window
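Since this answer imports both lead and lag, note the difference: lag looks one row back, lead looks one row ahead. A plain-Scala sketch of lead's semantics (leadByOne is a hypothetical helper for illustration, not a Spark API):

```scala
// Sketch of lead-by-1 semantics over an ordered column:
// drop the first element and append the default at the end.
def leadByOne(xs: Seq[Double], default: Double = 0.0): Seq[Double] =
  xs.tail :+ default

println(leadByOne(Seq(100.5, 120.6, 450.2)))  // List(120.6, 450.2, 0.0)
```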