scala 如何在spark中使用窗口函数过滤数据

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/38872592/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-22 08:33:00  来源:igfitidea点击:

How to filter data using window functions in spark

scalaapache-sparkspark-dataframewindow-functions

提问by hbabbar

I have the following data :

我有以下数据:

rowid uid time code
   1  1      5    a
   2  1      6    b
   3  1      7    c
   4  2      8    a
   5  2      9    c
   6  2      9    c
   7  2     10    c
   8  2     11    a
   9  2     12    c

Now I wanted to filter the data in such a way that I can remove the rows 6 and 7 as for a particular uid i want to keep just one row with value 'c' in code

现在我想以这样的方式过滤数据,我可以删除第 6 行和第 7 行,因为对于特定的 uid 我只想在代码中保留一个值为“c”的行

So the expected data should be :

所以预期的数据应该是:

rowid uid time code
   1  1      5    a
   2  1      6    b
   3  1      7    c
   4  2      8    a
   5  2      9    c
   8  2     11    a
   9  2     12    c

I'm using window function something like this :

我正在使用这样的窗口函数:

val window = Window.partitionBy("uid").orderBy("time")
val change = ((lag("code", 1).over(window) <=> "c")).cast("int")

This would help us identify each row with a code 'c'. Can i extend this to filter out rows to get the expected data

这将帮助我们用代码“c”识别每一行。我可以扩展它以过滤掉行以获得预期的数据吗

回答by Daniel de Paula

If you want to remove only the lines where code = "c" (except the first one for each uid) you could try the following:

如果您只想删除 code = "c" 的行(每个 uid 的第一个除外),您可以尝试以下操作:

val window = Window.partitionBy("uid", "code").orderBy("time")
val result = df
  .withColumn("rank", row_number().over(window))
  .where(
    (col("code") !== "c") ||
    col("rank") === 1
  )
  .drop("rank")

Edit based on new information:

根据新信息进行编辑:

val window = Window.partitionBy("uid").orderBy("time")
val result = df
  .withColumn("lagValue", coalesce(lag(col("code"), 1).over(window), lit("")))
  .where(
    (col("code") !== "c") ||
    (col("lagValue") !== "c")
  )
  .drop("lagValue")