Source: http://stackoverflow.com/questions/24772799/ (Stack Overflow)
License: this content is provided under CC BY-SA 4.0. You are free to use and share it, but you must attribute it to the original authors and keep the same license.
For each RDD in a DStream how do I convert this to an array or some other typical Java data type?
Asked by CodingIsAwesome
I would like to convert a DStream into an array, list, etc. so that I can translate it to JSON and serve it on an endpoint. I'm using Apache Spark, ingesting Twitter data. How do I perform this operation on the DStream statuses? I can't seem to get anything to work other than print().
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import TutorialHelper._

object Tutorial {
  def main(args: Array[String]) {
    // Location of the Spark directory
    val sparkHome = "/opt/spark"
    // URL of the Spark cluster
    val sparkUrl = "local[8]"
    // Location of the required JAR files
    val jarFile = "target/scala-2.10/tutorial_2.10-0.1-SNAPSHOT.jar"
    // HDFS directory for checkpointing
    val checkpointDir = "/tmp"
    // Configure Twitter credentials using twitter.txt
    TutorialHelper.configureTwitterCredentials()

    val ssc = new StreamingContext(sparkUrl, "Tutorial", Seconds(1), sparkHome, Seq(jarFile))
    val filters = Array("#americasgottalent", "iamawesome")
    val tweets = TwitterUtils.createStream(ssc, None, filters)
    val statuses = tweets.map(status => status.getText())

    val arry = Array("firstval")
    statuses.foreachRDD {
      arr :+ _.collect()
    }

    ssc.checkpoint(checkpointDir)
    ssc.start()
    ssc.awaitTermination()
  }
}
Answered by aaronman
If your DStream is statuses, you can do the following:
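import scala.collection.mutable.ArrayBuffer

val arr = new ArrayBuffer[String]()
statuses.foreachRDD { rdd =>
  // collect() copies the elements of this batch's RDD to the driver as an Array[String]
  arr ++= rdd.collect() // you can now put it in an array or do whatever you want with it
  ...
}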
Keep in mind that this could pull far more data into your driver than you want, since a DStream can be huge.
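One way to limit how much data reaches the driver is to cap each batch with take(n) and keep only the most recent batch instead of appending forever. The following is a minimal sketch, not part of the original answer: the names LatestBatch, latest, track, and maxPerBatch are hypothetical, and it assumes the spark-streaming dependency is on the classpath.

```scala
import java.util.concurrent.atomic.AtomicReference
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper (not from the original thread): holds the most
// recent batch on the driver so other code can read it.
object LatestBatch {
  // AtomicReference, because the function passed to foreachRDD runs on a
  // Spark-managed thread while a reader would use a different thread.
  val latest = new AtomicReference[Array[String]](Array.empty[String])

  // Registers an output operation that copies at most maxPerBatch
  // elements of every batch to the driver. Call this before ssc.start().
  def track(statuses: DStream[String], maxPerBatch: Int): Unit =
    statuses.foreachRDD { rdd =>
      // take(n) returns an Array with at most n elements of the RDD
      latest.set(rdd.take(maxPerBatch))
    }
}
```

With this in place, something like LatestBatch.track(statuses, 100) before ssc.start() would let an endpoint read LatestBatch.latest.get() at any time. The trade-off is that older batches are discarded and anything beyond the cap is dropped.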
Answered by CodingIsAwesome
Turns out you were close, but what I ended up looking for is this:
statuses.foreachRDD { rdd =>
  // collect() already returns an Array, so no extra toArray call is needed
  for (item <- rdd.collect()) {
    println(item)
  }
}
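Since the stated goal was to turn the collected values into JSON, here is a rough sketch of how an array of strings could be rendered as a JSON array using only the Scala standard library. JsonArray, escape, and render are hypothetical names introduced for illustration; in a real service a JSON library would likely be a better choice.

```scala
// Hypothetical helper, not from the original thread.
object JsonArray {
  // Escapes the characters that must be escaped inside a JSON string.
  private def escape(s: String): String = {
    val sb = new StringBuilder
    s.foreach {
      case '"'  => sb.append("\\\"")
      case '\\' => sb.append("\\\\")
      case '\n' => sb.append("\\n")
      case '\r' => sb.append("\\r")
      case '\t' => sb.append("\\t")
      // Remaining control characters become a backslash, 'u', and 4 hex digits
      case c if c < ' ' =>
        sb.append('\\').append('u').append("%04x".format(c.toInt))
      case c => sb.append(c)
    }
    sb.toString
  }

  // Renders the items as a JSON array of strings, e.g. ["a","b"].
  def render(items: Seq[String]): String =
    items.map(s => "\"" + escape(s) + "\"").mkString("[", ",", "]")
}
```

Inside foreachRDD this could be called as JsonArray.render(rdd.collect().toSeq), and the resulting string stored wherever the endpoint reads from.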

