scala 在 Spark 中使用自定义函数聚合多列

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/37737843/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-22 08:21:51  来源:igfitidea点击:

Aggregating multiple columns with custom function in Spark

scalaapache-sparkdataframeapache-spark-sqlorc

提问by anthonybell

I was wondering if there is some way to specify a custom aggregation function for spark dataframes over multiple columns.

我想知道是否有某种方法可以为多列的火花数据帧指定自定义聚合函数。

I have a table like this of the type (name, item, price):

我有一个这样的表格(名称,项目,价格):

john | tomato | 1.99
john | carrot | 0.45
bill | apple  | 0.99
john | banana | 1.29
bill | taco   | 2.59

to:

到:

I would like to aggregate the item and it's cost for each person into a list like this:

我想将每个人的项目和成本汇总到这样的列表中:

john | (tomato, 1.99), (carrot, 0.45), (banana, 1.29)
bill | (apple, 0.99), (taco, 2.59)

Is this possible in dataframes? I recently learned about collect_listbut it appears to only work for one column.

这在数据帧中可能吗?我最近了解到,collect_list但它似乎只适用于一列。

采纳答案by David Griffin

The easiest way to do this as a DataFrameis to first collect two lists, and then use a UDFto zipthe two lists together. Something like:

最简单的方法来做到这一点的DataFrame是先收集两个列表,然后使用UDFzip两个列表在一起。就像是:

import org.apache.spark.sql.functions.{collect_list, udf}
import sqlContext.implicits._

val zipper = udf[Seq[(String, Double)], Seq[String], Seq[Double]](_.zip(_))

val df = Seq(
  ("john", "tomato", 1.99),
  ("john", "carrot", 0.45),
  ("bill", "apple", 0.99),
  ("john", "banana", 1.29),
  ("bill", "taco", 2.59)
).toDF("name", "food", "price")

val df2 = df.groupBy("name").agg(
  collect_list(col("food")) as "food",
  collect_list(col("price")) as "price" 
).withColumn("food", zipper(col("food"), col("price"))).drop("price")

df2.show(false)
# +----+---------------------------------------------+
# |name|food                                         |
# +----+---------------------------------------------+
# |john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]|
# |bill|[[apple,0.99], [taco,2.59]]                  |
# +----+---------------------------------------------+

回答by Daniel Siegmann

Consider using the structfunction to group the columns together before collecting as a list:

考虑struct在收集为列表之前使用该函数将列分组在一起:

import org.apache.spark.sql.functions.{collect_list, struct}
import sqlContext.implicits._

val df = Seq(
  ("john", "tomato", 1.99),
  ("john", "carrot", 0.45),
  ("bill", "apple", 0.99),
  ("john", "banana", 1.29),
  ("bill", "taco", 2.59)
).toDF("name", "food", "price")

df.groupBy($"name")
  .agg(collect_list(struct($"food", $"price")).as("foods"))
  .show(false)

Outputs:

输出:

+----+---------------------------------------------+
|name|foods                                        |
+----+---------------------------------------------+
|john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]|
|bill|[[apple,0.99], [taco,2.59]]                  |
+----+---------------------------------------------+

回答by Yifan Guo

Maybe a better way than the zipfunction (since UDF and UDAF are very bad to performance) is to wrap the two columns into Struct.

也许比zip函数更好的方法(因为 UDF 和 UDAF 对性能非常不利)是将两列包装成Struct.

This would probably work as well:

这可能也有效:

df.select('name, struct('food, 'price).as("tuple"))
  .groupBy('name)
  .agg(collect_list('tuple).as("tuples"))

回答by Psidom

Here is an option by converting the data frame to a RDD of Map and then call a groupByKeyon it. The result would be a list of key-value pairs where value is a list of tuples.

这是通过将数据框转换为 Map 的 RDD 然后对其调用 a 的选项groupByKey。结果将是一个键值对列表,其中 value 是一个元组列表。

df.show
+----+------+----+
|  _1|    _2|  _3|
+----+------+----+
|john|tomato|1.99|
|john|carrot|0.45|
|bill| apple|0.99|
|john|banana|1.29|
|bill|  taco|2.59|
+----+------+----+


val tuples = df.map(row => row(0) -> (row(1), row(2)))
tuples: org.apache.spark.rdd.RDD[(Any, (Any, Any))] = MapPartitionsRDD[102] at map at <console>:43

tuples.groupByKey().map{ case(x, y) => (x, y.toList) }.collect
res76: Array[(Any, List[(Any, Any)])] = Array((bill,List((apple,0.99), (taco,2.59))), (john,List((tomato,1.99), (carrot,0.45), (banana,1.29))))

回答by Neha Kumari

To your point collect_list appears to only work for one column: For collect_list to work on multiple columns you will have to wrap the columns you want as aggregate in a struct. For e.g :

就您而言,collect_list 似乎仅适用于一列:要使 collect_list 在多列上工作,您必须将想要的列作为聚合体包装在一个结构中。例如:

     val aggregatedData = df.groupBy("name").agg(collect_list(struct("item", "price")) as("food"))

     aggregatedData.show
+----+------------------------------------------------+
|name|foods                                           |
+----+------------------------------------------------+
|john|[[tomato, 1.99], [carrot, 0.45], [banana, 1.29]]|
|bill|[[apple, 0.99], [taco, 2.59]]                   |
+----+------------------------------------------------+