Scala: How to sort an RDD and limit in Spark?
Disclaimer: This page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same CC BY-SA license and attribute it to the original authors (not me): StackOverflow.
Original question: http://stackoverflow.com/questions/32947978/
How to sort an RDD and limit in Spark?
Asked by etig
I have an RDD of the class Foo: class Foo(name: String, createDate: Date).
I want another RDD containing the 10 percent oldest Foos.
My first idea was to sort by createDate and limit to 0.1 * count, but there is no limit function on RDDs.
Do you have an idea?
Answered by zero323
Assuming Foo is a case class like this:
    import java.sql.Date
    case class Foo(name: String, createDate: java.sql.Date)
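For reference, a Foo instance can be built from an ISO date string with java.sql.Date.valueOf (a small usage sketch, not part of the original answer):

    val foo = Foo("a", Date.valueOf("2009-11-23"))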
Using plain RDDs:
    import org.apache.spark.rdd.RDD
    import scala.math.Ordering

    // Example data; assumes a SparkSession with spark.implicits._ in scope
    val rdd: RDD[Foo] = sc
      .parallelize(Seq(
        ("a", "2015-01-03"), ("b", "2014-11-04"), ("a", "2016-08-10"),
        ("a", "2013-11-11"), ("a", "2015-06-19"), ("a", "2009-11-23")))
      .toDF("name", "createDate")
      .withColumn("createDate", $"createDate".cast("date"))
      .as[Foo].rdd

    rdd.cache()
    val n = scala.math.ceil(0.1 * rdd.count).toInt

If the data fits into driver memory:

- and the fraction you want is relatively small:

      rdd.takeOrdered(n)(Ordering.by[Foo, Long](_.createDate.getTime))
      // Array[Foo] = Array(Foo(a,2009-11-23))

- and the fraction you want is relatively large:

      rdd.sortBy(_.createDate.getTime).take(n)

Otherwise:

    rdd
      .sortBy(_.createDate.getTime)
      .zipWithIndex
      .filter { case (_, idx) => idx < n }
      .keys
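A side note beyond the original answer: if you wanted the newest 10 percent instead, the same takeOrdered pattern should work with the ordering reversed (a minimal sketch):

    // Hypothetical variant: newest n records instead of the oldest,
    // by reversing the Ordering passed to takeOrdered
    rdd.takeOrdered(n)(Ordering.by[Foo, Long](_.createDate.getTime).reverse)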
Using DataFrame (note: this is actually not optimal performance-wise, due to the behavior of limit):
    import org.apache.spark.sql.Row

    val topN = rdd.toDF.orderBy($"createDate").limit(n)
    topN.show

    // +----+----------+
    // |name|createDate|
    // +----+----------+
    // |   a|2009-11-23|
    // +----+----------+

    // Optionally recreate RDD[Foo]
    topN.map { case Row(name: String, date: Date) => Foo(name, date) }
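If an approximate answer is acceptable, one way to sidestep the cost of orderBy plus limit is to compute a 10th-percentile cutoff with approxQuantile (available since Spark 2.0) and filter on it. This is a sketch under that assumption, not part of the original answer, and it returns roughly, not exactly, n rows:

    // Sketch: approximate alternative to orderBy + limit.
    // Convert createDate to a numeric unix timestamp, estimate the
    // ~10th percentile, then keep rows at or below the cutoff.
    import org.apache.spark.sql.functions.{col, unix_timestamp}

    val withTs = rdd.toDF.withColumn("ts", unix_timestamp(col("createDate")))
    val Array(cutoff) = withTs.stat.approxQuantile("ts", Array(0.1), 0.001)
    val oldest = withTs.filter(col("ts") <= cutoff).drop("ts")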

