Note: this page is a copy of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original StackOverflow question: http://stackoverflow.com/questions/32175247/

Scala spark reduce by key and find common value

scala, hadoop, apache-spark

Asked by VSEWHGHP

I have a file of csv data stored as a SequenceFile on HDFS, in the format name, zip, country, fav_food1, fav_food2, fav_food3, fav_colour. There could be many entries with the same name, and I need to find out what their favourite food is (i.e. count all the food entries in all the records with that name and return the most popular one). I am new to Scala and Spark, and have gone through multiple tutorials and scoured the forums, but am stuck as to how to proceed. So far I have converted the sequence file's Text values into String format and then filtered out the entries.

Here are the sample data entries, one per line in the file:

Bob,123,USA,Pizza,Soda,,Blue
Bob,456,UK,Chocolate,Cheese,Soda,Green
Bob,12,USA,Chocolate,Pizza,Soda,Yellow
Mary,68,USA,Chips,Pasta,Chocolate,Blue

So the output should be the tuple (Bob, Soda), since Soda appears the most times in Bob's entries.

import org.apache.hadoop.io._

var lines = sc.sequenceFile("path",classOf[LongWritable],classOf[Text]).values.map(x => x.toString())
// converted to String since I could not get filter to run on Text; taking .values also drops the LongWritable key

var filtered = lines.filter(_.split(",")(0) == "Bob");
// removed entries with all other users

var f_tuples = filtered.map(line => line.split(","));
// split all the values

var f_simple = f_tuples.map(line => (line(0), (line(3), line(4), line(5))))
// removed unnecessary fields

The issue I have now is that I think I have this [<name,[f,f,f]>] structure and don't really know how to proceed to flatten it out and get the most popular food. I need to combine all the entries so that I have one entry per name, and then get the most common element in the value. Any help would be appreciated. Thanks

I tried this to get it to flatten out, but it seems the more I try, the more convoluted the data structure becomes.

var f_trial = f_simple.groupBy(_._1).mapValues(_.map(_._2))
// the resulting structure was of type org.apache.spark.rdd.RDD[(String, Iterable[(String, String, String)])]

Here is what a println of a record looks like after f_trial:

("Bob", List((Pizza, Soda,), (Chocolate, Cheese, Soda), (Chocolate, Pizza, Soda)))

Parenthesis Breakdown

("Bob", 

List(

(Pizza, Soda, <missing value>),

(Chocolate, Cheese, Soda),

(Chocolate, Pizza, Soda)

) // ends List paren

) // ends first paren

Answer by The Archetypal Paul

I found time. Setup:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf

    val conf = new SparkConf().setAppName("spark-scratch").setMaster("local")
    val sc = new SparkContext(conf)

    val data = """   
  Bob,123,USA,Pizza,Soda,,Blue
  Bob,456,UK,Chocolate,Cheese,Soda,Green
  Bob,12,USA,Chocolate,Pizza,Soda,Yellow
  Mary,68,USA,Chips,Pasta,Chocolate,Blue
  """.trim

    val records = sc.parallelize(data.split('\n').map(_.trim)) // trim the indentation left on each line

Extract the food choices, and for each make a tuple of ((name, food), 1)

    val r2 = records.flatMap { r =>
      val Array(name, id, country, food1, food2, food3, color) = r.split(',');
      List(((name, food1), 1), ((name, food2), 1), ((name, food3), 1))
    }

Total up each name/food combination:

    val r3 = r2.reduceByKey((x, y) => x + y)

Remap so that the name (only) is the key

    val r4 = r3.map { case ((name, food), total) => (name, (food, total)) }

Pick the food with the largest count at each step

    val res = r4.reduceByKey((x, y) => if (y._2 > x._2) y else x)

And we're done

    println(res.collect().mkString)
    //(Mary,(Chips,1))(Bob,(Soda,3))

EDIT: To collect all the food items that have the same top count for a person, we just change the last two lines:

Start with a List of items with total:

val r5 = r3.map { case ((name, food), total) => (name, (List(food), total)) }

In the equal case, concatenate the list of food items with that score

val res2 = r5.reduceByKey((x, y) => if (y._2 > x._2) y 
                                    else if (y._2 < x._2) x
                                    else (y._1:::x._1, y._2))

//(Mary,(List(Chocolate, Pasta, Chips),1))
//(Bob,(List(Soda),3))

If you want the top-3, say, then use aggregateByKey to assemble a list of the favorite foods per person instead of the second reduceByKey.

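A minimal sketch of that aggregateByKey approach (not from the original answer; it assumes the r4 RDD built above, and topN/topFoods are names introduced here purely for illustration):

    val topN = 3
    // Replace the second reduceByKey with aggregateByKey: fold each (food, total) pair
    // into a partition-local list, then merge the lists across partitions, keeping only
    // the topN highest counts at every step.
    val topFoods = r4.aggregateByKey(List.empty[(String, Int)])(
      (acc, v) => (v :: acc).sortBy(-_._2).take(topN),
      (left, right) => (left ::: right).sortBy(-_._2).take(topN)
    )
    // topFoods.collect() should give something like (Bob,List((Soda,3), (Pizza,2), (Chocolate,2)))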

Answer by zero323

The solutions provided by Paul and mattinbits shuffle your data twice - once to perform the reduce-by-name-and-food and once to reduce-by-name. It is possible to solve this problem with only one shuffle.

/**Generate key-food_count pairs from a split line**/
def bitsToKeyMapPair(xs: Array[String]): (String, Map[String, Long]) = {
  val key = xs(0)
  val map = xs
    .drop(3) // Drop name..country
    .take(3) // Take food
    .filter(_.trim.size !=0) // Ignore empty
    .map((_, 1L)) // Generate k-v pairs
    .toMap // Convert to Map
    .withDefaultValue(0L) // Set default

  (key, map)
}

/**Combine two count maps**/
def combine(m1: Map[String, Long], m2: Map[String, Long]): Map[String, Long] = {
  (m1.keys ++ m2.keys).map(k => (k, m1(k) + m2(k))).toMap.withDefaultValue(0L)
}

val n: Int = ??? // Number of favorite per user

val records = lines.map(line => bitsToKeyMapPair(line.split(",")))
records.reduceByKey(combine).mapValues(_.toSeq.sortBy(-_._2).take(n))

If you're not a purist you can replace scala.collection.immutable.Map with scala.collection.mutable.Map to further improve performance.

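A rough sketch of that mutable variant (not part of the original answer; combineMutable is a hypothetical name, and bitsToKeyMapPair would also need to build a scala.collection.mutable.Map for the types to line up):

import scala.collection.mutable

/**Combine two count maps by folding m2 into m1 in place, avoiding a new Map allocation per merge**/
def combineMutable(m1: mutable.Map[String, Long], m2: mutable.Map[String, Long]): mutable.Map[String, Long] = {
  m2.foreach { case (food, count) =>
    m1(food) = m1.getOrElse(food, 0L) + count // update m1 in place
  }
  m1
}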

Answer by mattinbits

Here's a complete example:

import org.apache.spark.{SparkContext, SparkConf}


object Main extends App {

  val data = List(
    "Bob,123,USA,Pizza,Soda,,Blue",
    "Bob,456,UK,Chocolate,Cheese,Soda,Green",
    "Bob,12,USA,Chocolate,Pizza,Soda,Yellow",
    "Mary,68,USA,Chips,Pasta,Chocolate,Blue")

  val sparkConf = new SparkConf().setMaster("local").setAppName("example")
  val sc = new SparkContext(sparkConf)

  val lineRDD = sc.parallelize(data)

  val pairedRDD = lineRDD.map { line =>
    val fields = line.split(",")
    (fields(0), List(fields(3), fields(4), fields(5)).filter(_ != ""))
  }.filter(_._1 == "Bob")

  /*pairedRDD.collect().foreach(println)
    (Bob,List(Pizza, Soda))
    (Bob,List(Chocolate, Cheese, Soda))
    (Bob,List(Chocolate, Pizza, Soda))
   */

  val flatPairsRDD = pairedRDD.flatMap {
    case (name, foodList) => foodList.map(food => ((name, food), 1))
  }

  /*flatPairsRDD.collect().foreach(println)
    ((Bob,Pizza),1)
    ((Bob,Soda),1)
    ((Bob,Chocolate),1)
    ((Bob,Cheese),1)
    ((Bob,Soda),1)
    ((Bob,Chocolate),1)
    ((Bob,Pizza),1)
    ((Bob,Soda),1)
   */

  val nameFoodSumRDD = flatPairsRDD.reduceByKey((a, b) => a + b)

  /*nameFoodSumRDD.collect().foreach(println)
    ((Bob,Cheese),1)
    ((Bob,Soda),3)
    ((Bob,Pizza),2)
    ((Bob,Chocolate),2)
   */

  val resultsRDD = nameFoodSumRDD.map{
    case ((name, food), count) => (name, (food,count))
  }.groupByKey.map{
    case (name, foodCountList) => (name, foodCountList.toList.sortBy(_._2).reverse.head)
  }

  resultsRDD.collect().foreach(println)
  /*
      (Bob,(Soda,3))
   */

  sc.stop()
}