Java: Add index column to existing Spark DataFrame
Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you use or share it, you must follow the same CC BY-SA license, cite the original URL, and attribute it to the original authors (not me): StackOverflow
Original: http://stackoverflow.com/questions/38875599/
Add index column to existing Spark's DataFrame
Asked by Sergey Repnikov
I'm working with Spark 1.5, using Java. I need to append an ID/index column to an existing DataFrame, for example:
+---------+--------+
| surname| name|
+---------+--------+
| Green| Jake|
| Anderson| Thomas|
| Corleone| Michael|
| Marsh| Randy|
| Montana| Tony|
| Green| Julia|
|Brenneman| Eady|
| Durden| Tyler|
| Corleone| Vito|
| Madiro| Mat|
+---------+--------+
I want every row to be appended with an index, ranging between 1 and the number of records in the table. Index order does not matter; each row just needs a unique ID/index. It could be done by converting to an RDD, appending an index column, and converting back into a DataFrame with a modified StructType, but, if I understand correctly, this operation consumes a lot of resources for the conversion and so on, and there must be another way. The result must look like:
+---------+--------+---+
| surname| name| id|
+---------+--------+---+
| Green| Jake| 3|
| Anderson| Thomas| 5|
| Corleone| Michael| 2|
| Marsh| Randy| 10|
| Montana| Tony| 7|
| Green| Julia| 1|
|Brenneman| Eady| 2|
| Durden| Tyler| 9|
| Corleone| Vito| 4|
| Madiro| Mat| 6|
+---------+--------+---+
Thank you.
Answered by dbustosp
I know this question was asked a while ago, but you can do it as follows:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

w = Window.orderBy("myColumn")
withIndexDF = originalDF.withColumn("index", row_number().over(w))
- myColumn: any specific column from your DataFrame.
- originalDF: the original DataFrame without the index column.
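As a quick usage sketch (my own example data mirroring the question's table, assuming Spark 2.x+ where SparkSession is available). Note that Window.orderBy without a partitionBy moves all rows into a single partition, which matters for very large tables.

from pyspark.sql import SparkSession
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()
originalDF = spark.createDataFrame(
    [("Green", "Jake"), ("Anderson", "Thomas"), ("Corleone", "Michael")],
    ["surname", "name"])

# order by any existing column; row_number() then yields a consecutive 1..N index
w = Window.orderBy("surname")
withIndexDF = originalDF.withColumn("index", row_number().over(w))
withIndexDF.show()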
Answered by Algorithman
The most concise way to do this with a Spark DataFrame:
.withColumn("idx",monotonically_increasing_id())
Complete documentation: https://docs.databricks.com/spark/latest/sparkr/functions/withColumn.html
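One caveat worth adding (not part of the original answer): the generated IDs are guaranteed to be unique and increasing, but not consecutive, because the partition ID is encoded in the upper bits of each value. A quick way to see this (a PySpark sketch, assuming Spark 2.x+):

from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id

spark = SparkSession.builder.getOrCreate()

# with more than one partition the ids jump between partitions,
# e.g. 0, 1, 2, 8589934592, 8589934593, ...
spark.range(6).repartition(2).withColumn("idx", monotonically_increasing_id()).show()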
Answered by Mário de Sá Vera
Folks, a good approach:
Simulating the zipWithIndex method from RDD... the first suggestion performs better, but the pure DataFrames solution is no big deal so far either (a table of over 100M rows in my scenario).
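For reference, a minimal PySpark sketch of the zipWithIndex idea (my own illustration, not from the original answer, reusing the question's surname/name columns and assuming Spark 2.x+):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("Green", "Jake"), ("Anderson", "Thomas")], ["surname", "name"])

# zipWithIndex assigns a unique, consecutive 0-based index to each row
# without collecting the data to the driver
rows_with_id = df.rdd.zipWithIndex().map(lambda pair: list(pair[0]) + [pair[1] + 1])
indexed = spark.createDataFrame(rows_with_id, df.columns + ["id"])
indexed.show()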
Answered by Mahdi Ghelichi
In Scala, first we need to create an indexing array:
val indx_arr = (1 to your_df.count.toInt).toArray
indx_arr: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
Now, we want to append this column to our DataFrame. First, we collect our DataFrame as an array, then we zip it with our indx_arr, and then we convert the newly-created array back into an RDD. The final step is to get it as a DataFrame:
// requires the toDF implicits, e.g. import sqlContext.implicits._ (or spark.implicits._)
val final_df = sc.parallelize(
  (your_df.collect.map(x => (x(0), x(1))) zip indx_arr)
    .map(x => (x._1._1.toString, x._1._2.toString, x._2))
).toDF("surname", "name", "id")
This is also an easy and straightforward method of appending an array of any kind to our Spark Dataframe.
Answered by Bhanu-Bigdata Developer
You can use the withColumn function. Usage would be something like:
import org.apache.spark.sql.functions.rand

// note: values fall in 1..N but are not guaranteed to be unique
val myDF = existingDF.withColumn("index", (rand() * existingDF.count() + 1).cast("long"))