Scala Spark: Sort records in groups?
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must follow the same license and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/28543510/
Spark: Sort records in groups?
Asked by zork
I have a set of records which I need to:
1) Group by 'date', 'city' and 'kind'
2) Sort every group by 'prize'
In my code:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object Sort {

  case class Record(name: String, day: String, kind: String, city: String, prize: Int)

  val recs = Array(
    Record("n1", "d1", "k1", "c1", 10),
    Record("n1", "d1", "k1", "c1", 9),
    Record("n1", "d1", "k1", "c1", 8),
    Record("n2", "d2", "k2", "c2", 1),
    Record("n2", "d2", "k2", "c2", 2),
    Record("n2", "d2", "k2", "c2", 3)
  )

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Test")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)

    val rs = sc.parallelize(recs)
    val rsGrp = rs.groupBy(r => (r.day, r.kind, r.city)).map(_._2)
    val x = rsGrp.map { r =>
      val lst = r.toList
      lst.map { e => (e.prize, e) }
    }
    x.sortByKey()
  }
}
When I try to sort group I get an error:
value sortByKey is not a member of org.apache.spark.rdd.RDD[List[(Int,
Sort.Record)]]
What is wrong? How do I sort?
Answered by gasparms
You need to define a key, and then use mapValues to sort the values of each key.
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._

object Sort {

  case class Record(name: String, day: String, kind: String, city: String, prize: Int)

  // Define your data (the sample records from the question)
  val recs = Array(
    Record("n1", "d1", "k1", "c1", 10),
    Record("n1", "d1", "k1", "c1", 9),
    Record("n1", "d1", "k1", "c1", 8),
    Record("n2", "d2", "k2", "c2", 1),
    Record("n2", "d2", "k2", "c2", 2),
    Record("n2", "d2", "k2", "c2", 3)
  )

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Test")
      .setMaster("local")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    val rs = sc.parallelize(recs)

    // Generate the pair RDD necessary to call groupByKey, and group it
    val key: RDD[((String, String, String), Iterable[Record])] =
      rs.keyBy(r => (r.day, r.city, r.kind)).groupByKey

    // Once grouped, sort the values of each key
    val values: RDD[((String, String, String), List[Record])] =
      key.mapValues(iter => iter.toList.sortBy(_.prize))

    // Print the result
    values.collect.foreach(println)
  }
}
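Run locally, this prints one line per (day, city, kind) key with its records sorted by prize. With the sample data from the question the output should look roughly like this (partition order may vary):

((d1,c1,k1),List(Record(n1,d1,k1,c1,8), Record(n1,d1,k1,c1,9), Record(n1,d1,k1,c1,10)))
((d2,c2,k2),List(Record(n2,d2,k2,c2,1), Record(n2,d2,k2,c2,2), Record(n2,d2,k2,c2,3)))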
Answered by Gianmario Spacagna
groupByKey is expensive; it has two implications:
- Most of the data gets shuffled, ending up on average in the remaining N-1 partitions.
- All of the records for the same key are loaded into memory on a single executor, potentially causing memory errors.
Depending on your use case, you have several better options:
- If you don't care about the ordering, use reduceByKey or aggregateByKey.
- If you just want to group and sort without any transformation, prefer repartitionAndSortWithinPartitions (Spark 1.3.0+, http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.OrderedRDDFunctions), but be very careful about which partitioner you specify, and test it, because you are now relying on side effects that may change behaviour in a different environment (see the sketch after this list). See also the examples in this repository: https://github.com/sryza/aas/blob/master/ch08-geotime/src/main/scala/com/cloudera/datascience/geotime/RunGeoTime.scala.
- If you are applying either a transformation or a non-reducible aggregation (fold or scan) to the iterable of sorted records, then check out this library: spark-sorted (https://github.com/tresata/spark-sorted). It provides 3 APIs for paired RDDs: mapStreamByKey, foldLeftByKey and scanLeftByKey.
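For illustration, here is a minimal sketch of the repartitionAndSortWithinPartitions idea applied to the question's rs RDD. The composite key layout and the GroupPartitioner class are my own assumptions, not part of Spark or of the original answer; the point is a secondary sort: partition only on the group part of the key, and let the key ordering sort by prize within each partition.

import org.apache.spark.Partitioner

// Hypothetical partitioner: route each record by the group part of its
// composite key, so all records of one group land in the same partition.
class GroupPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = key match {
    case (group, _) => ((group.hashCode % partitions) + partitions) % partitions
  }
}

// Composite key: ((day, city, kind), prize); value: the whole record.
// The implicit tuple ordering compares the group first and the prize second,
// so within each partition records come out grouped and sorted by prize.
val keyed = rs.map(r => (((r.day, r.city, r.kind), r.prize), r))
val sortedWithinGroups = keyed.repartitionAndSortWithinPartitions(new GroupPartitioner(4))

// Inspect the result on the driver: each inner list is one partition,
// already grouped by key and sorted by prize.
sortedWithinGroups.values.glom().collect().foreach(p => println(p.toList))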
Answered by Soumya Simanta
Replace map with flatMap:
val x = rsGrp.map { r =>
  val lst = r.toList
  lst.map { e => (e.prize, e) }
}
This will give you a
org.apache.spark.rdd.RDD[(Int, Record)] = FlatMappedRDD[10]
and then you can call sortBy(_._1) on the RDD above.
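A minimal sketch of that suggestion, reusing the rsGrp RDD from the question (the variable names here are mine):

// Flatten each group into (prize, record) pairs instead of nesting lists.
val pairs = rsGrp.flatMap { r =>
  r.toList.map(e => (e.prize, e))
}

// pairs is an RDD[(Int, Record)], so a global sort by prize now works.
val sortedByPrize = pairs.sortBy(_._1)
sortedByPrize.collect().foreach(println)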
Answered by nir
As an alternative to @gasparms' solution, I think one can try a filter followed by an rdd.sortBy operation: you filter out the records that match each key's criteria. The prerequisite is that you need to keep track of all your keys (filter combinations); you can also build that set as you traverse the records.
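The answer above is only an idea; a rough sketch of it, assuming the rs RDD and Record class from the question, could look like this (collecting the distinct keys to the driver is only reasonable when the number of groups is small):

// Collect the set of distinct (day, city, kind) keys, i.e. the filter combinations.
val keys = rs.map(r => (r.day, r.city, r.kind)).distinct().collect()

// For each key, filter its records and sort them by prize.
val sortedGroups = keys.map { k =>
  k -> rs.filter(r => (r.day, r.city, r.kind) == k).sortBy(_.prize).collect().toList
}
sortedGroups.foreach(println)

Note that this launches one Spark job per key, so it only pays off when there are very few groups.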

