
Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute it to the original authors (not this site). Original question: http://stackoverflow.com/questions/48775083/

Date: 2020-10-22 09:33:06 · Source: igfitidea

Top N items from a Spark DataFrame/RDD

Tags: scala, apache-spark, top-n

Asked by Sam

My requirement is to get the top N items from a dataframe.

I've this DataFrame:

val df = List(
      ("MA", "USA"),
      ("MA", "USA"),
      ("OH", "USA"),
      ("OH", "USA"),
      ("OH", "USA"),
      ("OH", "USA"),
      ("NY", "USA"),
      ("NY", "USA"),
      ("NY", "USA"),
      ("NY", "USA"),
      ("NY", "USA"),
      ("NY", "USA"),
      ("CT", "USA"),
      ("CT", "USA"),
      ("CT", "USA"),
      ("CT", "USA"),
      ("CT", "USA")).toDF("value", "country")

I was able to map it to an RDD[((Int, String), Long)] named colValCount. Read it as: ((colIdx, value), count)

((0,CT),5)
((0,MA),2)
((0,OH),4)
((0,NY),6)
((1,USA),17)
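
For context, here is one way such a colValCount RDD could be built from df; the question does not show the actual mapping, so treat this as an assumed sketch rather than the asker's code:

import org.apache.spark.rdd.RDD

// Assumed construction of ((colIdx, value), count) pairs from the DataFrame rows.
val colValCount: RDD[((Int, String), Long)] = df.rdd
  .flatMap(row => row.toSeq.zipWithIndex.map { case (v, i) => ((i, String.valueOf(v)), 1L) })
  .reduceByKey(_ + _)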

Now I need to get the top 2 items for each column index. So my expected output is this:

RDD[((Int, String), Long)]

((0,CT),5)
((0,NY),6)
((1,USA),17)

I've tried using the freqItems API on the DataFrame, but it's slow.

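For reference, a minimal sketch of how freqItems might have been invoked (the 0.3 support threshold is an assumption); note that freqItems returns approximate frequent items without counts, so it does not directly produce a ranked top N:

// Approximate frequent items for both columns; the support value is assumed.
df.stat.freqItems(Seq("value", "country"), 0.3).show(false)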

Any suggestions are welcome.

Answer by Alper t. Turker

For example:

import org.apache.spark.sql.functions._

df.select(lit(0).alias("index"), $"value")
  .union(df.select(lit(1), $"country"))
  .groupBy($"index", $"value")
  .count
  .orderBy($"count".desc)
  .limit(3)
  .show

// +-----+-----+-----+
// |index|value|count|
// +-----+-----+-----+
// |    1|  USA|   17|
// |    0|   NY|    6|
// |    0|   CT|    5|
// +-----+-----+-----+

where:

df.select(lit(0).alias("index"), $"value")
  .union(df.select(lit(1), $"country"))

creates a two-column DataFrame:

// +-----+-----+
// |index|value|
// +-----+-----+
// |    0|   MA|
// |    0|   MA|
// |    0|   OH|
// |    0|   OH|
// |    0|   OH|
// |    0|   OH|
// |    0|   NY|
// |    0|   NY|
// |    0|   NY|
// |    0|   NY|
// |    0|   NY|
// |    0|   NY|
// |    0|   CT|
// |    0|   CT|
// |    0|   CT|
// |    0|   CT|
// |    0|   CT|
// |    1|  USA|
// |    1|  USA|
// |    1|  USA|
// +-----+-----+

If you want specifically two values for each column:

import org.apache.spark.sql.DataFrame

def topN(df: DataFrame, key: String, n: Int) = {
  df.select(
      lit(df.columns.indexOf(key)).alias("index"),
      col(key).alias("value"))
    .groupBy("index", "value")
    .count
    .orderBy($"count".desc)
    .limit(n)
}

topN(df, "value", 2).union(topN(df, "country", 2)).show
// +-----+-----+-----+
// |index|value|count|
// +-----+-----+-----+
// |    0|   NY|    6|
// |    0|   CT|    5|
// |    1|  USA|   17|
// +-----+-----+-----+

So, as pault said, just "some combination of sort() and limit()".

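As a side note (my own sketch, not part of the quoted answer), the same "top 2 per index" result can also be expressed with a window function over the indexed DataFrame; the intermediate name indexed is an assumption:

import org.apache.spark.sql.expressions.Window

val indexed = df.select(lit(0).alias("index"), $"value")
  .union(df.select(lit(1), $"country"))

indexed
  .groupBy($"index", $"value")
  .count
  .withColumn("rn", row_number().over(Window.partitionBy($"index").orderBy($"count".desc)))
  .where($"rn" <= 2)
  .drop("rn")
  .show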

Answer by Kirk Broadhurst

The easiest way to do this - a natural window function - is by writing SQL. Spark comes with SQL syntax, and SQL is a great and expressive tool for this problem.

Register your dataframe as a temp table, and then group and window on it.

df.createOrReplaceTempView("df")

spark.sql("""SELECT idx, value, c
             FROM (
               SELECT idx, value, c,
                      ROW_NUMBER() OVER (PARTITION BY idx ORDER BY c DESC) AS r
               FROM (
                 SELECT idx, value, COUNT(*) AS c
                 FROM (SELECT 0 AS idx, value FROM df UNION ALL SELECT 1, country FROM df)
                 GROUP BY idx, value))
             WHERE r <= 2""").show()

I'd like to see if any of the procedural / scala approaches will let you perform the window function without an iteration or loop. I'm not aware of anything in the Spark API that would support it.

Incidentally, if you have an arbitrary number of columns you want to include, then you can quite easily generate the inner section (SELECT 0 as idx, value ... UNION ALL SELECT 1, country, etc.) dynamically from the list of columns, as sketched below.

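A sketch of how that inner section could be generated dynamically; the list columnsOfInterest and the registered view name df are assumptions:

// Build "SELECT 0 AS idx, `value` AS value FROM df UNION ALL SELECT 1 AS idx, `country` AS value FROM df".
val columnsOfInterest = Seq("value", "country")
val innerSql = columnsOfInterest.zipWithIndex
  .map { case (c, i) => s"SELECT $i AS idx, `$c` AS value FROM df" }
  .mkString(" UNION ALL ")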

Answer by Xavier Guihot

Given your last RDD:

val rdd =
  sc.parallelize(
    List(
      ((0, "CT"), 5),
      ((0, "MA"), 2),
      ((0, "OH"), 4),
      ((0, "NY"), 6),
      ((1, "USA"), 17)
    ))

rdd.filter(_._1._1 == 0).sortBy(-_._2).take(2).foreach(println)
> ((0,NY),6)
> ((0,CT),5)
rdd.filter(_._1._1 == 1).sortBy(-_._2).take(2).foreach(println)
> ((1,USA),17)

We first get the items for a given column index (.filter(_._1._1 == 0)). Then we sort the items in decreasing order (.sortBy(-_._2)). And finally, we take at most the first 2 elements (.take(2)), which returns only 1 element if the number of records is lower than 2.

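If the column indices should not be hard-coded, one possible generalization (my sketch, not from the answer) groups by the index first and keeps the top 2 per group:

// Top 2 entries per column index without filtering each index by hand.
rdd
  .groupBy { case ((colIdx, _), _) => colIdx }
  .flatMap { case (_, items) => items.toList.sortBy(-_._2).take(2) }
  .collect
  .foreach(println)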

Answer by Gianmario Spacagna

You can map each single partition using this helper function defined in Sparkz and then combine them together:

package sparkz.utils

import scala.reflect.ClassTag

object TopElements {
  def topN[T: ClassTag](elems: Iterable[T])(scoreFunc: T => Double, n: Int): List[T] =
    elems.foldLeft((Set.empty[(T, Double)], Double.MaxValue)) {
      case (accumulator@(topElems, minScore), elem) =>
        val score = scoreFunc(elem)
        if (topElems.size < n)
          (topElems + (elem -> score), math.min(minScore, score))
        else if (score > minScore) {
          val newTopElems = topElems - topElems.minBy(_._2) + (elem -> score)
          (newTopElems, newTopElems.map(_._2).min)
        }
        else accumulator
    }
      ._1.toList.sortBy(_._2).reverse.map(_._1)
}

Source: https://github.com/gm-spacagna/sparkz/blob/master/src/main/scala/sparkz/utils/TopN.scala

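A sketch of how the helper might be applied per partition and then combined; the RDD name colValCount, the per-index grouping, and n = 2 are assumptions for illustration:

import sparkz.utils.TopElements

// First pass: local top 2 per column index inside each partition.
val perPartitionTop = colValCount.mapPartitions { iter =>
  iter.toList
    .groupBy { case ((colIdx, _), _) => colIdx }
    .valuesIterator
    .flatMap(items => TopElements.topN(items)(_._2.toDouble, 2))
}

// Second pass: resolve the global top 2 per column index from the merged candidates.
val globalTop = perPartitionTop
  .groupBy { case ((colIdx, _), _) => colIdx }
  .flatMap { case (_, items) => TopElements.topN(items.toList)(_._2.toDouble, 2) }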

Answer by morbvel

If you are working with Spark SQL DataFrames, in my opinion the best (and easiest to understand) solution is to write your code like this:

val test: DataFrame = df.select(col("col_name"))
test.show(5, false)

Hope it helps you :)
