Scala Spark: Efficient way to test if an RDD is empty
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use or share it, but you must do so under the same CC BY-SA license, cite the original URL, and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/28454357/
Spark: Efficient way to test if an RDD is empty
Asked by Tobber
There is no isEmpty method on RDDs, so what is the most efficient way of testing if an RDD is empty?
Answered by Tobber
RDD.isEmpty() will be part of Spark 1.3.0.
Based on suggestions in this Apache mail thread and, later, some comments on this answer, I have done some small local experiments. The best method is using take(1).length == 0.
def isEmpty[T](rdd: RDD[T]) = {
  rdd.take(1).length == 0
}
It should run in O(1), except when the RDD is empty, in which case it is linear in the number of partitions.
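The constant-cost claim can be illustrated without a Spark cluster. The plain-Scala sketch below models a huge dataset as a lazy iterator and counts how many elements the take(1) check actually materializes. This is only a local analogy for Spark's distributed take; the names TakeOneDemo and checkEmpty are illustrative, not part of any Spark API.

```scala
object TakeOneDemo {
  // Counts how many elements are actually materialized when deciding
  // emptiness via take(1). Returns (isEmpty, elementsPulled).
  def checkEmpty(data: Iterator[Int]): (Boolean, Int) = {
    var pulled = 0
    val counted = data.map { x => pulled += 1; x } // lazily count each pull
    val empty = counted.take(1).toList.length == 0
    (empty, pulled)
  }

  def main(args: Array[String]): Unit = {
    // A "billion element" dataset, modeled as a lazy iterator.
    val huge = Iterator.from(1).take(1000000000)
    println(checkEmpty(huge))           // (false,1): only one element was pulled
    println(checkEmpty(Iterator.empty)) // (true,0)
  }
}
```

Even for the billion-element input, only a single element is realized, which is the intuition behind take(1) being cheap on non-empty data.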
Thanks to Josh Rosen and Nick Chammas for pointing me to this.
Note: This fails if the RDD is of type RDD[Nothing], e.g. isEmpty(sc.parallelize(Seq())), but this is likely not a problem in real life. isEmpty(sc.parallelize(Seq[Any]())) works fine.
Edits:
- Edit 1: Added the take(1).length == 0 method, thanks to comments.
My original suggestion: Use mapPartitions.
def isEmpty[T](rdd: RDD[T]) = {
  rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_ && _)
}
It should scale in the number of partitions and is not nearly as clean as take(1). It is, however, robust to RDDs of type RDD[Nothing].
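The per-partition reduction can likewise be sketched in plain Scala: each partition is modeled as an iterator, each reports whether it is empty, and the flags are AND-ed together, mirroring mapPartitions(it => Iterator(!it.hasNext)).reduce(_ && _). PartitionEmptiness is an illustrative name, not Spark API.

```scala
object PartitionEmptiness {
  // Local analogy of rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_ && _):
  // each partition reports whether it is empty; the whole dataset is empty
  // iff every partition is. Like Spark's reduce, this throws if there are
  // zero partitions.
  def isEmpty[T](partitions: Seq[Iterator[T]]): Boolean =
    partitions.map(it => !it.hasNext).reduce(_ && _)

  def main(args: Array[String]): Unit = {
    val allEmpty: Seq[Iterator[Int]] = Seq(Iterator.empty, Iterator.empty)
    val oneNonEmpty: Seq[Iterator[Int]] = Seq(Iterator.empty, Iterator(1, 2, 3))
    println(isEmpty(allEmpty))    // true
    println(isEmpty(oneNonEmpty)) // false
  }
}
```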
Experiments:
I used this code for the timings.
def time(n: Long, f: (RDD[Long]) => Boolean): Unit = {
  val start = System.currentTimeMillis()
  val rdd = sc.parallelize(1L to n, numSlices = 100)
  val result = f(rdd)
  println("Time: " + (System.currentTimeMillis() - start) + " Result: " + result)
}
time(1000000000L, rdd => rdd.take(1).length == 0L)
time(1000000000L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(1000000000L, rdd => rdd.count() == 0L)
time(1000000000L, rdd => rdd.takeSample(true, 1).isEmpty)
time(1000000000L, rdd => rdd.fold(0)(_ + _) == 0L)
time(1L, rdd => rdd.take(1).length == 0L)
time(1L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(1L, rdd => rdd.count() == 0L)
time(1L, rdd => rdd.takeSample(true, 1).isEmpty)
time(1L, rdd => rdd.fold(0)(_ + _) == 0L)
time(0L, rdd => rdd.take(1).length == 0L)
time(0L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(0L, rdd => rdd.count() == 0L)
time(0L, rdd => rdd.takeSample(true, 1).isEmpty)
time(0L, rdd => rdd.fold(0)(_ + _) == 0L)
On my local machine with 3 worker cores I got these results:
Time: 21 Result: false
Time: 75 Result: false
Time: 8664 Result: false
Time: 18266 Result: false
Time: 23836 Result: false
Time: 113 Result: false
Time: 101 Result: false
Time: 68 Result: false
Time: 221 Result: false
Time: 46 Result: false
Time: 79 Result: true
Time: 93 Result: true
Time: 79 Result: true
Time: 100 Result: true
Time: 64 Result: true
Answered by marios
As of Spark 1.3, isEmpty() is part of the RDD API. A bug that caused isEmpty to fail was later fixed in Spark 1.4.
For DataFrames you can do:
val df: DataFrame = ...
df.rdd.isEmpty()
Here is the code pasted right from the RDD implementation (as of 1.4.1).
/**
 * @note due to complications in the internal implementation, this method will raise an
 * exception if called on an RDD of `Nothing` or `Null`. This may come up in practice
 * because, for example, the type of `parallelize(Seq())` is `RDD[Nothing]`.
 * (`parallelize(Seq())` should be avoided anyway in favor of `parallelize(Seq[T]())`.)
 * @return true if and only if the RDD contains no elements at all. Note that an RDD
 * may be empty even when it has at least 1 partition.
 */
def isEmpty(): Boolean = withScope {
  partitions.length == 0 || take(1).length == 0
}
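The implementation's two-step check (zero partitions, or no element found when pulling one) can be modeled in plain Scala, with each partition represented as a local Seq. IsEmptyModel below is a local analogy for illustration, not Spark code.

```scala
object IsEmptyModel {
  // Local analogy of: partitions.length == 0 || take(1).length == 0.
  // A dataset is empty if it has no partitions at all, or if lazily pulling
  // a single element across all partitions yields nothing.
  def isEmpty[T](partitions: Seq[Seq[T]]): Boolean =
    partitions.length == 0 ||
      partitions.iterator.flatMap(_.iterator).take(1).toList.length == 0

  def main(args: Array[String]): Unit = {
    println(isEmpty(Seq.empty[Seq[Int]]))     // true: zero partitions
    println(isEmpty(Seq(Seq[Int](), Seq())))  // true: partitions, but no elements
    println(isEmpty(Seq(Seq[Int](), Seq(1)))) // false
  }
}
```

This also shows why the note in the docstring matters: an RDD can have one or more partitions and still be empty, so checking partitions.length alone is not sufficient.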

