Scala Spark DataFrame: How to add an index column (aka distributed data index)
Original question: http://stackoverflow.com/questions/43406887/
Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use or share it, but you must attribute it to the original authors (not me) on StackOverflow.
Spark DataFrame: How to add an index column (aka distributed data index)
Asked by Liangpi
I read data from a CSV file, but it doesn't have an index.
I want to add a column that numbers the rows from 1 to the number of rows.
What should I do? Thanks. (Scala)
Answered by Omar14
With Scala you can use:
import org.apache.spark.sql.functions._ 
df.withColumn("id", monotonically_increasing_id())
You can refer to this example and the Scala docs.
With PySpark you can use:
from pyspark.sql.functions import monotonically_increasing_id 
df_index = df.select("*").withColumn("id", monotonically_increasing_id())
Answered by anshu kumar
monotonically_increasing_id - The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.
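To see why the IDs are not consecutive, here is a minimal sketch (my own illustration, not part of the original answer, assuming a SparkSession named spark and import org.apache.spark.sql.functions._): the implementation puts the partition ID in the upper bits of each value, so IDs jump between partitions.

val sample = spark.range(6).repartition(3)
sample.withColumn("id", monotonically_increasing_id()).show()
// Within a partition the ids increase by 1, but each partition starts at
// partitionIndex << 33 (0, 8589934592, 17179869184, ...), so the column is
// increasing and unique yet not consecutive.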
"I want to add a column from 1 to row's number."
“我想从 1 添加一列到行号。”
Let's say we have the following DF:
+--------+-------------+-------+
| userId | productCode | count |
+--------+-------------+-------+
|     25 |        6001 |     2 |
|     11 |        5001 |     8 |
|     23 |         123 |     5 |
+--------+-------------+-------+
To generate the IDs starting from 1:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val w = Window.orderBy("count")
val result = df.withColumn("index", row_number().over(w))
This would add an index column ordered by increasing value of count.
+--------+-------------+-------+-------+
| userId | productCode | count | index |
+--------+-------------+-------+-------+
|     25 |        6001 |     2 |     1 |
|     23 |         123 |     5 |     2 |
|     11 |        5001 |     8 |     3 |
+--------+-------------+-------+-------+
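One caveat worth adding here (my note, not part of the original answer): a window with orderBy but no partitionBy moves all rows into a single partition to assign the numbers, so Spark warns about this and it can be slow on large data. A self-contained sketch of the approach above, assuming a SparkSession named spark:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
import spark.implicits._

// Hypothetical sample rows mirroring the table above
val df = Seq((25, "6001", 2), (11, "5001", 8), (23, "123", 5))
  .toDF("userId", "productCode", "count")

// No partitionBy, so all rows pass through one partition while numbering
val w = Window.orderBy("count")
df.withColumn("index", row_number().over(w)).show()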
Answered by Ram Ghadiyaram
NOTE: The approaches above don't give a sequence number, but they do give an increasing id.
A simple way to do that while guaranteeing the order of the indexes is zipWithIndex, as shown below.
Sample data.
+-------------------+
|               Name|
+-------------------+
|     Ram Ghadiyaram|
|        Ravichandra|
|              ilker|
|               nick|
|             Naveed|
|      Gobinathan SP|
|Sreenivas Venigalla|
|     Hymanela Kowski|
|   Arindam Sengupta|
|            Liangpi|
|             Omar14|
|        anshu kumar|
+-------------------+
package com.example
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row}
/**
  * DistributedDataIndex: Program to index a DataFrame's rows with zipWithIndex and monotonically_increasing_id
  */
object DistributedDataIndex extends App with Logging {
  val spark = builder
    .master("local[*]")
    .appName(this.getClass.getName)
    .getOrCreate()
  import spark.implicits._
  val df = spark.sparkContext.parallelize(
    Seq("Ram Ghadiyaram", "Ravichandra", "ilker", "nick"
      , "Naveed", "Gobinathan SP", "Sreenivas Venigalla", "Hymanela Kowski", "Arindam Sengupta", "Liangpi", "Omar14", "anshu kumar"
    )).toDF("Name")
  df.show
  logInfo("addColumnIndex here")
  // Add index now...
  val df1WithIndex = addColumnIndex(df)
    .withColumn("monotonically_increasing_id", monotonically_increasing_id)
  df1WithIndex.show(false)
  /**
    * Add an index column to each row of the DataFrame
    */
  def addColumnIndex(df: DataFrame) = {
    spark.sqlContext.createDataFrame(
      df.rdd.zipWithIndex.map {
        case (row, index) => Row.fromSeq(row.toSeq :+ index)
      },
      // Create schema for index column
      StructType(df.schema.fields :+ StructField("index", LongType, false)))
  }
}
Result:
+-------------------+-----+---------------------------+
|Name               |index|monotonically_increasing_id|
+-------------------+-----+---------------------------+
|Ram Ghadiyaram     |0    |0                          |
|Ravichandra        |1    |8589934592                 |
|ilker              |2    |8589934593                 |
|nick               |3    |17179869184                |
|Naveed             |4    |25769803776                |
|Gobinathan SP      |5    |25769803777                |
|Sreenivas Venigalla|6    |34359738368                |
|Hymanela Kowski    |7    |42949672960                |
|Arindam Sengupta   |8    |42949672961                |
|Liangpi            |9    |51539607552                |
|Omar14             |10   |60129542144                |
|anshu kumar        |11   |60129542145                |
+-------------------+-----+---------------------------+
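If you want the index to start at 1 rather than 0, as the question asks, a small variation of the addColumnIndex helper above is to shift the zipWithIndex value by one (addColumnIndexFrom1 is just an illustrative name; it reuses the imports and spark session from the program above):

def addColumnIndexFrom1(df: DataFrame) = {
  spark.sqlContext.createDataFrame(
    df.rdd.zipWithIndex.map {
      // zipWithIndex is 0-based, so add 1 to make the index run 1..n
      case (row, index) => Row.fromSeq(row.toSeq :+ (index + 1))
    },
    StructType(df.schema.fields :+ StructField("index", LongType, false)))
}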
Answered by Sequinex
How to get a sequential id column id [1, 2, 3, 4...n]:
from pyspark.sql.functions import desc, row_number, monotonically_increasing_id
from pyspark.sql.window import Window

df_with_seq_id = df.withColumn('index_column_name', row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)
Note that row_number() starts at 1, so subtract 1 if you want a 0-indexed column.
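For completeness, a rough Scala equivalent of the same trick (my sketch, assuming a DataFrame named df):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{monotonically_increasing_id, row_number}

// Ordering by the monotonically increasing id keeps the original row order,
// and row_number() then assigns consecutive values 1, 2, 3, ...
val dfWithSeqId = df.withColumn("index",
  row_number().over(Window.orderBy(monotonically_increasing_id())))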
Answered by Shantanu Sharma
As Ram said, zipWithIndex is better than monotonically_increasing_id if you need consecutive row numbers. Try this (PySpark environment):
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType

new_schema = StructType(original_dataframe.schema.fields[:] + [StructField("index", LongType(), False)])
zipped_rdd = original_dataframe.rdd.zipWithIndex()
indexed = zipped_rdd.map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])).toDF(new_schema)
where original_dataframe is the DataFrame you need to add the index to, and row_with_index is a Row template listing your columns plus the new index column, which you can write as
row_with_index = Row(
"calendar_date"
,"year_week_number"
,"year_period_number"
,"realization"
,"index"
)
Here, calendar_date, year_week_number, year_period_number and realization were the columns of my original DataFrame. You can replace these with your own column names. index is the new column name you add for the row numbers.
Answered by Abhi
If you require a unique sequence number for each row, I have a slightly different approach: add a static column and use it to compute the row number over that column.
val srcData = spark.read.option("header","true").csv("/FileStore/sample.csv")
srcData.show(5)
+--------+--------------------+
|     Job|                Name|
+--------+--------------------+
|Morpheus|       HR Specialist|
|   Kayla|              Lawyer|
|  Trisha|          Bus Driver|
|  Robert|Elementary School...|
|    Ober|               Judge|
+--------+--------------------+
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, row_number}

val srcDataModf = srcData.withColumn("sl_no", lit("1"))
val windowSpecRowNum = Window.partitionBy("sl_no").orderBy("sl_no")
srcDataModf.withColumn("row_num", row_number().over(windowSpecRowNum)).drop("sl_no").select("row_num", "Name", "Job").show(5)
+-------+--------------------+--------+
|row_num|                Name|     Job|
+-------+--------------------+--------+
|      1|       HR Specialist|Morpheus|
|      2|              Lawyer|   Kayla|
|      3|          Bus Driver|  Trisha|
|      4|Elementary School...|  Robert|
|      5|               Judge|    Ober|
+-------+--------------------+--------+
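Note that because every row gets the same sl_no value, this window effectively has a single partition, so like the earlier Window.orderBy approaches it pulls all rows into one partition while the numbers are assigned; that is fine for small data but can become a bottleneck at scale.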

