Using Scala to dump results processed by Spark to HDFS

Disclaimer: This page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same CC BY-SA terms, cite the original URL and author information, and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/24497389/

Tags: scala, hadoop, hdfs, apache-spark

Asked by user2773013

I'm a bit confused about the right way to save data into HDFS after processing it with Spark.

This is what I'm trying to do: I'm calculating the min, max and SD of numeric fields. My input files have millions of rows, but the output will have only around 15-20 fields. So, the output is a single value (scalar) for each field.

For example: I will load all the rows of FIELD1 into an RDD, and at the end I will get 3 single values for FIELD1 (MIN, MAX, SD). I concatenate these three values into a temporary string. In the end, I will have 15 to 20 rows, containing 4 columns in the following format:

FIELD_NAME_1  MIN  MAX  SD
FIELD_NAME_2  MIN  MAX  SD

This is a snippet of the code:

// create the RDD from the HDFS input file and cache it, since it is scanned several times
val data = sc.textFile("hdfs://x.x.x.x/" + args(1)).cache()
// take the value at index 1 of each comma-separated row (the second column)
val values = data.map(_.split(",", -1)(1))

// treat empty strings as 0 and convert everything to Double
val data_double = values.map(x => if (x == "") 0.0 else x.toDouble)
// pair each value with a count, sort by the numeric key, and take the first element as min/max
val min_value = data_double.map((_, 1)).reduceByKey(_ + _).sortByKey(true).take(1)(0)._1
val max_value = data_double.map((_, 1)).reduceByKey(_ + _).sortByKey(false).take(1)(0)._1
val SD = data_double.stdev
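
For reference, Spark's built-in statistics for an RDD[Double] can produce all three numbers in a single pass; a minimal sketch, assuming the data_double RDD above:

// stats() returns a StatCounter holding count, mean, min, max and stdev, computed in one pass
val stats = data_double.stats()
val (minValue, maxValue, sd) = (stats.min, stats.max, stats.stdev)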

So, I have 3 variables, min_value, max_value and SD, that I want to store back to HDFS.

Question 1: Since the output will be rather small, do I just save it locally on the server, or should I dump it to HDFS? It seems to me that dumping the file locally makes more sense.

Question 2: In Spark, I can just call the following to save an RDD to a text file:

some_RDD.saveAsTextFile("hdfs://namenode/path")

How do I accomplish the same thing for a String variable that is not an RDD in Scala? Should I parallelize my result into an RDD first and then call saveAsTextFile?

Answered by aaronman

To save locally, just do:

some_RDD.collect()

Then save the resulting array with something like the approach from this question. And yes, if the data set is small and can easily fit in memory, you should collect it and bring it to the driver of the program. Another option, if the data is a little too large to store in memory, is just some_RDD.coalesce(numPartitionsToStoreOn). Keep in mind coalesce also takes a boolean shuffle; if you are doing calculations on the data before coalescing, you should set this to true to get more parallelism in the calculations. Coalesce will reduce the number of partitions that store the data when you call some_RDD.saveAsTextFile("hdfs://namenode/path"). If the file is very small but you need it on HDFS, call repartition(1), which is the same as coalesce(1, true); this will ensure that your data is only saved on one node.

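For instance, a minimal sketch of that pattern (the partition count and path are placeholders):

// shuffle = true lets the upstream computation keep its original parallelism;
// only the final write stage is squeezed into a single partition
some_RDD.coalesce(1, shuffle = true).saveAsTextFile("hdfs://namenode/path")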

UPDATE: So if all you want to do is save three values to HDFS, you can do this:

sc.parallelize(List((min_value, max_value, SD)), 1).saveAsTextFile("pathTofile")

Basically you are just putting the 3 vars in a tuple, wrapping that in a List, and setting the parallelism to one, since the data is very small.

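Extending that idea to the 15-20 rows of FIELD_NAME MIN MAX SD described in the question, a sketch along these lines could work (the results sequence and output path are illustrative):

// one entry per field: (name, min, max, standard deviation)
val results = Seq(("FIELD_NAME_1", min_value, max_value, SD))
val lines = results.map { case (name, mn, mx, sd) => s"$name\t$mn\t$mx\t$sd" }
// one partition keeps the tiny output in a single part file on HDFS
sc.parallelize(lines, 1).saveAsTextFile("hdfs://namenode/path/field_stats")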

Answered by Chong Tang

Answer 1: Since you just need several scalars, I'd suggest storing them in the local file system. You can first do val localValue = rdd.collect(), which will collect all the data from the workers to the driver. Then call java.io to write things to disk.

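A minimal sketch of that approach (the local path is a placeholder):

import java.io.PrintWriter

// write the collected scalars to a local file on the driver machine
val writer = new PrintWriter("/tmp/field_stats.txt")
try {
  writer.println(s"FIELD_NAME_1\t$min_value\t$max_value\t$SD")
} finally {
  writer.close()
}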

Answer 2: You can do sc.parallelize(Seq(yourString)).saveAsTextFile("hdfs://host/yourFile"). This will write things to part-000* files. If you want to have everything in one file, hdfs dfs -getmerge is here to help you.

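For example, hdfs dfs -getmerge hdfs://host/yourFile ./yourFile.txt (paths here are just illustrative) will concatenate the part files into a single local file.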