How to access broadcasted DataFrame in Spark

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): StackOverflow.
Original question: http://stackoverflow.com/questions/34931272/
Asked by Raghavendra Kulkarni
I have created two DataFrames from Hive tables (PC_ITM and ITEM_SELL). Both are large, and I use them frequently in SQL queries by registering them as tables, but because they are big the queries take a long time. So I saved them as Parquet files, read them back, and registered them as temporary tables. I still wasn't getting good performance, so I broadcast the DataFrames and then registered them as tables, as below.
val PC_ITM_DF = sqlContext.parquetFile("path")
val PC_ITM_BC = sc.broadcast(PC_ITM_DF)
val PC_ITM_DF1 = PC_ITM_BC.value
PC_ITM_DF1.registerAsTempTable("PC_ITM")
val ITM_SELL_DF = sqlContext.parquetFile("path")
val ITM_SELL_BC = sc.broadcast(ITM_SELL_DF)
val ITM_SELL_DF1 = ITM_SELL_BC.value
ITM_SELL_DF1.registerAsTempTable("ITM_SELL")
sqlContext.sql("JOIN Query").show
But I still can't get better performance; it takes the same time as when the DataFrames are not broadcast.

Can anyone tell me whether this is the right way to broadcast and use them?
Answered by Kirk Broadhurst
You don't really need to 'access' the broadcast DataFrame; you just use it, and Spark will implement the broadcast under the hood. The broadcast function works nicely, and makes more sense than the sc.broadcast approach.
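For illustration, here is a minimal sketch of the broadcast function in the DataFrame API; the join key item_id is a hypothetical placeholder, and the function is assumed available from Spark 1.5 onward:

import org.apache.spark.sql.functions.broadcast
// hint that PC_ITM_DF is small enough to ship to every executor;
// the optimizer will then choose a broadcast hash join
val joined = ITM_SELL_DF.join(broadcast(PC_ITM_DF), "item_id")
joined.show()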
It can be hard to understand where the time is being spent if you evaluate everything at once.
You can break your code into steps. The key here will be performing an action and persisting the DataFrames you want to broadcast before you use them in your join.
import org.apache.spark.sql.functions.broadcast
// load your dataframe
val PC_ITM_DF = sqlContext.parquetFile("path")
// mark this dataframe to be stored in memory once evaluated
PC_ITM_DF.persist()
// mark this dataframe to be broadcast, and register the hinted
// plan as a temp table so your SQL query picks up the hint
broadcast(PC_ITM_DF).registerTempTable("PC_ITM")
// perform an action to force the evaluation
PC_ITM_DF.count()
Doing this will ensure that the dataframe is:
- loaded in memory (persist)
- registered as temp table for use in your SQL query
- marked as broadcast, so it will be shipped to all executors
When you run sqlContext.sql("JOIN Query").show, you should see a 'broadcast hash join' in the SQL tab of your Spark UI.
Answered by Alex Naspo
I would cache the RDDs in memory. The next time they are needed, Spark will read them from memory rather than generating them from scratch each time. Here is a link to the quick start docs.
val PC_ITM_DF = sqlContext.parquetFile("path")
PC_ITM_DF.cache()
PC_ITM_DF.registerAsTempTable("PC_ITM")
val ITM_SELL_DF = sqlContext.parquetFile("path")
ITM_SELL_DF.cache()
ITM_SELL_DF.registerAsTempTable("ITM_SELL")
sqlContext.sql("JOIN Query").show
rdd.cache() is shorthand for rdd.persist(StorageLevel.MEMORY_ONLY). There are a few levels of persistence you can choose from in case your data is too big for memory-only persistence. Here is a list of persistence options. If you want to manually remove the RDD from the cache you can call rdd.unpersist().
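For example, a sketch of choosing a storage level that spills to disk when the data does not fit in memory (MEMORY_AND_DISK is one of the standard options):

import org.apache.spark.storage.StorageLevel
// keep what fits in memory, spill the rest to disk instead of recomputing it
PC_ITM_DF.persist(StorageLevel.MEMORY_AND_DISK)
// ... run your queries ...
// manually evict from the cache when you are done
PC_ITM_DF.unpersist()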
If you prefer to broadcast the data, you must first collect it on the driver before you broadcast it. This requires that your RDD fit in memory on your driver (and executors).
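A minimal sketch of that pattern, assuming PC_ITM_DF is small enough to collect (the variable names here are illustrative, and collect() pulls every row to the driver):

// collect the small DataFrame's rows to the driver
val pcItmLocal = PC_ITM_DF.collect()
// broadcast the local copy once to every executor
val pcItmBC = sc.broadcast(pcItmLocal)
// inside closures running on executors, read it via .value
ITM_SELL_DF.rdd.map(row => pcItmBC.value.length).count()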
Answered by Raju Bairishetti
At this moment you cannot access a broadcast DataFrame in a SQL query. You can use a broadcast DataFrame only through the DataFrame API.

