scala - Update Spark Dataframe's window function row_number column for Delta Data

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/46584773/

Date: 2020-10-22 09:28:48  Source: igfitidea


Tags: scala, apache-spark, apache-spark-sql

Asked by RaAm

I need to update the dataframe's row number column for the delta data. I have implemented the base load's row number as below:


Input Data:


val base = List(List("001", "a", "abc"), List("001", "a", "123"),List("003", "c", "456") ,List("002", "b", "dfr"), List("003", "c", "ytr"))
  .map(row => (row(0), row(1), row(2)))

val DS1 = base.toDF("KEY1", "KEY2" ,"VAL")

DS1.show()

+----+----+---+
|KEY1|KEY2|VAL|
+----+----+---+
| 001|   a|abc|
| 001|   a|123|
| 003|   c|456|
| 002|   b|dfr|
| 003|   c|ytr|
+----+----+---+

Now I have added the row number using a window function as below:


val baseDF = DS1.select(
  col("KEY1"), col("KEY2"), col("VAL"),
  row_number().over(
    Window.partitionBy(col("KEY1"), col("KEY2"))
      .orderBy(col("KEY1"), col("KEY2").asc)
  ).alias("Row_Num")
)
baseDF.show()

+----+----+---+-------+
|KEY1|KEY2|VAL|Row_Num|
+----+----+---+-------+
|001 |a   |abc|1      |
|001 |a   |123|2      |
|002 |b   |dfr|1      |
|003 |c   |456|1      |
|003 |c   |ytr|2      |
+----+----+---+-------+
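As a mental model for what `row_number().over(Window.partitionBy(...).orderBy(...))` computes, here is a plain-Scala sketch (no Spark, so just an illustration of the semantics): group the base rows by (KEY1, KEY2), then number the rows 1..n within each group. Sorting the groups by key is an assumption added here for readable output.

```scala
// Base rows as plain tuples (KEY1, KEY2, VAL), matching the example data.
val base = List(("001", "a", "abc"), ("001", "a", "123"),
                ("003", "c", "456"), ("002", "b", "dfr"), ("003", "c", "ytr"))

// Group by the partition keys, then assign 1-based row numbers per group.
// groupBy preserves the encounter order of rows within each group.
val numbered: List[(String, String, String, Int)] =
  base.groupBy { case (k1, k2, _) => (k1, k2) }
    .toList
    .sortBy(_._1) // sort groups by key only so the printed output is stable
    .flatMap { case (_, rows) =>
      rows.zipWithIndex.map { case ((k1, k2, v), i) => (k1, k2, v, i + 1) }
    }

numbered.foreach(println)
```

This mirrors the `baseDF` output above: two rows each for (001, a) and (003, c), one for (002, b).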

Now the delta load comes in as below:


val delta = List(List("001", "a", "y45") ,List("002", "b", "444"))
  .map(row => (row(0), row(1), row(2)))

val DS2 = delta.toDF("KEY1", "KEY2" ,"VAL")
DS2.show()

+----+----+---+
|KEY1|KEY2|VAL|
+----+----+---+
| 001|   a|y45|
| 002|   b|444|
+----+----+---+

So the expected updated result should be:


baseDF.show()

+----+----+---+-------+
|KEY1|KEY2|VAL|Row_Num|
+----+----+---+-------+
|001 |a   |abc|1      |
|001 |a   |123|2      |
| 001|   a|y45|3      | -----> Delta record
|002 |b   |dfr|1      |
| 002|   b|444|2      | -----> Delta record 
|003 |c   |456|1      |
|003 |c   |ytr|2      |
+----+----+---+-------+

Any suggestions on how to implement this using dataframes/datasets? Can we achieve the above with Spark RDD's zipWithIndex?


Answered by Leo C

One way to add the delta with updated row numbers is to: 1) add column Row_Num with a large number in DS2, 2) union baseDF with it, and 3) calculate the new row numbers, as shown below:


import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val combinedDF = baseDF.union(
  DS2.withColumn("Row_Num", lit(Long.MaxValue))
)

val resultDF = combinedDF.select(
  col("KEY1"), col("KEY2"), col("VAL"), row_number().over(
    Window.partitionBy(col("KEY1"), col("KEY2")).orderBy(col("Row_Num"))
  ).alias("New_Row_Num")
)

resultDF.show
+----+----+---+-----------+
|KEY1|KEY2|VAL|New_Row_Num|
+----+----+---+-----------+
| 003|   c|456|          1|
| 003|   c|ytr|          2|
| 002|   b|dfr|          1|
| 002|   b|444|          2|
| 001|   a|abc|          1|
| 001|   a|123|          2|
| 001|   a|y45|          3|
+----+----+---+-----------+
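On the zipWithIndex part of the question: the same union-and-renumber idea can be sketched without Spark using plain Scala collections. Because `sortBy` is stable, base rows keep their existing order within each group, and delta rows tagged with the Long.MaxValue sentinel land at the end of each group before renumbering. This is a hedged sketch of the technique, not the answer's exact Spark code:

```scala
// Base rows carry their existing Row_Num; delta rows get a sentinel
// value so they sort after every base row within their group.
val baseRows  = List(("001", "a", "abc", 1L), ("001", "a", "123", 2L),
                     ("002", "b", "dfr", 1L),
                     ("003", "c", "456", 1L), ("003", "c", "ytr", 2L))
val deltaRows = List(("001", "a", "y45"), ("002", "b", "444"))
  .map { case (k1, k2, v) => (k1, k2, v, Long.MaxValue) }

// Union, group by (KEY1, KEY2), order each group by the old Row_Num
// (stable sort keeps base order, delta at the end), then renumber 1..n.
val renumbered: List[(String, String, String, Int)] =
  (baseRows ++ deltaRows)
    .groupBy { case (k1, k2, _, _) => (k1, k2) }
    .toList
    .sortBy(_._1) // group ordering, just for readable output
    .flatMap { case (_, rows) =>
      rows.sortBy(_._4).zipWithIndex
        .map { case ((k1, k2, v, _), i) => (k1, k2, v, i + 1) }
    }

renumbered.foreach(println)
```

This reproduces the expected result in the question: y45 becomes row 3 under (001, a) and 444 becomes row 2 under (002, b).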