scala - Count on Spark Dataframe is extremely slow
Disclaimer: This page is a Chinese-English parallel translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you need to use it, you must likewise follow the CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me): StackOverFlow
Original URL: http://stackoverflow.com/questions/45142105/
Warning: these are provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): StackOverFlow
Count on Spark Dataframe is extremely slow
Asked by br0ken.pipe
I'm creating a new DataFrame with a handful of records from a Join.
val joined_df = first_df.join(second_df, first_df.col("key") ===
second_df.col("key") && second_df.col("key").isNull, "left_outer")
joined_df.repartition(1)
joined_df.cache()
joined_df.count()
Everything is fast (under one second) except the count operation. The RDD conversion kicks in and literally takes hours to complete. Is there any way to speed things up?
INFO MemoryStore: Block rdd_63_140 stored as values in memory (estimated size 16.0 B, free 829.3 MB)
INFO BlockManagerInfo: Added rdd_63_140 in memory on 192.168.8.52:36413 (size: 16.0 B, free: 829.8 MB)
INFO Executor: Finished task 140.0 in stage 10.0 (TID 544). 4232 bytes result sent to driver
INFO TaskSetManager: Starting task 142.0 in stage 10.0 (TID 545, localhost, executor driver, partition 142, PROCESS_LOCAL, 6284 bytes)
INFO Executor: Running task 142.0 in stage 10.0 (TID 545)
INFO TaskSetManager: Finished task 140.0 in stage 10.0 (TID 544) in 16 ms on localhost (executor driver) (136/200)
INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 200 blocks
INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 200 blocks
INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
Answered by Haroun Mohammedi
Everything is fast (under one second) except the count operation.
This is explained as follows: all operations before the count are called transformations, and this type of Spark operation is lazy, i.e. it does not do any computation until an action is called (count in your example).
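A minimal sketch of this laziness (assuming two DataFrames first_df and second_df with a "key" column, as in the question):

val joined_df = first_df.join(second_df, first_df.col("key") === second_df.col("key"), "left_outer") // transformation: returns immediately, no job runs
joined_df.cache()   // also lazy: nothing is materialized yet
joined_df.count()   // action: only now does Spark read the data, perform the join and fill the cache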
The second problem is the repartition(1):
Keep in mind that you'll lose all the parallelism offered by Spark and your computation will run in a single executor (a single core if you are in standalone mode), so you must remove this step or change the 1 to a number proportional to the number of your CPU cores (standalone mode) or the number of executors (cluster mode).
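As a hedged sketch of that suggestion (spark here is an assumed SparkSession; choose the partition count for your own cluster):

// Size the partition count to the available parallelism instead of 1.
val parallelism = spark.sparkContext.defaultParallelism   // roughly the number of cores / executor slots
val repartitioned_df = joined_df.repartition(parallelism) // note: repartition returns a new DataFrame, it does not modify joined_df in place
repartitioned_df.cache()
repartitioned_df.count()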
The RDD conversion kicks in and literally takes hours to complete.
If I understand correctly, you are converting the DataFrame to an RDD. This is really a bad practice in Spark and you should avoid such a conversion as much as you can.
This is because the data in a DataFrame and a Dataset is encoded using special Spark encoders (it's called Tungsten, if I remember it well), which take much less memory than the JVM serialization encoders. Such a conversion means that Spark will change the representation of your data from its own format (which takes much less memory and lets Spark optimize a lot of computations by working directly on the encoded data instead of serializing the data and then deserializing it) to the JVM data types, and this is why DataFrames and Datasets are much more powerful than RDDs.
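A small illustrative sketch of that difference (assumed code, not taken from the question; it presumes joined_df has a nullable "key" column):

import org.apache.spark.sql.functions.col

// Stays in the DataFrame API: works on the Tungsten-encoded rows and lets Catalyst optimize the whole plan.
val fastCount = joined_df.filter(col("key").isNotNull).count()

// Drops to RDD[Row]: every row is deserialized into JVM objects before the filter runs.
val slowCount = joined_df.rdd.filter(row => !row.isNullAt(row.fieldIndex("key"))).count()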
Hope this helps you.
Answered by timchap
As others have mentioned, the operations before count are "lazy" and only register a transformation, rather than actually forcing a computation.
When you call count, the computation is triggered. This is when Spark reads your data, performs all previously-registered transformations and calculates the result that you requested (in this case a count).
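One way to see what that triggered job will do (just a suggestion, not part of the original answer) is to print the plan before calling the action:

joined_df.explain(true) // prints the parsed, analyzed, optimized and physical plans without running anything
joined_df.count()       // runs the job described by the physical plan above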
The RDD conversion kicks in and literally takes hours to complete
I think the term "conversion" is perhaps a bit inaccurate. What is actually happening is that the DataFrame transformations you registered are translated into RDD operations, and these are applied to the RDD that underlies your DataFrame. There is no conversion per se in the code you have given here.
As an aside, it is possible to explicitly convert a DataFrame to an RDD via the DataFrame.rdd property. As mentioned in this answer, this is generally a bad idea, since you lose some of the benefits (in both performance and API) of having well-structured data.
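A brief sketch of that explicit conversion (illustrative only):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

val asRdd: RDD[Row] = joined_df.rdd // explicit conversion via the .rdd property: untyped Row objects, no Catalyst/Tungsten optimizations
val n = joined_df.count()           // counting on the DataFrame itself keeps the optimized, encoded representation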

