Python Spark: add a new column to a dataframe using the value from the previous row

Disclaimer: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/34295642/

Spark add new column to dataframe with value from previous row

python, apache-spark, dataframe, pyspark, apache-spark-sql

Asked by Kito

I'm wondering how I can achieve the following in Spark (PySpark).

Initial Dataframe:

+---+---+
| id|num|
+---+---+
|  4|9.0|
|  3|7.0|
|  2|3.0|
|  1|5.0|
+---+---+

Resulting Dataframe:

+---+---+-------+
| id|num|new_Col|
+---+---+-------+
|  4|9.0|    7.0|
|  3|7.0|    3.0|
|  2|3.0|    5.0|
+---+---+-------+

I generally manage to "append" new columns to a dataframe by using something like df.withColumn("new_Col", df.num * 10).

However, I have no idea how I can achieve this "shift of rows" for the new column, so that the new column has the value of a field from the previous row (as shown in the example). I also couldn't find anything in the API documentation about accessing a certain row in a DataFrame by index.

Any help would be appreciated.

Accepted answer by zero323

You can use the lag window function as follows:

from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window

# sc is the SparkContext that the PySpark shell provides
df = sc.parallelize([(4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0)]).toDF(["id", "num"])

# A window over the whole dataframe (no partitioning), ordered by id
w = Window().partitionBy().orderBy(col("id"))

# lag("num") takes num from the previous row in the window; the first row
# gets null, which na.drop() then removes
df.select("*", lag("num").over(w).alias("new_col")).na.drop().show()

## +---+---+-------+
## | id|num|new_col|
## +---+---+-------+
## |  2|3.0|    5.0|
## |  3|7.0|    3.0|
## |  4|9.0|    7.0|
## +---+---+-------+
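
If you would rather keep the first row than drop it, lag also accepts a default value that is used where no previous row exists (a small variant of the code above, not part of the original answer):

# 0.0 replaces the null in the first row, so na.drop() is no longer needed
df.select("*", lag("num", 1, 0.0).over(w).alias("new_col")).show()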

but there are some important issues:

  1. If you need a global operation (not partitioned by some other column or columns), it is extremely inefficient, because all rows have to be moved into a single partition; when a partitioning column exists, the window stays distributed, as sketched right after this list.
  2. You need a natural way to order your data.
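
For illustration, here is a minimal sketch of the partitioned case (the group column and its values are hypothetical, not part of the original question): when lag only needs to look back within each group, partitionBy lets Spark process the groups independently instead of funneling everything through one partition.

grouped = sc.parallelize(
    [("a", 1, 5.0), ("a", 2, 3.0), ("b", 1, 7.0), ("b", 2, 9.0)]
).toDF(["group", "id", "num"])

# Each group is ordered and lagged on its own, so no global shuffle into
# a single partition is needed
w_part = Window.partitionBy("group").orderBy(col("id"))
grouped.select("*", lag("num").over(w_part).alias("new_col")).show()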

While the second issue is almost never a problem, the first one can be a deal-breaker. If this is the case, you should simply convert your DataFrame to an RDD and compute lag manually.
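
One way to do that (a sketch of the idea, not code from the original answer) is to number the sorted rows with zipWithIndex and join each row to its predecessor, which keeps the computation distributed:

# Pair every row with its position in the id order
indexed = df.orderBy("id").rdd.zipWithIndex()       # (Row, index)
current = indexed.map(lambda x: (x[1], x[0]))       # (index, Row)
# Shift the indices by one so that row i joins against row i - 1
previous = indexed.map(lambda x: (x[1] + 1, x[0].num))

lagged = current.join(previous).map(
    lambda kv: (kv[1][0].id, kv[1][0].num, kv[1][1])
)
lagged.toDF(["id", "num", "new_col"]).show()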

Answer by mputha

In Scala (the question asks about PySpark, but the approach is the same):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

val df = sc.parallelize(Seq((4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0))).toDF("id", "num")
df.show
+---+---+
| id|num|
+---+---+
|  4|9.0|
|  3|7.0|
|  2|3.0|
|  1|5.0|
+---+---+
// Window ordered by id; lag("num", 1, 0) returns 0 when there is no previous row
val w = Window.orderBy("id")
df.withColumn("new_column", lag("num", 1, 0).over(w)).show
+---+---+----------+
| id|num|new_column|
+---+---+----------+
|  1|5.0|       0.0|
|  2|3.0|       5.0|
|  3|7.0|       3.0|
|  4|9.0|       7.0|
+---+---+----------+
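
For completeness, a PySpark equivalent of the Scala snippet above (a sketch that rebuilds the same df so it runs on its own; sc is the shell's SparkContext):

from pyspark.sql.functions import lag
from pyspark.sql.window import Window

df = sc.parallelize([(4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0)]).toDF(["id", "num"])

# lag("num", 1, 0) keeps the first row and fills it with 0 instead of null
w = Window.orderBy("id")
df.withColumn("new_column", lag("num", 1, 0).over(w)).show()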