Scala: converting row values into a column array in a Spark DataFrame

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same CC BY-SA license and attribute it to the original authors (not me) on StackOverflow. Original: http://stackoverflow.com/questions/36332232/

Date: 2020-10-22 08:07:30 | Source: igfitidea

Converting row values into a column array in spark dataframe

scala | apache-spark | spark-dataframe

Asked by prasannads

I am working with Spark DataFrames and I need to group by a column and convert the column values of the grouped rows into an array of elements as a new column. Example:


Input:

employee | Address
------------------
Micheal  |  NY
Micheal  |  NJ

Output:

employee | Address
------------------
Micheal  | (NY,NJ)

Any help is highly appreciated!


Answered by Vishnu667

Here is an alternate solution where I convert the DataFrame to an RDD for the transformations and then convert it back to a DataFrame using sqlContext.createDataFrame().


Sample.json


{"employee":"Michale","Address":"NY"}
{"employee":"Michale","Address":"NJ"}
{"employee":"Sam","Address":"NY"}
{"employee":"Max","Address":"NJ"}

Spark Application


// Imports for the schema types, Row, and RDD used below
import org.apache.spark.sql.types.{StructType, StructField, StringType, ArrayType}
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD

val df = sqlContext.read.json("sample.json")

// Printing the original Df
df.show()

//Defining the Schema for the aggregated DataFrame
val dataSchema = new StructType(
  Array(
    StructField("employee", StringType, nullable = true),
    StructField("Address", ArrayType(StringType, containsNull = true), nullable = true)
  )
)
// Converting the df to rdd and performing the groupBy operation
val aggregatedRdd: RDD[Row] = df.rdd.groupBy(r =>
          r.getAs[String]("employee")
        ).map(row =>
          // Mapping the Grouped Values to a new Row Object
          Row(row._1, row._2.map(_.getAs[String]("Address")).toArray)
        )

// Creating a DataFrame from the aggregatedRdd with the defined Schema (dataSchema)
val aggregatedDf = sqlContext.createDataFrame(aggregatedRdd, dataSchema)

// Printing the aggregated Df
aggregatedDf.show()

Output:


+-------+--------+---+
|Address|employee|num|
+-------+--------+---+
|     NY| Michale|  1|
|     NJ| Michale|  2|
|     NY|     Sam|  3|
|     NJ|     Max|  4|
+-------+--------+---+

+--------+--------+
|employee| Address|
+--------+--------+
|     Sam|    [NY]|
| Michale|[NY, NJ]|
|     Max|    [NJ]|
+--------+--------+
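A small optional check (not part of the original answer, and reusing the aggregatedDf value created above): printing the schema confirms that the Address column really is an array of strings, matching the dataSchema defined earlier.

// Optional: verify the aggregated schema (should match dataSchema)
aggregatedDf.printSchema()
// root
//  |-- employee: string (nullable = true)
//  |-- Address: array (nullable = true)
//  |    |-- element: string (containsNull = true)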

Answered by DataGeek

If you're using Spark 2.0+, you can use collect_list or collect_set. Your query will be something like this (assuming your dataframe is called input):


import org.apache.spark.sql.functions._
import spark.implicits._  // assumes a SparkSession named `spark`; enables the 'columnName symbol syntax

input.groupBy('employee).agg(collect_list('Address))

If you are ok with duplicates, use collect_list. If you're not ok with duplicates and only need unique items in the list, use collect_set.
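For reference, here is a minimal end-to-end sketch of this approach (a sketch rather than the original answer's code: it assumes Spark 2.0+ with a SparkSession named spark, recreates the question's sample data with toDF, and uses .as("Address") only to keep the original column name):

import org.apache.spark.sql.functions.{collect_list, collect_set}

// Assumed SparkSession named `spark`; its implicits provide toDF on a Seq
import spark.implicits._

val input = Seq(("Micheal", "NY"), ("Micheal", "NJ")).toDF("employee", "Address")

// collect_list keeps duplicates: one array entry per grouped row
input.groupBy("employee")
  .agg(collect_list("Address").as("Address"))
  .show()
// +--------+--------+
// |employee| Address|
// +--------+--------+
// | Micheal|[NY, NJ]|
// +--------+--------+

// collect_set removes duplicate addresses within each group
input.groupBy("employee")
  .agg(collect_set("Address").as("Address"))
  .show()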


Hope this helps!
