java - Add index column to existing Spark's DataFrame

Disclaimer: this page is a translated copy of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/38875599/


Add index column to existing Spark's DataFrame

java, apache-spark

Asked by Sergey Repnikov

I am working with Spark 1.5, using Java. I need to append an ID/index column to an existing DataFrame, for example:


+---------+--------+
|  surname|    name|
+---------+--------+
|    Green|    Jake|
| Anderson|  Thomas|
| Corleone| Michael|
|    Marsh|   Randy|
|  Montana|    Tony|
|    Green|   Julia|
|Brenneman|    Eady|
|   Durden|   Tyler|
| Corleone|    Vito|
|   Madiro|     Mat|
+---------+--------+

I want every row to be appended with an index, ranging between 1 and the number of records in the table. The index order does not matter; each row just has to contain a unique ID/index. It could be done by converting to an RDD, appending an index field, and converting back into a DataFrame with a modified StructType, but, if I understand correctly, this operation consumes a lot of resources for the transformations etc., and there must be another way. The result must look like:


+---------+--------+---+
|  surname|    name| id|
+---------+--------+---+
|    Green|    Jake|  3|
| Anderson|  Thomas|  5|
| Corleone| Michael|  2|
|    Marsh|   Randy| 10|
|  Montana|    Tony|  7|
|    Green|   Julia|  1|
|Brenneman|    Eady|  8|
|   Durden|   Tyler|  9|
| Corleone|    Vito|  4|
|   Madiro|     Mat|  6|
+---------+--------+---+

Thank you.


Answered by dbustosp

I know this question was asked a while ago, but you can do it as follows:


from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

w = Window.orderBy("myColumn")
withIndexDF = originalDF.withColumn("index", row_number().over(w))
  • myColumn: any specific column from your DataFrame.
  • originalDF: the original DataFrame without the index column.
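Since the question is tagged Java, here is a rough Java equivalent of the same idea. This is only a sketch, assuming Spark 2.x where the Java API exposes Dataset<Row>; originalDF and myColumn are the same placeholders as in the answer above:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.row_number;

// Number the rows 1..N in the order of some existing column
WindowSpec w = Window.orderBy("myColumn");
Dataset<Row> withIndexDF = originalDF.withColumn("index", row_number().over(w));

Keep in mind that a window with no partitionBy pulls all rows into a single partition, so this is fine for modest tables but can become a bottleneck on very large ones.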

Answered by Algorithman

The most concise way to do this with a Spark DataFrame:


.withColumn("idx", monotonically_increasing_id())

Complete documentation: https://docs.databricks.com/spark/latest/sparkr/functions/withColumn.html

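For reference, the same call from Java looks roughly like the following. This is a sketch assuming Spark 2.x, where monotonically_increasing_id lives in org.apache.spark.sql.functions; df is just a placeholder name:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.monotonically_increasing_id;

// Each row gets a unique, increasing 64-bit id; the values are NOT consecutive,
// so this does not give the 1..N range the question asks for.
Dataset<Row> withIdDF = df.withColumn("idx", monotonically_increasing_id());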

Answered by Mário de Sá Vera

Folks, there is a good approach at:


DataFrame-ified zipWithIndex


It simulates RDD's zipWithIndex method ... the first suggestion there performs better, but the pure-DataFrame solution has been no problem so far either (on a table of over 100M rows in my scenario).

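For completeness, here is a rough Java sketch of the RDD zipWithIndex route that the linked answer describes. Assumptions: Spark 2.x with a SparkSession named spark; withRowIndex and idCol are illustrative names, not part of any Spark API:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public static Dataset<Row> withRowIndex(SparkSession spark, Dataset<Row> df, String idCol) {
    // zipWithIndex assigns a stable, consecutive 0-based index to every row
    // without shuffling everything into a single partition
    JavaRDD<Row> indexed = df.javaRDD()
            .zipWithIndex()
            .map(tuple -> {
                Row row = tuple._1();
                Object[] values = new Object[row.size() + 1];
                for (int i = 0; i < row.size(); i++) {
                    values[i] = row.get(i);
                }
                values[row.size()] = tuple._2() + 1; // shift to a 1-based index
                return RowFactory.create(values);
            });

    // Extend the original schema with the new id column and rebuild the DataFrame
    StructType newSchema = df.schema().add(idCol, DataTypes.LongType, false);
    return spark.createDataFrame(indexed, newSchema);
}

Unlike monotonically_increasing_id, this yields consecutive 1..N values, at the cost of a round trip through the RDD API.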

Answered by Mahdi Ghelichi

In Scala, first we need to create an indexing Array:


val indx_arr = (1 to your_df.count.toInt).toArray

indx_arr: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

Now, we want to append this column to our DataFrame. First, we collect our DataFrame into an array, then we zip it with our indx_arr, and then we convert the newly-created array back into an RDD. The final step is to get it back as a DataFrame:


// requires import sqlContext.implicits._ (or spark.implicits._) for toDF
val final_df = sc.parallelize((your_df.collect.map(
    x => (x(0), x(1))) zip indx_arr).map(
    x => (x._1._1.toString, x._1._2.toString, x._2))).toDF("surname", "name", "id")

This is also an easy and straightforward way to append an array of any kind to our Spark DataFrame (keep in mind it relies on collect, so the DataFrame has to fit in driver memory).


Answered by Bhanu-Bigdata Developer

You can use the withColumn function. Usage would be something like: val myDF = existingDF.withColumn("index", expr("cast(rand() * " + existingDF.count() + " + 1 as int)")) (note that a random expression does not guarantee unique index values).
