Top N items from a Spark DataFrame/RDD
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must follow the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me): StackOverflow
Original question: http://stackoverflow.com/questions/48775083/
Asked by Sam
My requirement is to get the top N items from a dataframe.
I have this DataFrame:
val df = List(
  ("MA", "USA"),
  ("MA", "USA"),
  ("OH", "USA"),
  ("OH", "USA"),
  ("OH", "USA"),
  ("OH", "USA"),
  ("NY", "USA"),
  ("NY", "USA"),
  ("NY", "USA"),
  ("NY", "USA"),
  ("NY", "USA"),
  ("NY", "USA"),
  ("CT", "USA"),
  ("CT", "USA"),
  ("CT", "USA"),
  ("CT", "USA"),
  ("CT", "USA")).toDF("value", "country")
I was able to map it to an RDD[((Int, String), Long)] colValCount, read as ((colIdx, value), count):
((0,CT),5)
((0,MA),2)
((0,OH),4)
((0,NY),6)
((1,USA),17)
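For reference, one way such a colValCount RDD could be produced from df (a sketch; not necessarily how the asker built it):

// A sketch: explode each row into (columnIndex, value) pairs and count per pair.
val colValCount: org.apache.spark.rdd.RDD[((Int, String), Long)] =
  df.rdd
    .flatMap(row => row.toSeq.zipWithIndex.map { case (v, idx) => ((idx, v.toString), 1L) })
    .reduceByKey(_ + _)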
Now I need to get the top 2 items for each column index. So my expected output is this:
RDD[((Int, String), Long)]
((0,CT),5)
((0,NY),6)
((1,USA),17)
I've tried using the freqItems API on the DataFrame, but it's slow.
Any suggestions are welcome.
Answered by Alper t. Turker
For example:
import org.apache.spark.sql.functions._

df.select(lit(0).alias("index"), $"value")
  .union(df.select(lit(1), $"country"))
  .groupBy($"index", $"value")
  .count
  .orderBy($"count".desc)
  .limit(3)
  .show
// +-----+-----+-----+
// |index|value|count|
// +-----+-----+-----+
// |    1|  USA|   17|
// |    0|   NY|    6|
// |    0|   CT|    5|
// +-----+-----+-----+
where:
df.select(lit(0).alias("index"), $"value")
  .union(df.select(lit(1), $"country"))
creates a two-column DataFrame:
// +-----+-----+
// |index|value|
// +-----+-----+
// |    0|   MA|
// |    0|   MA|
// |    0|   OH|
// |    0|   OH|
// |    0|   OH|
// |    0|   OH|
// |    0|   NY|
// |    0|   NY|
// |    0|   NY|
// |    0|   NY|
// |    0|   NY|
// |    0|   NY|
// |    0|   CT|
// |    0|   CT|
// |    0|   CT|
// |    0|   CT|
// |    0|   CT|
// |    1|  USA|
// |    1|  USA|
// |    1|  USA|
// +-----+-----+
If you want specifically two values for each column:
import org.apache.spark.sql.DataFrame

def topN(df: DataFrame, key: String, n: Int) = {
  df.select(
      lit(df.columns.indexOf(key)).alias("index"),
      col(key).alias("value"))
    .groupBy("index", "value")
    .count
    // order by count descending so that limit(n) keeps the most frequent values
    .orderBy($"count".desc)
    .limit(n)
}

topN(df, "value", 2).union(topN(df, "country", 2)).show
// +-----+-----+-----+
// |index|value|count|
// +-----+-----+-----+
// |    0|   NY|    6|
// |    0|   CT|    5|
// |    1|  USA|   17|
// +-----+-----+-----+
So, like pault said - just "some combination of sort() and limit()".
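If you need the top values for every column rather than just these two, one possible extension (a sketch, not part of the original answer) is to map the topN helper over df.columns and union the results:

// A sketch: top 2 values for every column, reusing the topN helper above.
df.columns
  .map(c => topN(df, c, 2))
  .reduce(_ union _)
  .show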
Answered by Kirk Broadhurst
The easiest way to do this - a natural window function - is by writing SQL. Spark comes with SQL syntax, and SQL is a great and expressive tool for this problem.
Register your dataframe as a temp table, and then group and window on it.
spark.sql("""SELECT idx, value, ROW_NUMBER() OVER (PARTITION BY idx ORDER BY c DESC) as r
FROM (
SELECT idx, value, COUNT(*) as c
FROM (SELECT 0 as idx, value FROM df UNION ALL SELECT 1, country FROM df)
GROUP BY idx, value)
HAVING r <= 2""").show()
I'd like to see if any of the procedural / Scala approaches will let you perform the window function without an iteration or loop. I'm not aware of anything in the Spark API that would support it.
Incidentally, if you have an arbitrary number of columns you want to include then you can quite easily generate the inner section (SELECT 0 as idx, value ... UNION ALL SELECT 1, country, etc) dynamically using the list of columns.
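For illustration, a minimal sketch of that dynamic generation (the variable names are placeholders, not from the original answer):

// A sketch: build the inner UNION ALL from the DataFrame's column list.
val innerSql = df.columns.zipWithIndex
  .map { case (c, idx) => s"SELECT $idx AS idx, `$c` AS value FROM df" }
  .mkString(" UNION ALL ")
// innerSql can then replace the hand-written UNION ALL inside the query above:
val counts = spark.sql(s"SELECT idx, value, COUNT(*) AS c FROM ($innerSql) GROUP BY idx, value")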
Answered by Xavier Guihot
Given your last RDD:
val rdd =
  sc.parallelize(
    List(
      ((0, "CT"), 5),
      ((0, "MA"), 2),
      ((0, "OH"), 4),
      ((0, "NY"), 6),
      ((1, "USA"), 17)))
rdd.filter(_._1._1 == 0).sortBy(-_._2).take(2).foreach(println)
> ((0,NY),6)
> ((0,CT),5)
rdd.filter(_._1._1 == 1).sortBy(-_._2).take(2).foreach(println)
> ((1,USA),17)
We first get the items for a given column index (.filter(_._1._1 == 0)). Then we sort them in decreasing order of count (.sortBy(-_._2)). Finally, we take at most the first 2 elements (.take(2)), which returns only 1 element if the number of records for that index is lower than 2.
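If there are many column indices, the same result can also be obtained in a single pass by grouping on the index first (a sketch, not part of the original answer):

// A sketch: top 2 (value, count) pairs per column index in one pass.
rdd
  .map { case ((idx, value), count) => (idx, (value, count)) }
  .groupByKey()
  .mapValues(_.toList.sortBy(-_._2).take(2))
  .collect()
  .foreach(println)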
Answered by Gianmario Spacagna
You can map each partition using this helper function defined in Sparkz and then combine the results together:
package sparkz.utils

import scala.reflect.ClassTag

object TopElements {
  // Keeps a running set of the n highest-scoring (element, score) pairs,
  // together with the current minimum score in that set, in a single fold.
  def topN[T: ClassTag](elems: Iterable[T])(scoreFunc: T => Double, n: Int): List[T] =
    elems.foldLeft((Set.empty[(T, Double)], Double.MaxValue)) {
      case (accumulator @ (topElems, minScore), elem) =>
        val score = scoreFunc(elem)
        if (topElems.size < n)
          (topElems + (elem -> score), math.min(minScore, score))
        else if (score > minScore) {
          val newTopElems = topElems - topElems.minBy(_._2) + (elem -> score)
          (newTopElems, newTopElems.map(_._2).min)
        }
        else accumulator
    }._1.toList.sortBy(_._2).reverse.map(_._1)
}
Source: https://github.com/gm-spacagna/sparkz/blob/master/src/main/scala/sparkz/utils/TopN.scala
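For illustration, one possible way to wire it up for the RDD from the question - top candidates per partition first, then a final reduction of the small collected candidate list per column index (a sketch; this wiring is an assumption, not shown in the original answer):

import sparkz.utils.TopElements

val n = 2
// Top-n candidates per column index within each partition.
val candidates = rdd
  .mapPartitions { part =>
    part.toList
      .groupBy { case ((idx, _), _) => idx }
      .flatMap { case (_, elems) => TopElements.topN(elems)(_._2.toDouble, n) }
      .iterator
  }
  .collect()
// Final top-n per column index over the small combined candidate list.
val topPerIdx = candidates
  .groupBy { case ((idx, _), _) => idx }
  .map { case (idx, elems) => idx -> TopElements.topN(elems)(_._2.toDouble, n) }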
Answered by morbvel
If you are working with Spark SQL DataFrames, in my opinion the best (and easiest to understand) solution is to perform your code like this:
val test: DataFrame = df.select(col("col_name"))
test.show(5, false)
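To actually surface the most frequent values along these lines, the select would presumably be combined with a grouped count and a descending sort; a sketch of that reading (the column name is a placeholder, and this is not part of the original answer):

// A sketch: count occurrences of each value in a column and show the top 5.
df.groupBy(col("col_name"))
  .count()
  .orderBy(col("count").desc)
  .show(5, false)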
Hope it helps you :)

