Convert Array[(String, String)] type to RDD[(String, String)] type in Spark
Note: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same CC BY-SA license, link to the original, and attribute it to the original authors (not me): StackOverflow
Original: http://stackoverflow.com/questions/39615754/
Asked by Darshan
I am new to Spark.
Here is my code:
val Data = sc.parallelize(List(
  ("I", "India"),
  ("U", "USA"),
  ("W", "West")))
val DataArray = sc.broadcast(Data.collect)
val FinalData = DataArray.value
Here FinalData is of type Array[(String, String)].
But I want the data to be of type RDD[(String, String)].
Can I convert FinalData to the RDD[(String, String)] type?
More Detail:
I want to join two RDDs. To optimize the join (from a performance point of view), I am broadcasting the small RDD to the whole cluster so that less data is shuffled (which indirectly improves performance). So I am writing something like the following:
//Big Data
val FirstRDD = sc.parallelize(List(****Data of first table****))
//Small Data
val SecondRDD = sc.parallelize(List(****Data of Second table****))
So I will definitely broadcast the small data set (that is, SecondRDD):
val DataArray = sc.broadcast(SecondRDD.collect)
val FinalData = DataArray.value
// Here it will give the error:
val Join = FirstRDD.leftOuterJoin(FinalData)
Found Array, required RDD
That's why I am looking for Array to RDD conversion.
Answered by Alberto Bonsanto
The real problem is that you are creating a Broadcast variable by collecting the RDD (notice that this action converts the RDD into an Array). So what I'm saying is that you already have an RDD, namely Data, and that variable holds exactly the same values as FinalData, but in the form you want: RDD[(String, String)].
You can check this in the following output.
Data: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[2] at parallelize at <console>:32
DataArray: org.apache.spark.broadcast.Broadcast[Array[(String, String)]] = Broadcast(1)
FinalData: Array[(String, String)] = Array((I,India), (U,USA), (W,West))
Although I don't understand your approach, you just need to parallelize the Broadcast's value.
// You already have this data stored in `Data`, so it's useless to repeat this process.
val DataCopy = sc.parallelize(DataArray.value)
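As a quick sanity check (a sketch, assuming the same spark-shell session as above), the parallelized copy behaves like any other RDD:
// DataCopy is an RDD[(String, String)] again, so RDD operations work on it:
DataCopy.map { case (k, v) => s"$k -> $v" }.collect.foreach(println)
// I -> India
// U -> USA
// W -> West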
EDIT
After reading your question again, I believe the problem is almost the same. You are trying to join an RDD with a Broadcast, and that's not allowed. However, if you read the documentation you may notice that it's possible to join both RDDs (see the code below).
val joinRDD = FirstRDD.keyBy(_._1).join(SecondRDD.keyBy(_._1))
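Note that keyBy(_._1) re-keys each pair by its first element, turning each (String, String) into (String, (String, String)). Since FirstRDD and SecondRDD are already pair RDDs, a plain join on the existing keys also works; a minimal sketch, using hypothetical stand-in data for the two tables:
// Hypothetical stand-ins for the question's table data.
val firstRDD = sc.parallelize(List(("I", "row1"), ("U", "row2"), ("W", "row3")))
val secondRDD = sc.parallelize(List(("I", "India"), ("U", "USA")))
// Pair RDDs join directly on their first element (the key).
val joined = firstRDD.leftOuterJoin(secondRDD)
// joined: RDD[(String, (String, Option[String]))]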
Answered by Tzach Zohar
Broadcasts are indeed useful for improving the performance of a join between a large RDD and a smaller one. When you do that, the broadcast (along with map or mapPartitions) replaces the join; it is not used in a join, and therefore you will never need to "transform a broadcast into an RDD".
Here's how it would look:
val largeRDD = sc.parallelize(List(
  ("I", "India"),
  ("U", "USA"),
  ("W", "West")))
val smallRDD = sc.parallelize(List(
  ("I", 34),
  ("U", 45)))
val smaller = sc.broadcast(smallRDD.collectAsMap())
// using "smaller.value" inside the function passed to RDD.map ->
// on executor side. Broadcast made sure it's copied to each executor (once!)
val joinResult = largeRDD.map { case (k, v) => (k, v, smaller.value.get(k)) }
joinResult.foreach(println)
// prints:
// (I,India,Some(34))
// (W,West,None)
// (U,USA,Some(45))
See a similar solution (using mapPartitions), which might be more efficient, here.
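For reference, a minimal sketch of that mapPartitions variant (assuming the same largeRDD and smaller broadcast from above); the broadcast map is bound once per partition rather than once per element:
val joinResult2 = largeRDD.mapPartitions { iter =>
  val lookup = smaller.value // fetched once per partition
  iter.map { case (k, v) => (k, v, lookup.get(k)) }
}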

